Skip to content

Commit

Permalink
Multi Target Delete Support (#15294)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Feb 27, 2024
1 parent 69604ed commit fb4abd5
Show file tree
Hide file tree
Showing 22 changed files with 941 additions and 179 deletions.
75 changes: 75 additions & 0 deletions go/test/endtoend/vtgate/queries/dml/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,78 @@ func TestDeleteWithSubquery(t *testing.T) {
utils.AssertMatches(t, mcmp.VtConn, `select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)]]`)
}

// TestMultiTargetDelete executed multi-target delete queries
func TestMultiTargetDelete(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 20, "vtgate")

mcmp, closer := start(t)
defer closer()

// initial rows
mcmp.Exec("insert into order_tbl(region_id, oid, cust_no) values (1,1,4), (1,2,2), (2,3,5), (2,4,55)")
mcmp.Exec("insert into oevent_tbl(oid, ename) values (1,'a'), (2,'b'), (3,'a'), (2,'c')")

// check rows
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)
mcmp.AssertMatches(`select oid, ename from oevent_tbl order by oid`,
`[[INT64(1) VARCHAR("a")] [INT64(2) VARCHAR("b")] [INT64(2) VARCHAR("c")] [INT64(3) VARCHAR("a")]]`)

// multi table delete
qr := mcmp.Exec(`delete o, ev from order_tbl o join oevent_tbl ev where o.oid = ev.oid and ev.ename = 'a'`)
assert.EqualValues(t, 4, qr.RowsAffected)

// check rows
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(4) INT64(55)]]`)
mcmp.AssertMatches(`select oid, ename from oevent_tbl order by oid`,
`[[INT64(2) VARCHAR("b")] [INT64(2) VARCHAR("c")]]`)

qr = mcmp.Exec(`delete o, ev from order_tbl o join oevent_tbl ev where o.cust_no = ev.oid`)
assert.EqualValues(t, 3, qr.RowsAffected)

// check rows
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(2) INT64(4) INT64(55)]]`)
mcmp.AssertMatches(`select oid, ename from oevent_tbl order by oid`,
`[]`)
}

// TestMultiTargetDeleteMore executed multi-target delete queries with additional cases
func TestMultiTargetDeleteMore(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 20, "vtgate")

mcmp, closer := start(t)
defer closer()

// multi table delete on empty table.
qr := mcmp.Exec(`delete o, ev from order_tbl o join oevent_tbl ev on o.oid = ev.oid`)
assert.EqualValues(t, 0, qr.RowsAffected)

// initial rows
mcmp.Exec("insert into order_tbl(region_id, oid, cust_no) values (1,1,4), (1,2,2), (2,3,5), (2,4,55)")
mcmp.Exec("insert into oevent_tbl(oid, ename) values (1,'a'), (2,'b'), (3,'a'), (2,'c')")

// multi table delete on non-existent data.
qr = mcmp.Exec(`delete o, ev from order_tbl o join oevent_tbl ev on o.oid = ev.oid where ev.oid = 10`)
assert.EqualValues(t, 0, qr.RowsAffected)

// check rows
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)
mcmp.AssertMatches(`select oid, ename from oevent_tbl order by oid`,
`[[INT64(1) VARCHAR("a")] [INT64(2) VARCHAR("b")] [INT64(2) VARCHAR("c")] [INT64(3) VARCHAR("a")]]`)

// multi table delete with rollback
mcmp.Exec(`begin`)
qr = mcmp.Exec(`delete o, ev from order_tbl o join oevent_tbl ev on o.oid = ev.oid where o.cust_no != 4`)
assert.EqualValues(t, 5, qr.RowsAffected)
mcmp.Exec(`rollback`)

// check rows
mcmp.AssertMatches(`select region_id, oid, cust_no from order_tbl order by oid`,
`[[INT64(1) INT64(1) INT64(4)] [INT64(1) INT64(2) INT64(2)] [INT64(2) INT64(3) INT64(5)] [INT64(2) INT64(4) INT64(55)]]`)
mcmp.AssertMatches(`select oid, ename from oevent_tbl order by oid`,
`[[INT64(1) VARCHAR("a")] [INT64(2) VARCHAR("b")] [INT64(2) VARCHAR("c")] [INT64(3) VARCHAR("a")]]`)
}
22 changes: 16 additions & 6 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 29 additions & 12 deletions go/vt/vtgate/engine/dml_with_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package engine

