Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Range Query Optimization (For sequential Vindex types) #17342

Merged
merged 16 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions go/test/endtoend/vtgate/plan_tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/planbuilder"
)
Expand Down Expand Up @@ -128,6 +129,31 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
}
}

// splitSQL statements - querySQL may be a multi-line sql blob
func splitSQL(querySQL ...string) ([]string, error) {
parser := sqlparser.NewTestParser()
var sqls []string
for _, sql := range querySQL {
split, err := parser.SplitStatementToPieces(sql)
if err != nil {
return nil, err
}
sqls = append(sqls, split...)
}
return sqls, nil
}

func loadSampleData(t *testing.T, mcmp utils.MySQLCompare) {
sampleDataSQL := readFile("sampledata/user.sql")
insertSQL, err := splitSQL(sampleDataSQL)
if err != nil {
require.NoError(t, err)
}
for _, sql := range insertSQL {
mcmp.ExecNoCompare(sql)
}
}

func readJSONTests(filename string) []planbuilder.PlanTest {
var output []planbuilder.PlanTest
file, err := os.Open(locateFile(filename))
Expand Down
32 changes: 18 additions & 14 deletions go/test/endtoend/vtgate/plan_tests/plan_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,25 @@ import (
"vitess.io/vitess/go/test/endtoend/utils"
)

func TestSelectCases(t *testing.T) {
func TestE2ECases(t *testing.T) {
e2eTestCaseFiles := []string{"select_cases.json", "filter_cases.json"}
mcmp, closer := start(t)
defer closer()
tests := readJSONTests("select_cases.json")
for _, test := range tests {
mcmp.Run(test.Comment, func(mcmp *utils.MySQLCompare) {
if test.SkipE2E {
mcmp.AsT().Skip(test.Query)
}
mcmp.Exec(test.Query)
pd := utils.ExecTrace(mcmp.AsT(), mcmp.VtConn, test.Query)
verifyTestExpectations(mcmp.AsT(), pd, test)
if mcmp.VtConn.IsClosed() {
mcmp.AsT().Fatal("vtgate connection is closed")
}
})
loadSampleData(t, mcmp)
for _, fileName := range e2eTestCaseFiles {
tests := readJSONTests(fileName)
for _, test := range tests {
mcmp.Run(test.Comment, func(mcmp *utils.MySQLCompare) {
if test.SkipE2E {
mcmp.AsT().Skip(test.Query)
}
mcmp.Exec(test.Query)
pd := utils.ExecTrace(mcmp.AsT(), mcmp.VtConn, test.Query)
verifyTestExpectations(mcmp.AsT(), pd, test)
if mcmp.VtConn.IsClosed() {
mcmp.AsT().Fatal("vtgate connection is closed")
}
})
}
}
}
5 changes: 5 additions & 0 deletions go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func Empty(id []byte) bool {
// KeyRange helper methods
//

// Make a Key Range
func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange {
return &topodatapb.KeyRange{Start: start, End: end}
}

// KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent,
// it returns false.
func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
Expand Down
43 changes: 43 additions & 0 deletions go/vt/vtgate/engine/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
// IN is for routing a statement to a multi shard.
// Requires: A Vindex, and a multi Values.
IN
// Between is for routing a statement to a multi shard
// Requires: A Vindex, and start and end Value.
Between
// MultiEqual is used for routing queries with IN with tuple clause
// Requires: A Vindex, and a multi Tuple Values.
MultiEqual
Expand Down Expand Up @@ -78,6 +81,7 @@ var opName = map[Opcode]string{
EqualUnique: "EqualUnique",
Equal: "Equal",
IN: "IN",
Between: "Between",
MultiEqual: "MultiEqual",
Scatter: "Scatter",
DBA: "DBA",
Expand Down Expand Up @@ -157,6 +161,14 @@ func (rp *RoutingParameters) findRoute(ctx context.Context, vcursor VCursor, bin
default:
return rp.in(ctx, vcursor, bindVars)
}
case Between:
switch rp.Vindex.(type) {
case vindexes.SingleColumn:
return rp.between(ctx, vcursor, bindVars)
default:
// Only SingleColumn vindex supported.
return nil, nil, vterrors.VT13001("between supported on SingleColumn vindex only")
}
case MultiEqual:
switch rp.Vindex.(type) {
case vindexes.MultiColumn:
Expand Down Expand Up @@ -396,6 +408,19 @@ func (rp *RoutingParameters) inMultiCol(ctx context.Context, vcursor VCursor, bi
return rss, shardVarsMultiCol(bindVars, mapVals, isSingleVal), nil
}

func (rp *RoutingParameters) between(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) {
env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor)
value, err := env.Evaluate(rp.Values[0])
if err != nil {
return nil, nil, err
}
rss, values, err := resolveShardsBetween(ctx, vcursor, rp.Vindex.(vindexes.Sequential), rp.Keyspace, value.TupleValues())
if err != nil {
return nil, nil, err
}
return rss, shardVars(bindVars, values), nil
}

func (rp *RoutingParameters) multiEqual(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) {
env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor)
value, err := env.Evaluate(rp.Values[0])
Expand Down Expand Up @@ -520,6 +545,24 @@ func buildMultiColumnVindexValues(shardsValues [][][]sqltypes.Value) [][][]*quer
return shardsIds
}

func resolveShardsBetween(ctx context.Context, vcursor VCursor, vindex vindexes.Sequential, keyspace *vindexes.Keyspace, vindexKeys []sqltypes.Value) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) {
// Convert vindexKeys to []*querypb.Value
ids := make([]*querypb.Value, len(vindexKeys))
for i, vik := range vindexKeys {
ids[i] = sqltypes.ValueToProto(vik)
}

// RangeMap using the Vindex
destinations, err := vindex.RangeMap(ctx, vcursor, vindexKeys[0], vindexKeys[1])
if err != nil {
return nil, nil, err

}

// And use the Resolver to map to ResolvedShards.
return vcursor.ResolveDestinations(ctx, keyspace.Name, ids, destinations)
}

func shardVars(bv map[string]*querypb.BindVariable, mapVals [][]*querypb.Value) []map[string]*querypb.BindVariable {
shardVars := make([]map[string]*querypb.BindVariable, len(mapVals))
for i, vals := range mapVals {
Expand Down
40 changes: 40 additions & 0 deletions go/vt/vtgate/planbuilder/operators/sharded_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ func (tr *ShardedRouting) resetRoutingLogic(ctx *plancontext.PlanningContext) Ro
func (tr *ShardedRouting) searchForNewVindexes(ctx *plancontext.PlanningContext, predicate sqlparser.Expr) (Routing, bool) {
newVindexFound := false
switch node := predicate.(type) {
case *sqlparser.BetweenExpr:
return tr.planBetweenOp(ctx, node)

case *sqlparser.ComparisonExpr:
return tr.planComparison(ctx, node)

Expand All @@ -234,6 +237,35 @@ func (tr *ShardedRouting) searchForNewVindexes(ctx *plancontext.PlanningContext,
return nil, newVindexFound
}

func (tr *ShardedRouting) planBetweenOp(ctx *plancontext.PlanningContext, node *sqlparser.BetweenExpr) (routing Routing, foundNew bool) {
column, ok := node.Left.(*sqlparser.ColName)
if !ok {
return nil, false
}
var vdValue sqlparser.ValTuple = sqlparser.ValTuple([]sqlparser.Expr{node.From, node.To})

opcode := func(vindex *vindexes.ColumnVindex) engine.Opcode {
if _, ok := vindex.Vindex.(vindexes.Sequential); ok {
return engine.Between
}
return engine.Scatter
}

sequentialVdx := func(vindex *vindexes.ColumnVindex) vindexes.Vindex {
if _, ok := vindex.Vindex.(vindexes.Sequential); ok {
return vindex.Vindex
}
// if vindex is not of type Sequential, we can't use this vindex at all
return nil
}

val := makeEvalEngineExpr(ctx, vdValue)
if val == nil {
return nil, false
}
return nil, tr.haveMatchingVindex(ctx, node, vdValue, column, val, opcode, sequentialVdx)
}

func (tr *ShardedRouting) planComparison(ctx *plancontext.PlanningContext, cmp *sqlparser.ComparisonExpr) (routing Routing, foundNew bool) {
switch cmp.Operator {
case sqlparser.EqualOp:
Expand Down Expand Up @@ -332,6 +364,8 @@ func (tr *ShardedRouting) Cost() int {
return 5
case engine.IN:
return 10
case engine.Between:
return 10
case engine.MultiEqual:
return 10
case engine.Scatter:
Expand Down Expand Up @@ -441,6 +475,12 @@ func (tr *ShardedRouting) processMultiColumnVindex(
return newVindexFound
}

routeOpcode := opcode(v.ColVindex)
vindex := vfunc(v.ColVindex)
if vindex == nil || routeOpcode == engine.Scatter {
return newVindexFound
}

var newOption []*VindexOption
for _, op := range v.Options {
if op.Ready {
Expand Down
Loading
Loading