Skip to content

Commit

Permalink
VReplication: disable foreign_key_checks for bulk data cleanup (#15261)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Feb 15, 2024
1 parent 096b8a8 commit 1c9388e
Show file tree
Hide file tree
Showing 14 changed files with 687 additions and 523 deletions.
27 changes: 25 additions & 2 deletions go/test/endtoend/vreplication/fk_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@ var (
create table parent(id int, name varchar(128), primary key(id)) engine=innodb;
create table child(id int, parent_id int, name varchar(128), primary key(id), foreign key(parent_id) references parent(id) on delete cascade) engine=innodb;
create view vparent as select * from parent;
create table t1(id int, name varchar(128), primary key(id)) engine=innodb;
create table t2(id int, t1id int, name varchar(128), primary key(id), foreign key(t1id) references t1(id) on delete cascade) engine=innodb;
`
initialFKData = `
insert into parent values(1, 'parent1'), (2, 'parent2');
insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');`
insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');
insert into t1 values(1, 't11'), (2, 't12');
insert into t2 values(1, 1, 't21'), (2, 1, 't22'), (3, 2, 't23');
`

initialFKSourceVSchema = `
{
"tables": {
"parent": {},
"child": {}
"child": {},
"t1": {},
"t2": {}
}
}
`
Expand Down Expand Up @@ -59,6 +66,22 @@ insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');
"name": "reverse_bits"
}
]
},
"t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"t2": {
"column_vindexes": [
{
"column": "t1id",
"name": "reverse_bits"
}
]
}
}
}
Expand Down
33 changes: 32 additions & 1 deletion go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const testWorkflowFlavor = workflowFlavorRandom

// TestFKWorkflow runs a MoveTables workflow with atomic copy for a db with foreign key constraints.
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
// i.e. with foreign_key_checks=0.
Expand Down Expand Up @@ -77,10 +79,13 @@ func TestFKWorkflow(t *testing.T) {
}()
go ls.simulateLoad()
}

targetKeyspace := "fktarget"
targetTabletId := 200
vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialFKTargetVSchema, "", 0, 0, targetTabletId, sourceKsOpts)

testFKCancel(t, vc)

workflowName := "fk"
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)

Expand All @@ -92,7 +97,7 @@ func TestFKWorkflow(t *testing.T) {
},
sourceKeyspace: sourceKeyspace,
atomicCopy: true,
}, workflowFlavorRandom)
}, testWorkflowFlavor)
mt.Create()

waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
Expand Down Expand Up @@ -122,6 +127,7 @@ func TestFKWorkflow(t *testing.T) {
cancel()
<-ch
}
mt.Complete()
}

func insertInitialFKData(t *testing.T) {
Expand All @@ -136,6 +142,9 @@ func insertInitialFKData(t *testing.T) {
log.Infof("Done inserting initial FK data")
waitForRowCount(t, vtgateConn, db, "parent", 2)
waitForRowCount(t, vtgateConn, db, "child", 3)
waitForRowCount(t, vtgateConn, db, "t1", 2)
waitForRowCount(t, vtgateConn, db, "t2", 3)

})
}

Expand Down Expand Up @@ -269,3 +278,25 @@ func (ls *fkLoadSimulator) exec(query string) *sqltypes.Result {
require.NotNil(t, qr)
return qr
}

// testFKCancel confirms that a MoveTables workflow which includes tables with foreign key
// constraints, where the parent table is lexicographically sorted before the child table and
// thus may be dropped first, can be successfully cancelled.
func testFKCancel(t *testing.T, vc *VitessCluster) {
var targetKeyspace = "fktarget"
var sourceKeyspace = "fksource"
var workflowName = "wf2"
var ksWorkflow = fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
mt := newMoveTables(vc, &moveTablesWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
workflowName: workflowName,
targetKeyspace: targetKeyspace,
},
sourceKeyspace: sourceKeyspace,
atomicCopy: true,
}, testWorkflowFlavor)
mt.Create()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
mt.Cancel()
}
1,006 changes: 509 additions & 497 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Large diffs are not rendered by default.

42 changes: 38 additions & 4 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,10 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T
query = fmt.Sprintf("rename table %s.%s TO %s.%s", primaryDbName, tableNameEscaped, primaryDbName, renameName)
}
_, err = ts.ws.tmc.ExecuteFetchAsDba(ctx, source.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v", source.GetPrimary().String(), tableName, err)
Expand Down Expand Up @@ -1067,7 +1068,6 @@ func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Cont
}

func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
log.Flush()
err := ts.ForAllTargets(func(target *MigrationTarget) error {
log.Infof("ForAllTargets: %+v", target)
for _, tableName := range ts.Tables() {
Expand All @@ -1083,12 +1083,12 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
ts.Logger().Infof("%s: Dropping table %s.%s\n",
target.GetPrimary().String(), target.GetPrimary().DbName(), tableName)
res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
log.Infof("Removed target table with result: %+v", res)
log.Flush()
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v",
target.GetPrimary().String(), tableName, err)
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,11 +488,12 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.
}

response, err := c.ExecuteFetchAsDba(ctx, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: req.Query,
DbName: topoproto.TabletDbName(tablet),
MaxRows: req.MaxRows,
DisableBinlogs: req.DisableBinlogs,
ReloadSchema: req.DisableBinlogs,
Query: req.Query,
DbName: topoproto.TabletDbName(tablet),
MaxRows: req.MaxRows,
DisableBinlogs: req.DisableBinlogs,
ReloadSchema: req.DisableBinlogs,
DisableForeignKeyChecks: req.DisableForeignKeyChecks,
})
if err != nil {
return nil, err
Expand Down
24 changes: 21 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,29 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanag
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
}
// get a connection
// Get a connection.
conn, err := tm.MysqlDaemon.GetDbaConnection(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

// disable binlogs if necessary
// Disable binlogs if necessary.
if req.DisableBinlogs {
_, err := conn.ExecuteFetch("SET sql_log_bin = OFF", 0, false)
if err != nil {
return nil, err
}
}

// Disable FK checks if requested.
if req.DisableForeignKeyChecks {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = OFF", 0, false)
if err != nil {
return nil, err
}
}

if req.DbName != "" {
// This execute might fail if db does not exist.
// Error is ignored because given query might create this database.
Expand Down Expand Up @@ -130,7 +138,17 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanag
}
}

// re-enable binlogs if necessary
// Re-enable FK checks if necessary.
if req.DisableForeignKeyChecks && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = ON", 0, false)
if err != nil {
// If we can't reset the FK checks flag,
// let's just close the connection.
conn.Close()
}
}

// Re-enable binlogs if necessary.
if req.DisableBinlogs && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET sql_log_bin = ON", 0, false)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestVDiffStats(t *testing.T) {
defer testStats.controllers[id].TableDiffPhaseTimings.Record(phase, time.Now())
time.Sleep(sleepTime)
}
want := int64(1.2 * float64(sleepTime)) // Allow 20% overhead for recording timing
want := 10 * sleepTime // Allow 10x overhead for recording timing on flaky test hosts
record(string(initializing))
require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()[string(initializing)].Total())
record(string(pickingTablets))
Expand Down
15 changes: 13 additions & 2 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -1790,7 +1791,12 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType w
source.GetPrimary().String(), source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName)
query = fmt.Sprintf("rename table %s.%s TO %s.%s", primaryDbName, tableNameEscaped, primaryDbName, renameName)
}
_, err = ts.wr.ExecuteFetchAsDba(ctx, source.GetPrimary().Alias, query, 1, false, true)
_, err = ts.wr.tmc.ExecuteFetchAsDba(ctx, source.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v", source.GetPrimary().String(), tableName, err)
return err
Expand Down Expand Up @@ -1896,7 +1902,12 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableName)
ts.Logger().Infof("%s: Dropping table %s.%s\n",
target.GetPrimary().String(), target.GetPrimary().DbName(), tableName)
_, err = ts.wr.ExecuteFetchAsDba(ctx, target.GetPrimary().Alias, query, 1, false, true)
_, err = ts.wr.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v",
target.GetPrimary().String(), tableName, err)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,8 +949,10 @@ func testTableMigrateOneToMany(t *testing.T, keepData, keepRoutingRules bool) {
tme.dbTargetClients[0].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", &sqltypes.Result{}, nil)
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = OFF", &sqltypes.Result{})
tme.tmeDB.AddQuery(fmt.Sprintf("rename table `vt_ks1`.`t1` TO `vt_ks1`.`%s`", getRenameFileName("t1")), &sqltypes.Result{})
tme.tmeDB.AddQuery(fmt.Sprintf("rename table `vt_ks1`.`t2` TO `vt_ks1`.`%s`", getRenameFileName("t2")), &sqltypes.Result{})
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = ON", &sqltypes.Result{})
tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", &sqltypes.Result{}, nil) //
tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", &sqltypes.Result{}, nil)
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,12 +871,14 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv, params *VReplic
tme.dbSourceClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)
tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=1", state)
tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = OFF", &sqltypes.Result{})
tme.tmeDB.AddQuery("USE `vt_ks1`", noResult)
tme.tmeDB.AddQuery("USE `vt_ks2`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks1`.`t1`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks1`.`t2`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks2`.`t1`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks2`.`t2`", noResult)
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = ON", &sqltypes.Result{})
tme.tmeDB.AddQuery("update _vt.vreplication set message='Picked source tablet: cell:\"cell1\" uid:10 ' where id=1", noResult)
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", noResult)
Expand Down
1 change: 1 addition & 0 deletions proto/tabletmanagerdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ message ExecuteFetchAsDbaRequest {
uint64 max_rows = 3;
bool disable_binlogs = 4;
bool reload_schema = 5;
bool disable_foreign_key_checks = 6;
}

message ExecuteFetchAsDbaResponse {
Expand Down
Loading

0 comments on commit 1c9388e

Please sign in to comment.