Skip to content

Commit

Permalink
Fix AVG() sharded planning (#15626)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Apr 3, 2024
1 parent 18d4128 commit 60eafac
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func TestAggregateTypes(t *testing.T) {
mcmp.SkipIfBinaryIsBelowVersion(19, "vtgate")
mcmp.AssertMatches("select avg(val1) from aggr_test", `[[FLOAT64(0)]]`)
})
mcmp.Run("Average with group by without selecting the grouped columns", func(mcmp *utils.MySQLCompare) {
mcmp.SkipIfBinaryIsBelowVersion(20, "vtgate")
mcmp.AssertMatches("select avg(val2) from aggr_test group by val1 order by val1", `[[DECIMAL(1.0000)] [DECIMAL(1.0000)] [DECIMAL(3.5000)] [NULL] [DECIMAL(1.0000)]]`)
})
}

func TestGroupBy(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ var _ selectExpressions = (*Projection)(nil)

// createSimpleProjection returns a projection where all columns are offsets.
// used to change the name and order of the columns in the final output
func createSimpleProjection(ctx *plancontext.PlanningContext, qp *QueryProjection, src Operator) *Projection {
func createSimpleProjection(ctx *plancontext.PlanningContext, selExprs []sqlparser.SelectExpr, src Operator) *Projection {
p := newAliasedProjection(src)
for _, e := range qp.SelectExprs {
ae, err := e.GetAliasedExpr()
if err != nil {
panic(err)
for _, e := range selExprs {
ae, isAe := e.(*sqlparser.AliasedExpr)
if !isAe {
panic(vterrors.VT09015())
}
offset := p.Source.AddColumn(ctx, true, false, ae)
expr := newProjExpr(ae)
Expand Down
21 changes: 12 additions & 9 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ import (
)

func planQuery(ctx *plancontext.PlanningContext, root Operator) Operator {
var selExpr sqlparser.SelectExprs
if horizon, isHorizon := root.(*Horizon); isHorizon {
sel := sqlparser.GetFirstSelect(horizon.Query)
selExpr = sqlparser.CloneSelectExprs(sel.SelectExprs)
}

output := runPhases(ctx, root)
output = planOffsets(ctx, output)

Expand All @@ -36,7 +42,7 @@ func planQuery(ctx *plancontext.PlanningContext, root Operator) Operator {

output = compact(ctx, output)

return addTruncationOrProjectionToReturnOutput(ctx, root, output)
return addTruncationOrProjectionToReturnOutput(ctx, selExpr, output)
}

// runPhases is the process of figuring out how to perform the operations in the Horizon
Expand Down Expand Up @@ -571,24 +577,21 @@ func tryPushUnion(ctx *plancontext.PlanningContext, op *Union) (Operator, *Apply
}

// addTruncationOrProjectionToReturnOutput uses the original Horizon to make sure that the output columns line up with what the user asked for
func addTruncationOrProjectionToReturnOutput(ctx *plancontext.PlanningContext, oldHorizon Operator, output Operator) Operator {
horizon, ok := oldHorizon.(*Horizon)
if !ok {
func addTruncationOrProjectionToReturnOutput(ctx *plancontext.PlanningContext, selExprs sqlparser.SelectExprs, output Operator) Operator {
if len(selExprs) == 0 {
return output
}

cols := output.GetSelectExprs(ctx)
sel := sqlparser.GetFirstSelect(horizon.Query)
if len(sel.SelectExprs) == len(cols) {
if len(selExprs) == len(cols) {
return output
}

if tryTruncateColumnsAt(output, len(sel.SelectExprs)) {
if tryTruncateColumnsAt(output, len(selExprs)) {
return output
}

qp := horizon.getQP(ctx)
proj := createSimpleProjection(ctx, qp, output)
proj := createSimpleProjection(ctx, selExprs, output)
return proj
}

Expand Down
52 changes: 52 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/select_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,58 @@
]
}
},
{
"comment": "avg in sharded keyspace with group by without selecting the group by columns",
"query": "select avg(intcol) as avg_col from user group by textcol1, textcol2 order by textcol1, textcol2;",
"plan": {
"QueryType": "SELECT",
"Original": "select avg(intcol) as avg_col from user group by textcol1, textcol2 order by textcol1, textcol2;",
"Instructions": {
"OperatorType": "SimpleProjection",
"ColumnNames": [
"avg_col"
],
"Columns": [
0
],
"Inputs": [
{
"OperatorType": "Projection",
"Expressions": [
"sum(intcol) / count(intcol) as avg_col",
":1 as textcol1",
":2 as textcol2"
],
"Inputs": [
{
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "sum(0) AS avg_col, sum_count(3) AS count(intcol)",
"GroupBy": "1 COLLATE latin1_swedish_ci, (2|4) COLLATE ",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select sum(intcol) as avg_col, textcol1, textcol2, count(intcol), weight_string(textcol2) from `user` where 1 != 1 group by textcol1, textcol2, weight_string(textcol2)",
"OrderBy": "1 ASC COLLATE latin1_swedish_ci, (2|4) ASC COLLATE ",
"Query": "select sum(intcol) as avg_col, textcol1, textcol2, count(intcol), weight_string(textcol2) from `user` group by textcol1, textcol2, weight_string(textcol2) order by textcol1 asc, textcol2 asc",
"Table": "`user`"
}
]
}
]
}
]
},
"TablesUsed": [
"user.user"
]
}
},
{
"comment": "don't filter on the vtgate",
"query": "select 42 from dual where false",
Expand Down

0 comments on commit 60eafac

Please sign in to comment.