Skip to content

Commit

Permalink
refactor: remove more errors from operator planning (#14767)
Browse files Browse the repository at this point in the history
  • Loading branch information
systay authored Dec 13, 2023
1 parent 3b58bee commit 4a6da63
Show file tree
Hide file tree
Showing 20 changed files with 136 additions and 338 deletions.
15 changes: 3 additions & 12 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,7 @@ func pushAggregationThroughHashJoin(ctx *plancontext.PlanningContext, rootAggr *

// The grouping columns need to be pushed down as grouping columns on the respective sides
for _, groupBy := range rootAggr.Grouping {
expr, err := rootAggr.QP.GetSimplifiedExpr(ctx, groupBy.Inner)
if err != nil {
panic(err)
}
expr := rootAggr.QP.GetSimplifiedExpr(ctx, groupBy.Inner)
deps := ctx.SemTable.RecursiveDeps(expr)
switch {
case deps.IsSolvedBy(lhs.tableID):
Expand Down Expand Up @@ -455,10 +452,7 @@ func addColumnsFromLHSInJoinPredicates(ctx *plancontext.PlanningContext, rootAgg
for _, pred := range join.JoinPredicates.columns {
for _, bve := range pred.LHSExprs {
expr := bve.Expr
wexpr, err := rootAggr.QP.GetSimplifiedExpr(ctx, expr)
if err != nil {
panic(err)
}
wexpr := rootAggr.QP.GetSimplifiedExpr(ctx, expr)
idx, found := canReuseColumn(ctx, lhs.pushed.Columns, expr, extractExpr)
if !found {
idx = len(lhs.pushed.Columns)
Expand Down Expand Up @@ -489,10 +483,7 @@ func splitGroupingToLeftAndRight(
columns joinColumns,
) {
for _, groupBy := range rootAggr.Grouping {
expr, err := rootAggr.QP.GetSimplifiedExpr(ctx, groupBy.Inner)
if err != nil {
panic(err)
}
expr := rootAggr.QP.GetSimplifiedExpr(ctx, groupBy.Inner)
deps := ctx.SemTable.RecursiveDeps(expr)
switch {
case deps.IsSolvedBy(lhs.tableID):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,15 @@ func (ab *aggBuilder) handleAggr(ctx *plancontext.PlanningContext, aggr Aggr) er
// TODO: this should be handled better by pushing the function down.
return errAbortAggrPushing
case opcode.AggregateUnassigned:
return vterrors.VT12001(fmt.Sprintf("in scatter query: aggregation function '%s'", sqlparser.String(aggr.Original)))
panic(vterrors.VT12001(fmt.Sprintf("in scatter query: aggregation function '%s'", sqlparser.String(aggr.Original))))
case opcode.AggregateGtid:
// this is only used for SHOW GTID queries that will never contain joins
return vterrors.VT13001("cannot do join with vgtid")
panic(vterrors.VT13001("cannot do join with vgtid"))
case opcode.AggregateSumDistinct, opcode.AggregateCountDistinct:
// we are not going to see values multiple times, so we don't need to multiply with the count(*) from the other side
return ab.handlePushThroughAggregation(ctx, aggr)
default:
return vterrors.VT12001(fmt.Sprintf("aggregation not planned: %s", aggr.OpCode.String()))
panic(vterrors.VT12001(fmt.Sprintf("aggregation not planned: %s", aggr.OpCode.String())))
}
}

Expand Down
15 changes: 5 additions & 10 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,6 @@ func (aj *ApplyJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sql
aj.RHS = rhs
}

func (aj *ApplyJoin) pushColRight(ctx *plancontext.PlanningContext, e *sqlparser.AliasedExpr, addToGroupBy bool) (int, error) {
offset := aj.RHS.AddColumn(ctx, true, addToGroupBy, e)
return offset, nil
}

func (aj *ApplyJoin) GetColumns(*plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return slice.Map(aj.JoinColumns.columns, func(from applyJoinColumn) *sqlparser.AliasedExpr {
return aeWrap(from.Original)
Expand Down Expand Up @@ -309,7 +304,7 @@ func (aj *ApplyJoin) isColNameMovedFromL2R(bindVarName string) bool {

// findOrAddColNameBindVarName goes through the JoinColumns and looks for the given colName coming from the LHS of the join
// and returns the argument name if found. if it's not found, a new applyJoinColumn passing this through will be added
func (aj *ApplyJoin) findOrAddColNameBindVarName(ctx *plancontext.PlanningContext, col *sqlparser.ColName) (string, error) {
func (aj *ApplyJoin) findOrAddColNameBindVarName(ctx *plancontext.PlanningContext, col *sqlparser.ColName) string {
for i, thisCol := range aj.JoinColumns.columns {
idx := slices.IndexFunc(thisCol.LHSExprs, func(e BindVarExpr) bool {
return ctx.SemTable.EqualsExpr(e.Expr, col)
Expand All @@ -324,23 +319,23 @@ func (aj *ApplyJoin) findOrAddColNameBindVarName(ctx *plancontext.PlanningContex
expr.Name = bvname
aj.JoinColumns.columns[i].LHSExprs[idx] = expr
}
return thisCol.LHSExprs[idx].Name, nil
return thisCol.LHSExprs[idx].Name
}
}
for _, thisCol := range aj.JoinPredicates.columns {
idx := slices.IndexFunc(thisCol.LHSExprs, func(e BindVarExpr) bool {
return ctx.SemTable.EqualsExpr(e.Expr, col)
})
if idx != -1 {
return thisCol.LHSExprs[idx].Name, nil
return thisCol.LHSExprs[idx].Name
}
}

idx := slices.IndexFunc(aj.ExtraLHSVars, func(e BindVarExpr) bool {
return ctx.SemTable.EqualsExpr(e.Expr, col)
})
if idx != -1 {
return aj.ExtraLHSVars[idx].Name, nil
return aj.ExtraLHSVars[idx].Name
}

// we didn't find it, so we need to add it
Expand All @@ -349,7 +344,7 @@ func (aj *ApplyJoin) findOrAddColNameBindVarName(ctx *plancontext.PlanningContex
Name: bvName,
Expr: col,
})
return bvName, nil
return bvName
}

func (a *ApplyJoin) LHSColumnsNeeded(ctx *plancontext.PlanningContext) (needed sqlparser.Exprs) {
Expand Down
14 changes: 5 additions & 9 deletions go/vt/vtgate/planbuilder/operators/ast_to_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func cloneASTAndSemState[T sqlparser.SQLNode](ctx *plancontext.PlanningContext,

// findTablesContained returns the TableSet of all the contained
func findTablesContained(ctx *plancontext.PlanningContext, node sqlparser.SQLNode) (result semantics.TableSet) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) {
t, ok := node.(*sqlparser.AliasedTableExpr)
if !ok {
return true, nil
Expand All @@ -113,7 +113,7 @@ func checkForCorrelatedSubqueries(
stmt sqlparser.SelectStatement,
subqID semantics.TableSet,
) (correlated bool) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) {
colname, isColname := node.(*sqlparser.ColName)
if !isColname {
return true, nil
Expand Down Expand Up @@ -177,9 +177,8 @@ func createOperatorFromUnion(ctx *plancontext.PlanningContext, node *sqlparser.U
// 1. verifyAllFKs: For this given statement, do we need to verify validity of all the foreign keys on the vtgate level.
// 2. fkToIgnore: The foreign key constraint to specifically ignore while planning the statement. This field is used in UPDATE CASCADE planning, wherein while planning the child update
// query, we need to ignore the parent foreign key constraint that caused the cascade in question.
func createOpFromStmt(ctx *plancontext.PlanningContext, stmt sqlparser.Statement, verifyAllFKs bool, fkToIgnore string) Operator {
var err error
ctx, err = plancontext.CreatePlanningContext(stmt, ctx.ReservedVars, ctx.VSchema, ctx.PlannerVersion)
func createOpFromStmt(inCtx *plancontext.PlanningContext, stmt sqlparser.Statement, verifyAllFKs bool, fkToIgnore string) Operator {
ctx, err := plancontext.CreatePlanningContext(stmt, inCtx.ReservedVars, inCtx.VSchema, inCtx.PlannerVersion)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -284,10 +283,7 @@ func getOperatorFromAliasedTableExpr(ctx *plancontext.PlanningContext, tableExpr
horizon.TableId = &tableID
horizon.Alias = tableExpr.As.String()
horizon.ColumnAliases = tableExpr.Columns
qp, err := CreateQPFromSelectStatement(ctx, tbl.Select)
if err != nil {
panic(err)
}
qp := CreateQPFromSelectStatement(ctx, tbl.Select)
horizon.QP = qp
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type (
func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) Operator {
columns := d.GetColumns(ctx)
for idx, col := range columns {
e, err := d.QP.GetSimplifiedExpr(ctx, col.Expr)
e, err := d.QP.TryGetSimplifiedExpr(ctx, col.Expr)
if err != nil {
// ambiguous columns are not a problem for DISTINCT
e = col.Expr
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/operators/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ func compact(ctx *plancontext.PlanningContext, op Operator) Operator {
return newOp
}

func checkValid(op Operator) error {
func checkValid(op Operator) {
type checkable interface {
CheckValid() error
CheckValid()
}

return Visit(op, func(this Operator) error {
_ = Visit(op, func(this Operator) error {
if chk, ok := this.(checkable); ok {
return chk.CheckValid()
chk.CheckValid()
}
return nil
})
Expand Down
9 changes: 2 additions & 7 deletions go/vt/vtgate/planbuilder/operators/horizon.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,9 @@ func (h *Horizon) src() Operator {
}

func (h *Horizon) getQP(ctx *plancontext.PlanningContext) *QueryProjection {
if h.QP != nil {
return h.QP
}
qp, err := CreateQPFromSelectStatement(ctx, h.Query)
if err != nil {
panic(err)
if h.QP == nil {
h.QP = CreateQPFromSelectStatement(ctx, h.Query)
}
h.QP = qp
return h.QP
}

Expand Down
12 changes: 3 additions & 9 deletions go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horiz
return out
}

aggregations, complexAggr, err := qp.AggregationExpressions(ctx, true)
if err != nil {
panic(err)
}
aggregations, complexAggr := qp.AggregationExpressions(ctx, true)

a := &Aggregator{
Source: horizon.src(),
Expand Down Expand Up @@ -249,16 +246,13 @@ func newStarProjection(src Operator, qp *QueryProjection) *Projection {
cols := sqlparser.SelectExprs{}

for _, expr := range qp.SelectExprs {
err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
_, isSubQ := node.(*sqlparser.Subquery)
if !isSubQ {
return true, nil
}
return false, vterrors.VT09015()
panic(vterrors.VT09015())
}, expr.Col)
if err != nil {
panic(err)
}
cols = append(cols, expr.Col)
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func createUniqueKeyComp(ins *sqlparser.Insert, expr sqlparser.Expr, vTbl *vinde
return []uComp{{idx, def}}, false
}
var offsets []uComp
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) {
col, ok := node.(*sqlparser.ColName)
if !ok {
return true, nil
Expand Down
14 changes: 4 additions & 10 deletions go/vt/vtgate/planbuilder/operators/offset_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@ func useOffsets(ctx *plancontext.PlanningContext, expr sqlparser.Expr, op Operat
in := op.Inputs()[0]
found := func(e sqlparser.Expr, offset int) { exprOffset = sqlparser.NewOffset(offset, e) }

notFound := func(e sqlparser.Expr) error {
notFound := func(e sqlparser.Expr) {
_, addToGroupBy := e.(*sqlparser.ColName)
offset := in.AddColumn(ctx, true, addToGroupBy, aeWrap(e))
exprOffset = sqlparser.NewOffset(offset, e)
return nil
}

visitor := getOffsetRewritingVisitor(ctx, in.FindCol, found, notFound)
Expand Down Expand Up @@ -102,11 +101,10 @@ func addColumnsToInput(ctx *plancontext.PlanningContext, root Operator) Operator
}
addedColumns := false
found := func(expr sqlparser.Expr, i int) {}
notFound := func(e sqlparser.Expr) error {
notFound := func(e sqlparser.Expr) {
_, addToGroupBy := e.(*sqlparser.ColName)
proj.addColumnWithoutPushing(ctx, aeWrap(e), addToGroupBy)
addedColumns = true
return nil
}
visitor := getOffsetRewritingVisitor(ctx, proj.FindCol, found, notFound)

Expand Down Expand Up @@ -151,13 +149,9 @@ func getOffsetRewritingVisitor(
// this function will be called when an expression has been found on the input
found func(sqlparser.Expr, int),
// if we have an expression that mush be fetched, this method will be called
notFound func(sqlparser.Expr) error,
notFound func(sqlparser.Expr),
) func(node, parent sqlparser.SQLNode) bool {
var err error
return func(node, parent sqlparser.SQLNode) bool {
if err != nil {
return false
}
e, ok := node.(sqlparser.Expr)
if !ok {
return true
Expand All @@ -169,7 +163,7 @@ func getOffsetRewritingVisitor(
}

if mustFetchFromInput(e) {
err = notFound(e)
notFound(e)
return false
}

Expand Down
9 changes: 2 additions & 7 deletions go/vt/vtgate/planbuilder/operators/plan_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,8 @@ func PlanQuery(ctx *plancontext.PlanningContext, stmt sqlparser.Statement) (resu
}

op = compact(ctx, op)
if err = checkValid(op); err != nil {
return nil, err
}

if op, err = planQuery(ctx, op); err != nil {
return nil, err
}
checkValid(op)
op = planQuery(ctx, op)

_, isRoute := op.(*Route)
if !isRoute && ctx.SemTable.NotSingleRouteErr != nil {
Expand Down
10 changes: 0 additions & 10 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ type (
ProjCols interface {
GetColumns() []*sqlparser.AliasedExpr
GetSelectExprs() sqlparser.SelectExprs
AddColumn(*sqlparser.AliasedExpr) (ProjCols, int, error)
}

// Used when there are stars in the expressions that we were unable to expand
Expand Down Expand Up @@ -137,10 +136,6 @@ func (sp StarProjections) GetColumns() []*sqlparser.AliasedExpr {
panic(vterrors.VT09015())
}

func (sp StarProjections) AddColumn(*sqlparser.AliasedExpr) (ProjCols, int, error) {
return nil, 0, vterrors.VT09015()
}

func (sp StarProjections) GetSelectExprs() sqlparser.SelectExprs {
return sqlparser.SelectExprs(sp)
}
Expand All @@ -157,11 +152,6 @@ func (ap AliasedProjections) GetSelectExprs() sqlparser.SelectExprs {
})
}

func (ap AliasedProjections) AddColumn(col *sqlparser.AliasedExpr) (ProjCols, int, error) {
offset := len(ap)
return append(ap, newProjExpr(col)), offset, nil
}

func (pe *ProjExpr) String() string {
var alias, expr, info string
if pe.Original.As.NotEmpty() {
Expand Down
Loading

0 comments on commit 4a6da63

Please sign in to comment.