import (
"context"
"fmt"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
Expand All @@ -34,10 +35,10 @@ const DmlVals = "dml_vals"
type DMLWithInput struct {
txNeeded

DML Primitive
Input Primitive

OutputCols []int
DMLs []Primitive
OutputCols [][]int
}

func (dml *DMLWithInput) RouteType() string {
Expand All @@ -53,7 +54,7 @@ func (dml *DMLWithInput) GetTableName() string {
}

func (dml *DMLWithInput) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{dml.Input, dml.DML}, nil
return append([]Primitive{dml.Input}, dml.DMLs...), nil
}

// TryExecute performs a non-streaming exec.
Expand All @@ -66,15 +67,27 @@ func (dml *DMLWithInput) TryExecute(ctx context.Context, vcursor VCursor, bindVa
return &sqltypes.Result{}, nil
}

var bv *querypb.BindVariable
if len(dml.OutputCols) == 1 {
bv = getBVSingle(inputRes, dml.OutputCols[0])
} else {
bv = getBVMulti(inputRes, dml.OutputCols)
}
var res *sqltypes.Result
for idx, prim := range dml.DMLs {
var bv *querypb.BindVariable
if len(dml.OutputCols[idx]) == 1 {
bv = getBVSingle(inputRes, dml.OutputCols[idx][0])
} else {
bv = getBVMulti(inputRes, dml.OutputCols[idx])
}

bindVars[DmlVals] = bv
return vcursor.ExecutePrimitive(ctx, dml.DML, bindVars, false)
bindVars[DmlVals] = bv
qr, err := vcursor.ExecutePrimitive(ctx, prim, bindVars, false)
if err != nil {
return nil, err
}
if res == nil {
res = qr
} else {
res.RowsAffected += qr.RowsAffected
}
}
return res, nil
}

