Skip to content

Commit

Permalink
feat: stop receiving data from tablet when we know we don't need it
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Dec 18, 2024
1 parent 880284d commit 128b3d8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
17 changes: 13 additions & 4 deletions go/vt/vtgate/engine/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"context"
"fmt"
"io"
"strconv"
"sync"

Expand All @@ -36,9 +37,10 @@ var _ Primitive = (*Limit)(nil)

// Limit is a primitive that performs the LIMIT operation.
type Limit struct {
Count evalengine.Expr
Offset evalengine.Expr
Input Primitive
Count evalengine.Expr
Offset evalengine.Expr
RequireCompleteInput bool
Input Primitive
}

var UpperLimitStr = "__upper_limit"
Expand Down Expand Up @@ -141,9 +143,16 @@ func (l *Limit) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars
return err
}

return nil
if l.RequireCompleteInput || vcursor.Session().InTransaction() {
return nil
}

return io.EOF
})

if err == io.EOF {
return nil
}
if err != nil {
return err
}
Expand Down
19 changes: 9 additions & 10 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import (
"strconv"
"strings"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/slice"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/sysvars"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/engine/opcode"
Expand Down Expand Up @@ -887,18 +885,18 @@ func transformUnionPlan(ctx *plancontext.PlanningContext, op *operators.Union) (
}

func transformLimit(ctx *plancontext.PlanningContext, op *operators.Limit) (engine.Primitive, error) {
plan, err := transformToPrimitive(ctx, op.Source)
input, err := transformToPrimitive(ctx, op.Source)
if err != nil {
return nil, err
}

return createLimit(plan, op.AST, ctx.VSchema.Environment(), ctx.VSchema.ConnCollation())
return createLimit(ctx, input, op.AST)
}

func createLimit(input engine.Primitive, limit *sqlparser.Limit, env *vtenv.Environment, coll collations.ID) (engine.Primitive, error) {
func createLimit(ctx *plancontext.PlanningContext, input engine.Primitive, limit *sqlparser.Limit) (engine.Primitive, error) {
cfg := &evalengine.Config{
Collation: coll,
Environment: env,
Collation: ctx.VSchema.ConnCollation(),
Environment: ctx.VSchema.Environment(),
}
count, err := evalengine.Translate(limit.Rowcount, cfg)
if err != nil {
Expand All @@ -913,9 +911,10 @@ func createLimit(input engine.Primitive, limit *sqlparser.Limit, env *vtenv.Envi
}

return &engine.Limit{
Input: input,
Count: count,
Offset: offset,
Count: count,
Offset: offset,
RequireCompleteInput: ctx.SemTable.ShouldFetchLastInsertID(),
Input: input,
}, nil
}

Expand Down

0 comments on commit 128b3d8

Please sign in to comment.