Skip to content

Commit

Permalink
Range Query Optimization (For sequential Vindex types) (#17342)
Browse files Browse the repository at this point in the history
Signed-off-by: c-r-dev <[email protected]>
Signed-off-by: Andres Taylor <[email protected]>
Co-authored-by: Andres Taylor <[email protected]>
  • Loading branch information
c-r-dev and systay authored Dec 20, 2024
1 parent 4299951 commit 9714713
Show file tree
Hide file tree
Showing 14 changed files with 611 additions and 77 deletions.
26 changes: 26 additions & 0 deletions go/test/endtoend/vtgate/plan_tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/planbuilder"
)
Expand Down Expand Up @@ -128,6 +129,31 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
}
}

// splitSQL breaks every entry of querySQL into its individual statements and
// returns all of them in input order. Each entry may be a multi-line SQL blob.
// The first entry that fails to split aborts the call and its error is returned.
func splitSQL(querySQL ...string) ([]string, error) {
	p := sqlparser.NewTestParser()
	var statements []string
	for i := range querySQL {
		pieces, err := p.SplitStatementToPieces(querySQL[i])
		if err != nil {
			return nil, err
		}
		statements = append(statements, pieces...)
	}
	return statements, nil
}

// loadSampleData reads sampledata/user.sql, splits it into individual
// statements and runs each one through mcmp.ExecNoCompare. The test fails
// immediately if the file content cannot be split into statements.
func loadSampleData(t *testing.T, mcmp utils.MySQLCompare) {
	statements, err := splitSQL(readFile("sampledata/user.sql"))
	require.NoError(t, err)
	for i := range statements {
		mcmp.ExecNoCompare(statements[i])
	}
}

