Skip to content

Commit

Permalink
dm: sync partition DDL even if upstream schema is not equal in optimi…
Browse files Browse the repository at this point in the history
…stic mode (#9789) (#9808)

close #9788
  • Loading branch information
ti-chi-bot authored Sep 26, 2023
1 parent a9f7d7a commit 7e90ede
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 4 deletions.
2 changes: 1 addition & 1 deletion dm/master/shardddl/optimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ func (t *testOptimistSuite) TestOptimistLockConflict() {
case <-time.After(watchTimeout):
t.T().Fatal("timeout")
case op3 := <-opCh:
require.Equal(t.T(), []string{}, op3.DDLs)
require.Equal(t.T(), DDLs1, op3.DDLs)
require.Equal(t.T(), optimism.ConflictNone, op3.ConflictStage)
}
cancel2()
Expand Down
7 changes: 4 additions & 3 deletions dm/pkg/shardddl/optimism/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab

// Normal DDL
if tableErr == nil {
log.L().Debug("receive a normal DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable))
log.L().Info("receive a normal DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable))
oldJoined, oldErr := l.joinNormalTables()

l.tables[source][schema][table] = postTable
Expand Down Expand Up @@ -883,7 +883,8 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab
// oldJoined != newJoined
// postTable == oldJoined (CREATE TABLE)
// prevTable < postTable
return (joinedErr != nil || joinedCmp != 0) || (err2 == nil && cmp == 0) || tableCmp < 0, ConflictNone
// prevTable == postTable(Partition/Sequence)
return (joinedErr != nil || joinedCmp != 0) || (err2 == nil && cmp == 0) || tableCmp <= 0, ConflictNone
}
}

Expand Down Expand Up @@ -920,7 +921,7 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab

return true, ConflictNone
}
log.L().Debug("conflict hasn't been resolved", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable))
log.L().Info("conflict hasn't been resolved", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable))
return false, ConflictSkipWaitRedirect
}

Expand Down
5 changes: 5 additions & 0 deletions dm/pkg/shardddl/optimism/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2738,6 +2738,11 @@ func (t *testLock) TestTrySyncForOneDDL(c *C) {
c.Assert(schemaChanged, IsTrue)
c.Assert(conflictStage, Equals, ConflictNone)

// check create partition, no changed since https://github.com/pingcap/tidb-tools/blob/d671b0840063bc2532941f02e02e12627402844c/pkg/schemacmp/table.go#L251
schemaChanged, conflictStage = l.trySyncForOneDDL(source, schema, table1, t0, t1)
c.Assert(schemaChanged, IsTrue)
c.Assert(conflictStage, Equals, ConflictNone)

// check alter table drop column
schemaChanged, conflictStage = l.trySyncForOneDDL(source, schema, table2, t0, t2)
c.Assert(schemaChanged, IsFalse)
Expand Down
22 changes: 22 additions & 0 deletions dm/tests/shardddl2/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,27 @@ function DM_DropAddColumn() {
done
}

function DM_ADD_DROP_PARTITIONS_CASE() {
run_sql_source1 "insert into ${shardddl1}.${tb1} values(1);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(2);"
run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(3,3);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(4);"

run_sql_source1 "ALTER TABLE ${shardddl1}.${tb1} ADD PARTITION (partition p1 VALUES LESS THAN (10000))"
run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.partitions WHERE TABLE_SCHEMA='${shardddl}' AND TABLE_NAME = '${tb}' AND PARTITION_NAME IS NOT NULL;" "count(1): 2"

run_sql_source1 "ALTER TABLE ${shardddl1}.${tb1} DROP PARTITION p1;"
run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.partitions WHERE TABLE_SCHEMA='${shardddl}' AND TABLE_NAME = '${tb}' AND PARTITION_NAME IS NOT NULL;" "count(1): 1"
}

function DM_ADD_DROP_PARTITIONS() {
run_case ADD_DROP_PARTITIONS "double-source-optimistic" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (100));\"; \
run_sql_source2 \"create table ${shardddl1}.${tb1} (a int) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (100));\";" \
"clean_table" "optimistic"
}

function run() {
init_cluster
init_database
Expand All @@ -515,6 +536,7 @@ function run() {
DM_DROP_COLUMN_ALL_DONE
DM_RECOVER_LOCK
DM_DropAddColumn
DM_ADD_DROP_PARTITIONS
start=36
end=45
except=(042 044 045)
Expand Down

0 comments on commit 7e90ede

Please sign in to comment.