Skip to content

Commit

Permalink
fix batch dml delete sql error
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed Sep 26, 2023
1 parent 43848f2 commit d16d721
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 473 deletions.
140 changes: 6 additions & 134 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,119 +1516,6 @@ func TestDecodeEventIgnoreRow(t *testing.T) {
}
}

func TestBuildTableInfo(t *testing.T) {
cases := []struct {
origin string
recovered string
recoveredWithNilCol string
}{
{
"CREATE TABLE t1 (c INT PRIMARY KEY)",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) NOT NULL,\n" +
" PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) NOT NULL,\n" +
" PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
"CREATE TABLE t1 (" +
" c INT UNSIGNED," +
" c2 VARCHAR(10) NOT NULL," +
" c3 BIT(10) NOT NULL," +
" UNIQUE KEY (c2, c3)" +
")",
// CDC discards field length.
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) unsigned DEFAULT NULL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `c3` bit(0) NOT NULL,\n" +
" UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `c3` bit(0) NOT NULL,\n" +
" UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
"CREATE TABLE t1 (" +
" c INT UNSIGNED," +
" gen INT AS (c+1) VIRTUAL," +
" c2 VARCHAR(10) NOT NULL," +
" gen2 INT AS (c+2) STORED," +
" c3 BIT(10) NOT NULL," +
" PRIMARY KEY (c, c2)" +
")",
// CDC discards virtual generated column, and generating expression of stored generated column.
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) unsigned NOT NULL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `gen2` int(0) GENERATED ALWAYS AS (pass_generated_check) STORED,\n" +
" `c3` bit(0) NOT NULL,\n" +
" PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) unsigned NOT NULL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
"CREATE TABLE `t1` (" +
" `a` int(11) NOT NULL," +
" `b` int(11) DEFAULT NULL," +
" `c` int(11) DEFAULT NULL," +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */," +
" UNIQUE KEY `b` (`b`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `a` int(0) NOT NULL,\n" +
" `b` int(0) DEFAULT NULL,\n" +
" `c` int(0) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */,\n" +
" UNIQUE KEY `idx_1` (`b`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `a` int(0) NOT NULL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
p := parser.New()
for _, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
require.Equal(t, c.recovered, showCreateTable(t, recoveredTI))

// mimic the columns are set to nil when old value feature is disabled
for i := range cols {
if !cols[i].Flag.IsHandleKey() {
cols[i] = nil
}
}
recoveredTI = model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle = sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
require.Equal(t, c.recoveredWithNilCol, showCreateTable(t, recoveredTI))
}
}

var tiCtx = mock.NewContext()

