From fd921048fbf9f49ea31e400f13360a4f156b6aa0 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 1 Nov 2023 14:59:51 +0800 Subject: [PATCH] fix BuildTiDBTableInfo function --- cdc/entry/mounter_test.go | 51 ++++++++++++++++++++++++++++++++++++++- cdc/model/sink.go | 16 +++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 9fed0d270e8..650ce74f8fd 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -1518,11 +1518,13 @@ func TestDecodeEventIgnoreRow(t *testing.T) { func TestBuildTableInfo(t *testing.T) { cases := []struct { + caseNumber int origin string recovered string recoveredWithNilCol string }{ { + 1, "CREATE TABLE t1 (c INT PRIMARY KEY)", "CREATE TABLE `BuildTiDBTableInfo` (\n" + " `c` int(0) NOT NULL,\n" + @@ -1534,6 +1536,7 @@ func TestBuildTableInfo(t *testing.T) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", }, { + 2, "CREATE TABLE t1 (" + " c INT UNSIGNED," + " c2 VARCHAR(10) NOT NULL," + @@ -1555,6 +1558,7 @@ func TestBuildTableInfo(t *testing.T) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", }, { + 3, "CREATE TABLE t1 (" + " c INT UNSIGNED," + " gen INT AS (c+1) VIRTUAL," + @@ -1580,6 +1584,7 @@ func TestBuildTableInfo(t *testing.T) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", }, { + 4, "CREATE TABLE `t1` (" + " `a` int(11) NOT NULL," + " `b` int(11) DEFAULT NULL," + @@ -1601,6 +1606,39 @@ func TestBuildTableInfo(t *testing.T) { " PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", }, + { + 5, + "CREATE TABLE your_table (" + + " id INT NOT NULL," + + " name VARCHAR(50) NOT NULL," + + " email VARCHAR(100) NOT NULL," + + " age INT NOT NULL ," + + " address VARCHAR(200) NOT NULL," + + " PRIMARY KEY (id, name)," + + " UNIQUE INDEX idx_unique_1 (id, email, age)," + + " UNIQUE INDEX idx_unique_2 (name, email, address)" + + " );", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `id` int(0) NOT NULL,\n" + + " `name` varchar(0) NOT NULL,\n" + + " `email` varchar(0) NOT NULL,\n" + + " `age` int(0) NOT NULL,\n" + + " `address` varchar(0) NOT NULL,\n" + + " PRIMARY KEY (`id`(0),`name`(0)) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `idx_1` (`id`(0),`email`(0),`age`(0)),\n" + + " UNIQUE KEY `idx_2` (`name`(0),`email`(0),`address`(0))\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `id` int(0) NOT NULL,\n" + + " `name` varchar(0) NOT NULL,\n" + + " `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" + + " `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" + + " `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" + + " PRIMARY KEY (`id`(0),`name`(0)) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `idx_1` (`id`(0),`omitted`(0),`omitted`(0)),\n" + + " UNIQUE KEY `idx_2` (`name`(0),`omitted`(0),`omitted`(0))\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + }, } p := parser.New() for _, c := range cases { @@ -1615,7 +1653,18 @@ func TestBuildTableInfo(t *testing.T) { handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI) require.NotNil(t, handle.UniqueNotNullIdx) require.Equal(t, c.recovered, showCreateTable(t, recoveredTI)) - + // make sure BuildTiDBTableInfo indentify the correct primary key + if c.caseNumber == 5 { + inexes := recoveredTI.Indices + primaryCount := 0 + for i := range inexes { + if inexes[i].Primary { + primaryCount++ + } + } + require.Equal(t, 1, primaryCount) + require.Equal(t, 2, len(handle.UniqueNotNullIdx.Columns)) + } // mimic the columns are set to nil when old value feature is disabled for i := range cols { if !cols[i].Flag.IsHandleKey() { diff --git a/cdc/model/sink.go b/cdc/model/sink.go index decb3d1e8cc..b364126ade0 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -604,7 +604,10 @@ func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInf continue } if firstCol.Flag.IsPrimaryKey() { - indexInfo.Primary = true + // If all columns in the index are primary key, then the index is primary key. + if CheckIfIndexesPrimaryKey(colOffsets, columns) { + indexInfo.Primary = true + } indexInfo.Unique = true } if firstCol.Flag.IsUniqueKey() { @@ -628,6 +631,17 @@ func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInf return ret } +// CheckIfIndexesPrimaryKey checks if all columns in a index is primary key +func CheckIfIndexesPrimaryKey(columnOffsets []int, columns []*Column) bool { + for _, offset := range columnOffsets { + column := columns[offset] + if column == nil || !column.Flag.IsPrimaryKey() { + return false + } + } + return true +} + // ColumnValueString returns the string representation of the column value func ColumnValueString(c interface{}) string { var data string