From 7ea72b9de43a4a3f9e26b9cb3b55f1808aad3bfa Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 26 Sep 2023 16:16:27 +0800 Subject: [PATCH] use tableInfo in rowChangeEvent to gennerate dmls --- cdc/sink/dmlsink/txn/mysql/mysql.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index f0eb05709a2..18c329f0ccc 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -567,8 +567,15 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { } // only use batch dml when the table has a handle key if hasHandleKey(tableColumns) { - // TODO(dongmen): find a better way to get table info. - tableInfo := model.BuildTiDBTableInfo(tableColumns, firstRow.IndexColumns) + var tableInfo *timodel.TableInfo + if firstRow.TableInfo != nil && firstRow.TableInfo.TableInfo != nil { + tableInfo = firstRow.TableInfo.TableInfo + } else { + // firstRow.TableInfo is nil only when the row change event was generated by + // mounter, which only occurs when mysql sink was used in redo consumer or + // in the other consumers. + tableInfo = model.BuildTiDBTableInfo(tableColumns, firstRow.IndexColumns) + } sql, value := s.batchSingleTxnDmls(event, tableInfo, translateToInsert) sqls = append(sqls, sql...) values = append(values, value...)