func readJSONTests(filename string) []planbuilder.PlanTest {
var output []planbuilder.PlanTest
file, err := os.Open(locateFile(filename))
Expand Down
32 changes: 18 additions & 14 deletions go/test/endtoend/vtgate/plan_tests/plan_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,25 @@ import (
"vitess.io/vitess/go/test/endtoend/utils"
)

func TestSelectCases(t *testing.T) {
func TestE2ECases(t *testing.T) {
e2eTestCaseFiles := []string{"select_cases.json", "filter_cases.json"}
mcmp, closer := start(t)
defer closer()
tests := readJSONTests("select_cases.json")
for _, test := range tests {
mcmp.Run(test.Comment, func(mcmp *utils.MySQLCompare) {
if test.SkipE2E {
mcmp.AsT().Skip(test.Query)
}
mcmp.Exec(test.Query)
pd := utils.ExecTrace(mcmp.AsT(), mcmp.VtConn, test.Query)
verifyTestExpectations(mcmp.AsT(), pd, test)
if mcmp.VtConn.IsClosed() {
mcmp.AsT().Fatal("vtgate connection is closed")
}
})
loadSampleData(t, mcmp)
for _, fileName := range e2eTestCaseFiles {
tests := readJSONTests(fileName)
for _, test := range tests {
mcmp.Run(test.Comment, func(mcmp *utils.MySQLCompare) {
if test.SkipE2E {
mcmp.AsT().Skip(test.Query)
}
mcmp.Exec(test.Query)
pd := utils.ExecTrace(mcmp.AsT(), mcmp.VtConn, test.Query)
verifyTestExpectations(mcmp.AsT(), pd, test)
if mcmp.VtConn.IsClosed() {
mcmp.AsT().Fatal("vtgate connection is closed")
}
})
}
}
}
5 changes: 5 additions & 0 deletions go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func Empty(id []byte) bool {
// KeyRange helper methods
//

// NewKeyRange returns a pointer to a new KeyRange whose Start and End fields
// are set to the given byte slices. The slices are stored as passed in; no
// copy is made.
func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange {
	kr := new(topodatapb.KeyRange)
	kr.Start = start
	kr.End = end
	return kr
}

// KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent,
// it returns false.
func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
Expand Down
43 changes: 43 additions & 0 deletions go/vt/vtgate/engine/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
// IN is for routing a statement to a multi shard.
// Requires: A Vindex, and a multi Values.
IN
// Between is for routing a statement to a multi shard
// Requires: A Vindex, and start and end Value.
Between
// MultiEqual is used for routing queries with IN with tuple clause
// Requires: A Vindex, and a multi Tuple Values.
MultiEqual
Expand Down Expand Up @@ -78,6 +81,7 @@ var opName = map[Opcode]string{
EqualUnique: "EqualUnique",
Equal: "Equal",
IN: "IN",
Between: "Between",
MultiEqual: "MultiEqual",
Scatter: "Scatter",
DBA: "DBA",
Expand Down Expand Up @@ -157,6 +161,14 @@ func (rp *RoutingParameters) findRoute(ctx context.Context, vcursor VCursor, bin
default:
return rp.in(ctx, vcursor, bindVars)
}
case Between:
switch rp.Vindex.(type) {
case vindexes.SingleColumn:
return rp.between(ctx, vcursor, bindVars)
default:
// Only SingleColumn vindex supported.
return nil, nil, vterrors.VT13001("between supported on SingleColumn vindex only")
}
case MultiEqual:
switch rp.Vindex.(type) {
case vindexes.MultiColumn:
Expand Down Expand Up @@ -396,6 +408,19 @@ func (rp *RoutingParameters) inMultiCol(ctx context.Context, vcursor VCursor, bi
return rss, shardVarsMultiCol(bindVars, mapVals, isSingleVal), nil
}

// between resolves the shards targeted by a Between route.
//
// rp.Values[0] is evaluated against bindVars and its tuple values are handed
// to resolveShardsBetween together with the vindex. The resolved shards are
// returned along with one bind-variable map per shard, built by shardVars.
//
// An error is returned if the vindex does not implement vindexes.Sequential,
// if the expression cannot be evaluated, or if shard resolution fails.
func (rp *RoutingParameters) between(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) {
	// findRoute only guarantees a SingleColumn vindex, while range mapping needs
	// a Sequential one. Check explicitly so a mismatch is reported as an
	// internal error instead of a panic from an unchecked type assertion.
	seqVindex, ok := rp.Vindex.(vindexes.Sequential)
	if !ok {
		return nil, nil, vterrors.VT13001("between supported on Sequential vindex only")
	}

	env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor)
	value, err := env.Evaluate(rp.Values[0])
	if err != nil {
		return nil, nil, err
	}
	rss, values, err := resolveShardsBetween(ctx, vcursor, seqVindex, rp.Keyspace, value.TupleValues())
	if err != nil {
		return nil, nil, err
	}
	return rss, shardVars(bindVars, values), nil
}

func (rp *RoutingParameters) multiEqual(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) {
env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor)
value, err := env.Evaluate(rp.Values[0])
Expand Down Expand Up @@ -520,6 +545,24 @@ func buildMultiColumnVindexValues(shardsValues [][][]sqltypes.Value) [][][]*quer
return shardsIds
}

// resolveShardsBetween maps a range to resolved shards in the given keyspace.
//
// vindexKeys[0] and vindexKeys[1] are passed to the vindex's RangeMap as the
// two ends of the range, so the caller must supply at least two keys. Every
// key is also converted to its proto form and forwarded, together with the
// destinations produced by RangeMap, to vcursor.ResolveDestinations, whose
// results are returned unchanged.
func resolveShardsBetween(ctx context.Context, vcursor VCursor, vindex vindexes.Sequential, keyspace *vindexes.Keyspace, vindexKeys []sqltypes.Value) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) {
	// Proto representation of each vindex key, in the same order.
	protoIDs := make([]*querypb.Value, 0, len(vindexKeys))
	for _, k := range vindexKeys {
		protoIDs = append(protoIDs, sqltypes.ValueToProto(k))
	}

	// Ask the vindex which destinations the range covers.
	dests, err := vindex.RangeMap(ctx, vcursor, vindexKeys[0], vindexKeys[1])
	if err != nil {
		return nil, nil, err
	}

	// Let the resolver turn the destinations into ResolvedShards.
	return vcursor.ResolveDestinations(ctx, keyspace.Name, protoIDs, dests)
}

func shardVars(bv map[string]*querypb.BindVariable, mapVals [][]*querypb.Value) []map[string]*querypb.BindVariable {
shardVars := make([]map[string]*querypb.BindVariable, len(mapVals))
for i, vals := range mapVals {
Expand Down
40 changes: 40 additions & 0 deletions go/vt/vtgate/planbuilder/operators/sharded_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ func (tr *ShardedRouting) resetRoutingLogic(ctx *plancontext.PlanningContext) Ro
func (tr *ShardedRouting) searchForNewVindexes(ctx *plancontext.PlanningContext, predicate sqlparser.Expr) (Routing, bool) {
newVindexFound := false
switch node := predicate.(type) {
case *sqlparser.BetweenExpr:
return tr.planBetweenOp(ctx, node)

case *sqlparser.ComparisonExpr:
return tr.planComparison(ctx, node)

Expand All @@ -234,6 +237,35 @@ func (tr *ShardedRouting) searchForNewVindexes(ctx *plancontext.PlanningContext,
return nil, newVindexFound
}

// planBetweenOp tries to use a `col BETWEEN from AND to` predicate for routing.
//
// The predicate is only considered when its left side is a plain column and
// the (from, to) pair can be turned into an evalengine expression. The pair is
// offered to haveMatchingVindex as a two-element tuple; only vindexes that
// implement vindexes.Sequential are eligible, and they get the engine.Between
// opcode.
//
// The returned Routing is always nil; foundNew is whatever haveMatchingVindex
// reports, or false when the predicate is not usable.
func (tr *ShardedRouting) planBetweenOp(ctx *plancontext.PlanningContext, node *sqlparser.BetweenExpr) (routing Routing, foundNew bool) {
	// NOT BETWEEN matches the rows outside [From, To]. Routing it to the shards
	// that cover the range would drop rows living elsewhere, so it must not be
	// used for vindex routing.
	if !node.IsBetween {
		return nil, false
	}

	column, ok := node.Left.(*sqlparser.ColName)
	if !ok {
		return nil, false
	}

	vdValue := sqlparser.ValTuple([]sqlparser.Expr{node.From, node.To})
	val := makeEvalEngineExpr(ctx, vdValue)
	if val == nil {
		return nil, false
	}

	isSequential := func(vindex *vindexes.ColumnVindex) bool {
		_, ok := vindex.Vindex.(vindexes.Sequential)
		return ok
	}

	opcode := func(vindex *vindexes.ColumnVindex) engine.Opcode {
		if isSequential(vindex) {
			return engine.Between
		}
		return engine.Scatter
	}

	sequentialVdx := func(vindex *vindexes.ColumnVindex) vindexes.Vindex {
		if isSequential(vindex) {
			return vindex.Vindex
		}
		// A vindex that is not Sequential cannot be used for range routing at all.
		return nil
	}

	return nil, tr.haveMatchingVindex(ctx, node, vdValue, column, val, opcode, sequentialVdx)
}

func (tr *ShardedRouting) planComparison(ctx *plancontext.PlanningContext, cmp *sqlparser.ComparisonExpr) (routing Routing, foundNew bool) {
switch cmp.Operator {
case sqlparser.EqualOp:
Expand Down Expand Up @@ -332,6 +364,8 @@ func (tr *ShardedRouting) Cost() int {
return 5
case engine.IN:
return 10
case engine.Between:
return 10
case engine.MultiEqual:
return 10
case engine.Scatter:
Expand Down Expand Up @@ -441,6 +475,12 @@ func (tr *ShardedRouting) processMultiColumnVindex(
return newVindexFound
}

routeOpcode := opcode(v.ColVindex)
vindex := vfunc(v.ColVindex)
if vindex == nil || routeOpcode == engine.Scatter {
return newVindexFound
}

var newOption []*VindexOption
for _, op := range v.Options {
if op.Ready {
Expand Down
Loading

0 comments on commit 9714713

Please sign in to comment.