func showCreateTable(t *testing.T, ti *timodel.TableInfo) string {
Expand All @@ -1648,12 +1535,10 @@ func TestNewDMRowChange(t *testing.T) {
" a1 INT NOT NULL," +
" a3 INT NOT NULL," +
" UNIQUE KEY dex1(a1, a3));",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `id` int(0) DEFAULT NULL,\n" +
" `a1` int(0) NOT NULL,\n" +
" `a3` int(0) NOT NULL,\n" +
" UNIQUE KEY `idx_0` (`a1`(0),`a3`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `t1` (\n `id` int(11) DEFAULT NULL,\n" +
" `a1` int(11) NOT NULL,\n `a3` int(11) NOT NULL,\n" +
" UNIQUE KEY `dex1` (`a1`,`a3`)\n) " +
"ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
p := parser.New()
Expand All @@ -1662,22 +1547,9 @@ func TestNewDMRowChange(t *testing.T) {
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols := []*model.Column{
{
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil,
},
{
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil,
},
{
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil,
},
}
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
require.Equal(t, c.recovered, showCreateTable(t, recoveredTI))
require.Equal(t, c.recovered, showCreateTable(t, originTI))
tableName := &model.TableName{Schema: "db", Table: "t1"}
rowChange := sqlmodel.NewRowChange(tableName, nil, []interface{}{1, 1, 2}, nil, recoveredTI, nil, nil)
rowChange := sqlmodel.NewRowChange(tableName, nil, []interface{}{1, 1, 2}, nil, originTI, nil, nil)
sqlGot, argsGot := rowChange.GenSQL(sqlmodel.DMLDelete)
require.Equal(t, "DELETE FROM `db`.`t1` WHERE `a1` = ? AND `a3` = ? LIMIT 1", sqlGot)
require.Equal(t, []interface{}{1, 2}, argsGot)
Expand Down
99 changes: 0 additions & 99 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
Expand Down Expand Up @@ -530,104 +529,6 @@ type RedoColumn struct {
Flag uint64 `msg:"flag"`
}

// BuildTiDBTableInfo builds a TiDB TableInfo from given information.
func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInfo {
ret := &model.TableInfo{}
// nowhere will use this field, so we set a debug message
ret.Name = model.NewCIStr("BuildTiDBTableInfo")

for i, col := range columns {
columnInfo := &model.ColumnInfo{
Offset: i,
State: model.StatePublic,
}
if col == nil {
// by referring to datum2Column, nil is happened when
// - !IsColCDCVisible, which means the column is a virtual generated
// column
// - !exist && !fillWithDefaultValue, which means upstream does not
// send the column value
// just mock for the first case
columnInfo.Name = model.NewCIStr("omitted")
columnInfo.GeneratedExprString = "pass_generated_check"
columnInfo.GeneratedStored = false
ret.Columns = append(ret.Columns, columnInfo)
continue
}
columnInfo.Name = model.NewCIStr(col.Name)
columnInfo.SetType(col.Type)
// TiKV always use utf8mb4 to store, and collation is not recorded by CDC
columnInfo.SetCharset(mysql.UTF8MB4Charset)
columnInfo.SetCollate(mysql.UTF8MB4DefaultCollation)

// inverse initColumnsFlag
flag := col.Flag
if flag.IsBinary() {
columnInfo.SetCharset("binary")
}
if flag.IsGeneratedColumn() {
// we do not use this field, so we set it to any non-empty string
columnInfo.GeneratedExprString = "pass_generated_check"
columnInfo.GeneratedStored = true
}
if flag.IsHandleKey() {
columnInfo.AddFlag(mysql.PriKeyFlag)
ret.IsCommonHandle = true
} else if flag.IsPrimaryKey() {
columnInfo.AddFlag(mysql.PriKeyFlag)
}
if flag.IsUniqueKey() {
columnInfo.AddFlag(mysql.UniqueKeyFlag)
}
if !flag.IsNullable() {
columnInfo.AddFlag(mysql.NotNullFlag)
}
if flag.IsMultipleKey() {
columnInfo.AddFlag(mysql.MultipleKeyFlag)
}
if flag.IsUnsigned() {
columnInfo.AddFlag(mysql.UnsignedFlag)
}
ret.Columns = append(ret.Columns, columnInfo)
}

for i, colOffsets := range indexColumns {
indexInfo := &model.IndexInfo{
Name: model.NewCIStr(fmt.Sprintf("idx_%d", i)),
State: model.StatePublic,
}
firstCol := columns[colOffsets[0]]
if firstCol == nil {
// when the referenced column is nil, we already have a handle index
// so we can skip this index.
// only happens for DELETE event and old value feature is disabled
continue
}
if firstCol.Flag.IsPrimaryKey() {
indexInfo.Primary = true
indexInfo.Unique = true
}
if firstCol.Flag.IsUniqueKey() {
indexInfo.Unique = true
}

for _, offset := range colOffsets {
col := ret.Columns[offset]

indexCol := &model.IndexColumn{}
indexCol.Name = col.Name
indexCol.Offset = offset
indexInfo.Columns = append(indexInfo.Columns, indexCol)
}

// TODO: revert the "all column set index related flag" to "only the
// first column set index related flag" if needed

ret.Indices = append(ret.Indices, indexInfo)
}
return ret
}

// ColumnValueString returns the string representation of the column value
func ColumnValueString(c interface{}) string {
var data string
Expand Down
6 changes: 2 additions & 4 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (s *mysqlBackend) Flush(ctx context.Context) (err error) {
}

dmls := s.prepareDMLs()
log.Debug("prepare DMLs", zap.Any("rows", s.rows),
log.Info("prepare DMLs", zap.Any("rows", s.rows),
zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values))

start := time.Now()
Expand Down Expand Up @@ -567,9 +567,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
}
// only use batch dml when the table has a handle key
if hasHandleKey(tableColumns) {
// TODO(dongmen): find a better way to get table info.
tableInfo := model.BuildTiDBTableInfo(tableColumns, firstRow.IndexColumns)
sql, value := s.batchSingleTxnDmls(event, tableInfo, translateToInsert)
sql, value := s.batchSingleTxnDmls(event, firstRow.TableInfo.TableInfo, translateToInsert)
sqls = append(sqls, sql...)
values = append(values, value...)

Expand Down
Loading

0 comments on commit d16d721

Please sign in to comment.