Skip to content

Commit

Permalink
cdc: support remove and add partitioning (#9670)
Browse files Browse the repository at this point in the history
close #9641
  • Loading branch information
mjonss authored Sep 22, 2023
1 parent 2edb6db commit fa13ca0
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 8 deletions.
39 changes: 33 additions & 6 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Snapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) {
case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable:
// no pre table info
return nil, nil
case timodel.ActionRenameTable, timodel.ActionDropTable, timodel.ActionDropView, timodel.ActionTruncateTable:
case timodel.ActionRenameTable, timodel.ActionDropTable, timodel.ActionDropView, timodel.ActionTruncateTable, timodel.ActionAlterTablePartitioning, timodel.ActionRemovePartitioning:
// get the table will be dropped
table, ok := s.PhysicalTableByID(job.TableID)
if !ok {
Expand Down Expand Up @@ -400,17 +400,18 @@ func (s *Snapshot) Drop() {
s.inner.drop()
}

func getWrapTableInfo(job *timodel.Job) *model.TableInfo {
return model.WrapTableInfo(job.SchemaID, job.SchemaName,
job.BinlogInfo.FinishedTS,
job.BinlogInfo.TableInfo)
}

// DoHandleDDL is like HandleDDL but doesn't fill schema name into job.
// NOTE: it's public because some tests in the upper package need this.
func (s *Snapshot) DoHandleDDL(job *timodel.Job) error {
s.rwlock.Lock()
defer s.rwlock.Unlock()

getWrapTableInfo := func(job *timodel.Job) *model.TableInfo {
return model.WrapTableInfo(job.SchemaID, job.SchemaName,
job.BinlogInfo.FinishedTS,
job.BinlogInfo.TableInfo)
}
switch job.Type {
case timodel.ActionCreateSchema:
// get the DBInfo from job rawArgs
Expand Down Expand Up @@ -480,6 +481,11 @@ func (s *Snapshot) DoHandleDDL(job *timodel.Job) error {
if err != nil {
return errors.Trace(err)
}
case timodel.ActionRemovePartitioning, timodel.ActionAlterTablePartitioning:
err := s.inner.alterPartitioning(job)
if err != nil {
return errors.Trace(err)
}
default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down Expand Up @@ -1004,6 +1010,27 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
return nil
}

// alterPartitioning changes the table id and updates the TableInfo (including the partitioning info)
func (s *snapshot) alterPartitioning(job *timodel.Job) error {
// first drop the table (will work with both partitioned and non-partitioned tables
err := s.dropTable(job.TableID, job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}
// (re)create table, again will work with both partitioned and non-paritioned tables
// it uses the model.TableInfo written to the job.BinlogInfo, which is the final one
err = s.createTable(getWrapTableInfo(job), job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}

log.Info("handle alter partitioning success",
zap.Int64("OldID", job.TableID),
zap.Int64("NewID", job.BinlogInfo.TableInfo.ID),
zap.String("Name", job.TableName))
return nil
}

func (s *snapshot) renameTables(job *timodel.Job, currentTs uint64) error {
var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
var newTableNames, oldSchemaNames []*timodel.CIStr
Expand Down
4 changes: 4 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ var nonGlobalDDLs = map[timodel.ActionType]struct{}{
timodel.ActionReorganizePartition: {},
timodel.ActionAlterTTLInfo: {},
timodel.ActionAlterTTLRemove: {},
timodel.ActionAlterTablePartitioning: {},
timodel.ActionRemovePartitioning: {},
}

var redoBarrierDDLs = map[timodel.ActionType]struct{}{
Expand All @@ -86,6 +88,8 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{
timodel.ActionTruncateTablePartition: {},
timodel.ActionRecoverTable: {},
timodel.ActionReorganizePartition: {},
timodel.ActionAlterTablePartitioning: {},
timodel.ActionRemovePartitioning: {},
}

// ddlManager holds the pending DDL events of all tables and responsible for
Expand Down
2 changes: 2 additions & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ var allowDDLList = []timodel.ActionType{
timodel.ActionReorganizePartition,
timodel.ActionAlterTTLInfo,
timodel.ActionAlterTTLRemove,
timodel.ActionAlterTablePartitioning,
timodel.ActionRemovePartitioning,
}

// Filter are safe for concurrent use.
Expand Down
1 change: 1 addition & 0 deletions pkg/sink/codec/canal/canal_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func convertDdlEventType(e *model.DDLEvent) canal.EventType {
mm.ActionSetDefaultValue, mm.ActionModifyTableComment, mm.ActionRenameIndex, mm.ActionAddTablePartition,
mm.ActionDropTablePartition, mm.ActionModifyTableCharsetAndCollate, mm.ActionTruncateTablePartition,
mm.ActionAlterIndexVisibility, mm.ActionMultiSchemaChange, mm.ActionReorganizePartition,
mm.ActionAlterTablePartitioning, mm.ActionRemovePartitioning,
// AddColumns and DropColumns are removed in TiDB v6.2.0, see https://github.com/pingcap/tidb/pull/35862.
mm.ActionAddColumns, mm.ActionDropColumns:
return canal.EventType_ALTER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ check-struct-only = false

target-instance = "tidb0"

target-check-tables = ["partition_table.?*"]
target-check-tables = ["partition_table*.*"]

[data-sources]
[data-sources.mysql1]
Expand Down
13 changes: 12 additions & 1 deletion tests/integration_tests/partition_table/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
drop database if exists `partition_table`;
set @@global.tidb_enable_exchange_partition=on;
drop database if exists `partition_table2`;
create database `partition_table`;
use `partition_table`;

Expand All @@ -21,6 +21,12 @@ alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
update t1 set a=a+10 where a=9;

/* Remove partitioning + add partitioning back again */
alter table t remove partitioning;
insert into t values (20),(21),(22),(23),(24),(25);
alter table t partition by hash (a) partitions 5;
insert into t values (30),(31),(32),(33),(34),(35);

/* exchange partition case 1: source table and target table in same database */
create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
Expand All @@ -43,4 +49,9 @@ insert into t1 values (-3),(5),(14),(22),(30),(100);
update t1 set a=a-16 where a=12;
delete from t1 where a = 29;

/* Change partitioning to key based and then back to range */
alter table t1 partition by key(a) partitions 7;
insert into t1 values (-2001),(2001),(2002),(-2002),(-2003),(2003),(-2004),(2004),(-2005),(2005),(2006),(-2006),(2007),(-2007);
ALTER TABLE t1 partition by range(a) (partition p0 values less than (5), PARTITION p2 VALUES LESS THAN (20), PARTITION p3 VALUES LESS THAN (26), PARTITION p4 VALUES LESS THAN (35), PARTITION pMax VALUES LESS THAN (MAXVALUE));

create table finish_mark (a int primary key);

0 comments on commit fa13ca0

Please sign in to comment.