Skip to content

Commit

Permalink
fix BuildTiDBTableInfo function
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed Nov 1, 2023
1 parent e4ac90b commit fd92104
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
51 changes: 50 additions & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1518,11 +1518,13 @@ func TestDecodeEventIgnoreRow(t *testing.T) {

func TestBuildTableInfo(t *testing.T) {
cases := []struct {
caseNumber int
origin string
recovered string
recoveredWithNilCol string
}{
{
1,
"CREATE TABLE t1 (c INT PRIMARY KEY)",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) NOT NULL,\n" +
Expand All @@ -1534,6 +1536,7 @@ func TestBuildTableInfo(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
2,
"CREATE TABLE t1 (" +
" c INT UNSIGNED," +
" c2 VARCHAR(10) NOT NULL," +
Expand All @@ -1555,6 +1558,7 @@ func TestBuildTableInfo(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
3,
"CREATE TABLE t1 (" +
" c INT UNSIGNED," +
" gen INT AS (c+1) VIRTUAL," +
Expand All @@ -1580,6 +1584,7 @@ func TestBuildTableInfo(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
4,
"CREATE TABLE `t1` (" +
" `a` int(11) NOT NULL," +
" `b` int(11) DEFAULT NULL," +
Expand All @@ -1601,6 +1606,39 @@ func TestBuildTableInfo(t *testing.T) {
" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
5,
"CREATE TABLE your_table (" +
" id INT NOT NULL," +
" name VARCHAR(50) NOT NULL," +
" email VARCHAR(100) NOT NULL," +
" age INT NOT NULL ," +
" address VARCHAR(200) NOT NULL," +
" PRIMARY KEY (id, name)," +
" UNIQUE INDEX idx_unique_1 (id, email, age)," +
" UNIQUE INDEX idx_unique_2 (name, email, address)" +
" );",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `id` int(0) NOT NULL,\n" +
" `name` varchar(0) NOT NULL,\n" +
" `email` varchar(0) NOT NULL,\n" +
" `age` int(0) NOT NULL,\n" +
" `address` varchar(0) NOT NULL,\n" +
" PRIMARY KEY (`id`(0),`name`(0)) /*T![clustered_index] CLUSTERED */,\n" +
" UNIQUE KEY `idx_1` (`id`(0),`email`(0),`age`(0)),\n" +
" UNIQUE KEY `idx_2` (`name`(0),`email`(0),`address`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `id` int(0) NOT NULL,\n" +
" `name` varchar(0) NOT NULL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" PRIMARY KEY (`id`(0),`name`(0)) /*T![clustered_index] CLUSTERED */,\n" +
" UNIQUE KEY `idx_1` (`id`(0),`omitted`(0),`omitted`(0)),\n" +
" UNIQUE KEY `idx_2` (`name`(0),`omitted`(0),`omitted`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
p := parser.New()
for _, c := range cases {
Expand All @@ -1615,7 +1653,18 @@ func TestBuildTableInfo(t *testing.T) {
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
require.Equal(t, c.recovered, showCreateTable(t, recoveredTI))

// make sure BuildTiDBTableInfo indentify the correct primary key
if c.caseNumber == 5 {
inexes := recoveredTI.Indices
primaryCount := 0
for i := range inexes {
if inexes[i].Primary {
primaryCount++
}
}
require.Equal(t, 1, primaryCount)
require.Equal(t, 2, len(handle.UniqueNotNullIdx.Columns))
}
// mimic the columns are set to nil when old value feature is disabled
for i := range cols {
if !cols[i].Flag.IsHandleKey() {
Expand Down
16 changes: 15 additions & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,10 @@ func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInf
continue
}
if firstCol.Flag.IsPrimaryKey() {
indexInfo.Primary = true
// If all columns in the index are primary key, then the index is primary key.
if CheckIfIndexesPrimaryKey(colOffsets, columns) {
indexInfo.Primary = true
}
indexInfo.Unique = true
}
if firstCol.Flag.IsUniqueKey() {
Expand All @@ -628,6 +631,17 @@ func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInf
return ret
}

// CheckIfIndexesPrimaryKey checks if all columns in a index is primary key
func CheckIfIndexesPrimaryKey(columnOffsets []int, columns []*Column) bool {
for _, offset := range columnOffsets {
column := columns[offset]
if column == nil || !column.Flag.IsPrimaryKey() {
return false
}
}
return true
}

// ColumnValueString returns the string representation of the column value
func ColumnValueString(c interface{}) string {
var data string
Expand Down

0 comments on commit fd92104

Please sign in to comment.