func getBVSingle(res *sqltypes.Result, offset int) *querypb.BindVariable {
Expand Down Expand Up @@ -113,8 +126,12 @@ func (dml *DMLWithInput) GetFields(context.Context, VCursor, map[string]*querypb
}

func (dml *DMLWithInput) description() PrimitiveDescription {
var offsets []string
for idx, offset := range dml.OutputCols {
offsets = append(offsets, fmt.Sprintf("%d:%v", idx, offset))
}
other := map[string]any{
"Offset": dml.OutputCols,
"Offset": offsets,
}
return PrimitiveDescription{
OperatorType: "DMLWithInput",
Expand Down
78 changes: 72 additions & 6 deletions go/vt/vtgate/engine/dml_with_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

Expand All @@ -34,7 +35,7 @@ func TestDeleteWithInputSingleOffset(t *testing.T) {

del := &DMLWithInput{
Input: input,
DML: &Delete{
DMLs: []Primitive{&Delete{
DML: &DML{
RoutingParameters: &RoutingParameters{
Opcode: Scatter,
Expand All @@ -45,8 +46,8 @@ func TestDeleteWithInputSingleOffset(t *testing.T) {
},
Query: "dummy_delete",
},
},
OutputCols: []int{0},
}},
OutputCols: [][]int{{0}},
}

vc := newDMLTestVCursor("-20", "20-")
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestDeleteWithInputMultiOffset(t *testing.T) {

del := &DMLWithInput{
Input: input,
DML: &Delete{
DMLs: []Primitive{&Delete{
DML: &DML{
RoutingParameters: &RoutingParameters{
Opcode: Scatter,
Expand All @@ -89,8 +90,8 @@ func TestDeleteWithInputMultiOffset(t *testing.T) {
},
Query: "dummy_delete",
},
},
OutputCols: []int{1, 0},
}},
OutputCols: [][]int{{1, 0}},
}

vc := newDMLTestVCursor("-20", "20-")
Expand All @@ -114,3 +115,68 @@ func TestDeleteWithInputMultiOffset(t *testing.T) {
`ks.20-: dummy_delete {dml_vals: type:TUPLE values:{type:TUPLE value:"\x950\x01a\x89\x02\x011"} values:{type:TUPLE value:"\x950\x01b\x89\x02\x012"} values:{type:TUPLE value:"\x950\x01c\x89\x02\x013"}} true false`,
})
}

func TestDeleteWithMultiTarget(t *testing.T) {
input := &fakePrimitive{results: []*sqltypes.Result{
sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|id|user_id", "int64|int64|int64"),
"1|100|1", "2|100|2", "3|200|3"),
}}

vindex, _ := vindexes.CreateVindex("hash", "", nil)

del1 := &Delete{
DML: &DML{
RoutingParameters: &RoutingParameters{
Opcode: IN,
Keyspace: &vindexes.Keyspace{Name: "ks", Sharded: true},
Vindex: vindex,
Values: []evalengine.Expr{
&evalengine.BindVariable{Key: "dml_vals", Type: sqltypes.Tuple},
},
},
Query: "dummy_delete_1",
},
}

del2 := &Delete{
DML: &DML{
RoutingParameters: &RoutingParameters{
Opcode: MultiEqual,
Keyspace: &vindexes.Keyspace{Name: "ks", Sharded: true},
Vindex: vindex,
Values: []evalengine.Expr{
&evalengine.TupleBindVariable{Key: "dml_vals", Index: 1},
},
},
Query: "dummy_delete_2",
},
}

del := &DMLWithInput{
Input: input,
DMLs: []Primitive{del1, del2},
OutputCols: [][]int{{0}, {1, 2}},
}

vc := newDMLTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [type:INT64 value:"1" type:INT64 value:"2" type:INT64 value:"3"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`,
`ExecuteMultiShard ks.-20: dummy_delete_1 {dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true true`,
`ResolveDestinations ks [type:INT64 value:"1" type:INT64 value:"2" type:INT64 value:"3"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`,
`ExecuteMultiShard ks.-20: dummy_delete_2 {dml_vals: type:TUPLE values:{type:TUPLE value:"\x89\x02\x03100\x89\x02\x011"} values:{type:TUPLE value:"\x89\x02\x03100\x89\x02\x012"} values:{type:TUPLE value:"\x89\x02\x03200\x89\x02\x013"}} true true`,
})

vc.Rewind()
input.rewind()
err = del.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { return nil })
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [type:INT64 value:"1" type:INT64 value:"2" type:INT64 value:"3"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`,
`ExecuteMultiShard ks.-20: dummy_delete_1 {dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true true`,
`ResolveDestinations ks [type:INT64 value:"1" type:INT64 value:"2" type:INT64 value:"3"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`,
`ExecuteMultiShard ks.-20: dummy_delete_2 {dml_vals: type:TUPLE values:{type:TUPLE value:"\x89\x02\x03100\x89\x02\x011"} values:{type:TUPLE value:"\x89\x02\x03100\x89\x02\x012"} values:{type:TUPLE value:"\x89\x02\x03200\x89\x02\x013"}} true true`,
})
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/evalengine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions go/vt/vtgate/evalengine/expr_tuple_bvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ type (
Index int
Type sqltypes.Type
Collation collations.ID

// dynamicTypeOffset is set when the type of this bind variable cannot be calculated
// at translation time. Since expressions with dynamic types cannot be compiled ahead of time,
// compilation will be delayed until the expression is first executed with the bind variables
// sent by the user. See: UntypedExpr
dynamicTypeOffset int
}
)

Expand Down
21 changes: 4 additions & 17 deletions go/vt/vtgate/planbuilder/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/semantics"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

Expand Down Expand Up @@ -68,8 +67,10 @@ func gen4DeleteStmtPlanner(
}
}

if err := checkIfDeleteSupported(deleteStmt, ctx.SemTable); err != nil {
return nil, err
// error out here if delete query cannot bypass the planner and
// planner cannot plan such query due to different reason like missing full information, etc.
if ctx.SemTable.NotUnshardedErr != nil {
return nil, ctx.SemTable.NotUnshardedErr
}

op, err := operators.PlanQuery(ctx, deleteStmt)
Expand Down Expand Up @@ -132,17 +133,3 @@ func deleteUnshardedShortcut(stmt *sqlparser.Delete, ks *vindexes.Keyspace, tabl
}
return &primitiveWrapper{prim: &engine.Delete{DML: edml}}
}

// checkIfDeleteSupported checks if the delete query is supported or we must return an error.
func checkIfDeleteSupported(del *sqlparser.Delete, semTable *semantics.SemTable) error {
if semTable.NotUnshardedErr != nil {
return semTable.NotUnshardedErr
}

// Delete is only supported for single Target.
if len(del.Targets) > 1 {
return vterrors.VT12001("multi-table DELETE statement with multi-target")
}

return nil
}
11 changes: 7 additions & 4 deletions go/vt/vtgate/planbuilder/dml_with_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@ import (

type dmlWithInput struct {
input logicalPlan
dml logicalPlan
dmls []logicalPlan

outputCols []int
outputCols [][]int
}

var _ logicalPlan = (*dmlWithInput)(nil)

// Primitive implements the logicalPlan interface
func (d *dmlWithInput) Primitive() engine.Primitive {
inp := d.input.Primitive()
del := d.dml.Primitive()
var dels []engine.Primitive
for _, dml := range d.dmls {
dels = append(dels, dml.Primitive())
}
return &engine.DMLWithInput{
DML: del,
DMLs: dels,
Input: inp,
OutputCols: d.outputCols,
}
Expand Down
Loading

0 comments on commit fb4abd5

Please sign in to comment.