Skip to content

Commit

Permalink
mysql (ticdc): fix batch dml delete sql gennerate error (#9813)
Browse files Browse the repository at this point in the history
close #9812
  • Loading branch information
asddongmen authored Nov 1, 2023
1 parent 69c180a commit 3dc797b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
47 changes: 45 additions & 2 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,9 +1601,41 @@ func TestBuildTableInfo(t *testing.T) {
" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{ // This case is to check the primary key is correctly identified by BuildTiDBTableInfo
"CREATE TABLE your_table (" +
" id INT NOT NULL," +
" name VARCHAR(50) NOT NULL," +
" email VARCHAR(100) NOT NULL," +
" age INT NOT NULL ," +
" address VARCHAR(200) NOT NULL," +
" PRIMARY KEY (id, name)," +
" UNIQUE INDEX idx_unique_1 (id, email, age)," +
" UNIQUE INDEX idx_unique_2 (name, email, address)" +
" );",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `id` int(0) NOT NULL,\n" +
" `name` varchar(0) NOT NULL,\n" +
" `email` varchar(0) NOT NULL,\n" +
" `age` int(0) NOT NULL,\n" +
" `address` varchar(0) NOT NULL,\n" +
" PRIMARY KEY (`id`(0),`name`(0)) /*T![clustered_index] CLUSTERED */,\n" +
" UNIQUE KEY `idx_1` (`id`(0),`email`(0),`age`(0)),\n" +
" UNIQUE KEY `idx_2` (`name`(0),`email`(0),`address`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `id` int(0) NOT NULL,\n" +
" `name` varchar(0) NOT NULL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" PRIMARY KEY (`id`(0),`name`(0)) /*T![clustered_index] CLUSTERED */,\n" +
" UNIQUE KEY `idx_1` (`id`(0),`omitted`(0),`omitted`(0)),\n" +
" UNIQUE KEY `idx_2` (`name`(0),`omitted`(0),`omitted`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
p := parser.New()
for _, c := range cases {
for i, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
Expand All @@ -1615,7 +1647,18 @@ func TestBuildTableInfo(t *testing.T) {
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
require.Equal(t, c.recovered, showCreateTable(t, recoveredTI))

// make sure BuildTiDBTableInfo indentify the correct primary key
if i == 5 {
inexes := recoveredTI.Indices
primaryCount := 0
for i := range inexes {
if inexes[i].Primary {
primaryCount++
}
}
require.Equal(t, 1, primaryCount)
require.Equal(t, 2, len(handle.UniqueNotNullIdx.Columns))
}
// mimic the columns are set to nil when old value feature is disabled
for i := range cols {
if !cols[i].Flag.IsHandleKey() {
Expand Down
12 changes: 9 additions & 3 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,20 +604,26 @@ func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInf
continue
}
if firstCol.Flag.IsPrimaryKey() {
indexInfo.Primary = true
indexInfo.Unique = true
}
if firstCol.Flag.IsUniqueKey() {
indexInfo.Unique = true
}

isPrimary := true
for _, offset := range colOffsets {
col := ret.Columns[offset]
col := columns[offset]
// When only all columns in the index are primary key, then the index is primary key.
if col == nil || !col.Flag.IsPrimaryKey() {
isPrimary = false
}

tiCol := ret.Columns[offset]
indexCol := &model.IndexColumn{}
indexCol.Name = col.Name
indexCol.Name = tiCol.Name
indexCol.Offset = offset
indexInfo.Columns = append(indexInfo.Columns, indexCol)
indexInfo.Primary = isPrimary
}

// TODO: revert the "all column set index related flag" to "only the
Expand Down

0 comments on commit 3dc797b

Please sign in to comment.