diff --git a/cdc/entry/schema_test_helper.go b/cdc/entry/schema_test_helper.go index b3cbf9ff44a..28830503275 100644 --- a/cdc/entry/schema_test_helper.go +++ b/cdc/entry/schema_test_helper.go @@ -262,15 +262,28 @@ func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent { require.NoError(s.t, err) s.schemaStorage.AdvanceResolvedTs(ver.Ver) - tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(res.SchemaName, res.TableName) - require.True(s.t, ok) + var tableInfo *model.TableInfo + if res.BinlogInfo != nil && res.BinlogInfo.TableInfo != nil { + tableInfo = model.WrapTableInfo(res.SchemaID, res.SchemaName, res.BinlogInfo.FinishedTS, res.BinlogInfo.TableInfo) + } else { + tableInfo = &model.TableInfo{ + TableName: model.TableName{Schema: res.SchemaName}, + Version: res.BinlogInfo.FinishedTS, + } + } + ctx := context.Background() + snap, err := s.schemaStorage.GetSnapshot(ctx, res.BinlogInfo.FinishedTS-1) + require.NoError(s.t, err) + preTableInfo, err := snap.PreTableInfo(res) + require.NoError(s.t, err) event := &model.DDLEvent{ - StartTs: res.StartTS, - CommitTs: res.BinlogInfo.FinishedTS, - TableInfo: tableInfo, - Query: res.Query, - Type: res.Type, + StartTs: res.StartTS, + CommitTs: res.BinlogInfo.FinishedTS, + TableInfo: tableInfo, + PreTableInfo: preTableInfo, + Query: res.Query, + Type: res.Type, } return event diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 1c48ac6902e..eae8c9af7a3 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -557,7 +557,7 @@ func TestRemovePausedChangefeed(t *testing.T) { info.State = model.StateStopped dir := t.TempDir() // Field `Consistent` is valid only when the downstream - // is MySQL compatible Database + // is MySQL compatible Schema info.SinkURI = "mysql://" info.Config.Consistent = &config.ConsistentConfig{ Level: "eventual", diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go index 5ff71e8bda0..f3200f395f5 100644 --- a/pkg/sink/codec/simple/decoder.go +++ b/pkg/sink/codec/simple/decoder.go @@ -137,11 +137,11 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { return d.assembleHandleKeyOnlyRowChangedEvent(d.msg) } - tableInfo := d.memo.Read(d.msg.Database, d.msg.Table, d.msg.SchemaVersion) + tableInfo := d.memo.Read(d.msg.Schema, d.msg.Table, d.msg.SchemaVersion) if tableInfo == nil { return nil, cerror.ErrCodecDecode.GenWithStack( "cannot found the table info, schema: %s, table: %s, version: %d", - d.msg.Database, d.msg.Table, d.msg.SchemaVersion) + d.msg.Schema, d.msg.Table, d.msg.SchemaVersion) } event, err := buildRowChangedEvent(d.msg, tableInfo) @@ -180,11 +180,11 @@ func (d *decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) ( } func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowChangedEvent, error) { - tableInfo := d.memo.Read(m.Database, m.Table, m.SchemaVersion) + tableInfo := d.memo.Read(m.Schema, m.Table, m.SchemaVersion) if tableInfo == nil { return nil, cerror.ErrCodecDecode.GenWithStack( "cannot found the table info, schema: %s, table: %s, version: %d", - m.Database, m.Table, m.SchemaVersion) + m.Schema, m.Table, m.SchemaVersion) } fieldTypeMap := make(map[string]*types.FieldType, len(tableInfo.Columns)) @@ -194,7 +194,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh result := &message{ Version: defaultVersion, - Database: m.Database, + Schema: m.Schema, Table: m.Table, Type: m.Type, CommitTs: m.CommitTs, @@ -204,7 +204,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh ctx := context.Background() switch m.Type { case InsertType: - holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs, m.Database, m.Table, m.Data) + holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs, m.Schema, m.Table, m.Data) if err != nil { return nil, err } @@ -214,7 +214,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh } result.Data = data case UpdateType: - holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs, m.Database, m.Table, m.Data) + holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs, m.Schema, m.Table, m.Data) if err != nil { return nil, err } @@ -224,7 +224,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh } result.Data = data - holder, err = common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs-1, m.Database, m.Table, m.Old) + holder, err = common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs-1, m.Schema, m.Table, m.Old) if err != nil { return nil, err } @@ -234,7 +234,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh } result.Old = old case DeleteType: - holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs-1, m.Database, m.Table, m.Old) + holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs-1, m.Schema, m.Table, m.Old) if err != nil { return nil, err } @@ -263,7 +263,7 @@ func (d *decoder) buildData( if !ok { return nil, cerror.ErrCodecDecode.GenWithStack( "cannot found the field type, schema: %s, table: %s, column: %s", - d.msg.Database, d.msg.Table, col.Name()) + d.msg.Schema, d.msg.Table, col.Name()) } value, err := encodeValue(value, fieldType) if err != nil { @@ -285,6 +285,7 @@ func (d *decoder) NextDDLEvent() (*model.DDLEvent, error) { d.msg = nil d.memo.Write(ddl.TableInfo) + d.memo.Write(ddl.PreTableInfo) return ddl, nil } @@ -306,6 +307,9 @@ func newMemoryTableInfoProvider() *memoryTableInfoProvider { } func (m *memoryTableInfoProvider) Write(info *model.TableInfo) { + if info == nil { + return + } key := cacheKey{ schema: info.TableName.Schema, table: info.TableName.Table, diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index 47d6c3b33bf..6389bca7dca 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -43,7 +43,7 @@ func TestEncodeCheckpoint(t *testing.T) { require.NoError(t, err) enc := builder.Build() - checkpoint := 23 + checkpoint := 446266400629063682 m, err := enc.EncodeCheckpointEvent(uint64(checkpoint)) require.NoError(t, err) @@ -69,6 +69,20 @@ func TestEncodeDDLEvent(t *testing.T) { helper := entry.NewSchemaTestHelper(t) defer helper.Close() + sql := `create table test.t(id int primary key, name varchar(255) not null, gender enum('male', 'female'), email varchar(255) not null, key idx_name_email(name, email))` + createTableDDLEvent := helper.DDL2Event(sql) + + sql = `insert into test.t values (1, "jack", "male", "jack@abc.com")` + insertEvent := helper.DML2Event(sql, "test", "t") + + sql = `rename table test.t to test.abc` + renameTableDDLEvent := helper.DDL2Event(sql) + + sql = `insert into test.abc values (2, "anna", "female", "anna@abc.com")` + insertEvent2 := helper.DML2Event(sql, "test", "abc") + + helper.Tk().MustExec("drop table test.abc") + ctx := context.Background() for _, compressionType := range []string{ compression.None, @@ -81,18 +95,10 @@ func TestEncodeDDLEvent(t *testing.T) { require.NoError(t, err) enc := builder.Build() - sql := `create table test.t( - id int primary key, - name varchar(255) not null, - gender enum('male', 'female'), - email varchar(255) not null, - key idx_name_email(name, email))` - ddlEvent := helper.DDL2Event(sql) - - m, err := enc.EncodeDDLEvent(ddlEvent) + dec, err := NewDecoder(ctx, codecConfig, nil) require.NoError(t, err) - dec, err := NewDecoder(ctx, codecConfig, nil) + m, err := enc.EncodeDDLEvent(createTableDDLEvent) require.NoError(t, err) err = dec.AddKeyValue(m.Key, m.Value) @@ -106,22 +112,20 @@ func TestEncodeDDLEvent(t *testing.T) { event, err := dec.NextDDLEvent() require.NoError(t, err) - require.Equal(t, ddlEvent.CommitTs, event.CommitTs) + require.Equal(t, createTableDDLEvent.CommitTs, event.CommitTs) // because we don't we don't set startTs in the encoded message, // so the startTs is equal to commitTs - require.Equal(t, ddlEvent.CommitTs, event.StartTs) - require.Equal(t, ddlEvent.Query, event.Query) - require.Equal(t, len(ddlEvent.TableInfo.Columns), len(event.TableInfo.Columns)) - require.Equal(t, len(ddlEvent.TableInfo.Indices)+1, len(event.TableInfo.Indices)) - - item := dec.memo.Read(ddlEvent.TableInfo.TableName.Schema, - ddlEvent.TableInfo.TableName.Table, ddlEvent.TableInfo.UpdateTS) + require.Equal(t, createTableDDLEvent.CommitTs, event.StartTs) + require.Equal(t, createTableDDLEvent.Query, event.Query) + require.Equal(t, len(createTableDDLEvent.TableInfo.Columns), len(event.TableInfo.Columns)) + require.Equal(t, len(createTableDDLEvent.TableInfo.Indices)+1, len(event.TableInfo.Indices)) + require.Nil(t, event.PreTableInfo) + + item := dec.memo.Read(createTableDDLEvent.TableInfo.TableName.Schema, + createTableDDLEvent.TableInfo.TableName.Table, createTableDDLEvent.TableInfo.UpdateTS) require.NotNil(t, item) - sql = `insert into test.t values (1, "jack", "male", "jack@abc.com")` - row := helper.DML2Event(sql, "test", "t") - - err = enc.AppendRowChangedEvent(context.Background(), "", row, func() {}) + err = enc.AppendRowChangedEvent(context.Background(), "", insertEvent, func() {}) require.NoError(t, err) messages := enc.Build() @@ -138,12 +142,59 @@ func TestEncodeDDLEvent(t *testing.T) { decodedRow, err := dec.NextRowChangedEvent() require.NoError(t, err) - require.Equal(t, decodedRow.CommitTs, row.CommitTs) - require.Equal(t, decodedRow.Table.Schema, row.Table.Schema) - require.Equal(t, decodedRow.Table.Table, row.Table.Table) + require.Equal(t, decodedRow.CommitTs, insertEvent.CommitTs) + require.Equal(t, decodedRow.Table.Schema, insertEvent.Table.Schema) + require.Equal(t, decodedRow.Table.Table, insertEvent.Table.Table) require.Nil(t, decodedRow.PreColumns) - helper.Tk().MustExec("drop table test.t") + m, err = enc.EncodeDDLEvent(renameTableDDLEvent) + require.NoError(t, err) + + err = dec.AddKeyValue(m.Key, m.Value) + require.NoError(t, err) + + messageType, hasNext, err = dec.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, messageType) + require.NotEqual(t, 0, dec.msg.BuildTs) + + event, err = dec.NextDDLEvent() + require.NoError(t, err) + require.Equal(t, renameTableDDLEvent.CommitTs, event.CommitTs) + // because we don't we don't set startTs in the encoded message, + // so the startTs is equal to commitTs + require.Equal(t, renameTableDDLEvent.CommitTs, event.StartTs) + require.Equal(t, renameTableDDLEvent.Query, event.Query) + require.Equal(t, len(renameTableDDLEvent.TableInfo.Columns), len(event.TableInfo.Columns)) + require.Equal(t, len(renameTableDDLEvent.TableInfo.Indices)+1, len(event.TableInfo.Indices)) + require.NotNil(t, event.PreTableInfo) + + item = dec.memo.Read(renameTableDDLEvent.TableInfo.TableName.Schema, + renameTableDDLEvent.TableInfo.TableName.Table, renameTableDDLEvent.TableInfo.UpdateTS) + require.NotNil(t, item) + + err = enc.AppendRowChangedEvent(context.Background(), "", insertEvent2, func() {}) + require.NoError(t, err) + + messages = enc.Build() + require.Len(t, messages, 1) + + err = dec.AddKeyValue(messages[0].Key, messages[0].Value) + require.NoError(t, err) + + messageType, hasNext, err = dec.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, messageType) + require.NotEqual(t, 0, dec.msg.BuildTs) + + decodedRow, err = dec.NextRowChangedEvent() + require.NoError(t, err) + require.Equal(t, decodedRow.CommitTs, insertEvent2.CommitTs) + require.Equal(t, decodedRow.Table.Schema, insertEvent2.Table.Schema) + require.Equal(t, decodedRow.Table.Table, insertEvent2.Table.Table) + require.Nil(t, decodedRow.PreColumns) } } @@ -223,6 +274,20 @@ func TestEncodeBootstrapEvent(t *testing.T) { helper := entry.NewSchemaTestHelper(t) defer helper.Close() + sql := `create table test.t( + id int primary key, + name varchar(255) not null, + age int, + email varchar(255) not null, + key idx_name_email(name, email))` + ddlEvent := helper.DDL2Event(sql) + ddlEvent.IsBootstrap = true + + sql = `insert into test.t values (1, "jack", 23, "jack@abc.com")` + row := helper.DML2Event(sql, "test", "t") + + helper.Tk().MustExec("drop table test.t") + ctx := context.Background() for _, compressionType := range []string{ compression.None, @@ -235,15 +300,6 @@ func TestEncodeBootstrapEvent(t *testing.T) { require.NoError(t, err) enc := builder.Build() - sql := `create table test.t( - id int primary key, - name varchar(255) not null, - age int, - email varchar(255) not null, - key idx_name_email(name, email))` - ddlEvent := helper.DDL2Event(sql) - ddlEvent.IsBootstrap = true - m, err := enc.EncodeDDLEvent(ddlEvent) require.NoError(t, err) @@ -274,9 +330,6 @@ func TestEncodeBootstrapEvent(t *testing.T) { ddlEvent.TableInfo.TableName.Table, ddlEvent.TableInfo.UpdateTS) require.NotNil(t, item) - sql = `insert into test.t values (1, "jack", 23, "jack@abc.com")` - row := helper.DML2Event(sql, "test", "t") - err = enc.AppendRowChangedEvent(context.Background(), "", row, func() {}) require.NoError(t, err) @@ -298,8 +351,6 @@ func TestEncodeBootstrapEvent(t *testing.T) { require.Equal(t, decodedRow.Table.Schema, row.Table.Schema) require.Equal(t, decodedRow.Table.Table, row.Table.Table) require.Nil(t, decodedRow.PreColumns) - - helper.Tk().MustExec("drop table test.t") } } diff --git a/pkg/sink/codec/simple/message.go b/pkg/sink/codec/simple/message.go index cf8c31a6194..40f2e32f47f 100644 --- a/pkg/sink/codec/simple/message.go +++ b/pkg/sink/codec/simple/message.go @@ -196,8 +196,11 @@ func newTiIndexInfo(indexSchema *IndexSchema) *timodel.IndexInfo { // TableSchema is the schema of the table. type TableSchema struct { - Columns []*columnSchema `json:"columns"` - Indexes []*IndexSchema `json:"indexes"` + Database string `json:"database"` + Table string `json:"table"` + Version uint64 `json:"version"` + Columns []*columnSchema `json:"columns"` + Indexes []*IndexSchema `json:"indexes"` } func newTableSchema(tableInfo *model.TableInfo) *TableSchema { @@ -221,7 +224,7 @@ func newTableSchema(tableInfo *model.TableInfo) *TableSchema { indexes = append(indexes, index) } - // Sometime the primary key is not in the index, we need to find it manually. + // sometimes the primary key is not in the index, we need to find it manually. if !pkInIndexes { pkColumns := tableInfo.GetPrimaryKeyColumnNames() if len(pkColumns) != 0 { @@ -239,33 +242,48 @@ func newTableSchema(tableInfo *model.TableInfo) *TableSchema { } return &TableSchema{ - Columns: columns, - Indexes: indexes, + Database: tableInfo.TableName.Schema, + Table: tableInfo.TableName.Table, + Version: tableInfo.UpdateTS, + Columns: columns, + Indexes: indexes, } } // newTableInfo converts from TableSchema to TableInfo. -func newTableInfo(msg *message) *model.TableInfo { +func newTableInfo(m *TableSchema) *model.TableInfo { + var ( + database string + table string + schemaVersion uint64 + ) + if m != nil { + database = m.Database + table = m.Table + schemaVersion = m.Version + } info := &model.TableInfo{ TableName: model.TableName{ - Schema: msg.Database, - Table: msg.Table, + Schema: database, + Table: table, }, TableInfo: &timodel.TableInfo{ - Name: timodel.NewCIStr(msg.Table), - UpdateTS: msg.SchemaVersion, + Name: timodel.NewCIStr(table), + UpdateTS: schemaVersion, }, } - if msg.TableSchema != nil { - for _, col := range msg.TableSchema.Columns { - tiCol := newTiColumnInfo(col, msg.TableSchema.Indexes) - info.Columns = append(info.Columns, tiCol) - } - for _, idx := range msg.TableSchema.Indexes { - index := newTiIndexInfo(idx) - info.Indices = append(info.Indices, index) - } + if m == nil { + return info + } + + for _, col := range m.Columns { + tiCol := newTiColumnInfo(col, m.Indexes) + info.Columns = append(info.Columns, tiCol) + } + for _, idx := range m.Indexes { + index := newTiIndexInfo(idx) + info.Indices = append(info.Indices, index) } return info @@ -273,12 +291,16 @@ func newTableInfo(msg *message) *model.TableInfo { // newDDLEvent converts from message to DDLEvent. func newDDLEvent(msg *message) *model.DDLEvent { - tableInfo := newTableInfo(msg) + var preTableInfo *model.TableInfo + if msg.PreTableSchema != nil { + preTableInfo = newTableInfo(msg.PreTableSchema) + } return &model.DDLEvent{ - StartTs: msg.CommitTs, - CommitTs: msg.CommitTs, - TableInfo: tableInfo, - Query: msg.SQL, + StartTs: msg.CommitTs, + CommitTs: msg.CommitTs, + TableInfo: newTableInfo(msg.TableSchema), + PreTableInfo: preTableInfo, + Query: msg.SQL, } } @@ -287,7 +309,7 @@ func buildRowChangedEvent(msg *message, tableInfo *model.TableInfo) (*model.RowC result := &model.RowChangedEvent{ CommitTs: msg.CommitTs, Table: &model.TableName{ - Schema: msg.Database, + Schema: msg.Schema, Table: msg.Table, }, TableInfo: tableInfo, @@ -334,28 +356,36 @@ func decodeColumns(rawData map[string]interface{}, fieldTypeMap map[string]*type type message struct { Version int `json:"version"` - // Scheme and Table is empty for the resolved ts event. - Database string `json:"database,omitempty"` - Table string `json:"table,omitempty"` - Type EventType `json:"type"` - CommitTs uint64 `json:"commitTs"` - BuildTs int64 `json:"buildTs"` - // Data is available for the Insert and Update event. - Data map[string]interface{} `json:"data,omitempty"` - // Old is available for the Update and Delete event. - Old map[string]interface{} `json:"old,omitempty"` - // TableSchema is for the DDL and Bootstrap event. - TableSchema *TableSchema `json:"tableSchema,omitempty"` + // Schema and Table is empty for the resolved ts event. + Schema string `json:"schema,omitempty"` + Table string `json:"table,omitempty"` + Type EventType `json:"type"` // SQL is only for the DDL event. - SQL string `json:"sql,omitempty"` - // SchemaVersion is for the DDL, Bootstrap and DML event. + SQL string `json:"sql,omitempty"` + CommitTs uint64 `json:"commitTs"` + BuildTs int64 `json:"buildTs"` + // SchemaVersion is for the DML event. SchemaVersion uint64 `json:"schemaVersion,omitempty"` // ClaimCheckLocation is only for the DML event. ClaimCheckLocation string `json:"claimCheckLocation,omitempty"` - // HandleKeyOnly is only for the DML event. HandleKeyOnly bool `json:"handleKeyOnly,omitempty"` + + // E2E checksum related fields, only set when enable checksum functionality. + Checksum string `json:"checksum,omitempty"` + OldChecksum string `json:"oldChecksum,omitempty"` + Corrupted bool `json:"corrupted,omitempty"` + ChecksumVersion int `json:"checksumVersion,omitempty"` + + // Data is available for the Insert and Update event. + Data map[string]interface{} `json:"data,omitempty"` + // Old is available for the Update and Delete event. + Old map[string]interface{} `json:"old,omitempty"` + // TableSchema is for the DDL and Bootstrap event. + TableSchema *TableSchema `json:"tableSchema,omitempty"` + // PreTableSchema holds schema information before the DDL executed. + PreTableSchema *TableSchema `json:"preTableSchema,omitempty"` } func newResolvedMessage(ts uint64) *message { @@ -369,29 +399,28 @@ func newResolvedMessage(ts uint64) *message { func newDDLMessage(ddl *model.DDLEvent) *message { var ( - database string - table string - schema *TableSchema - schemaVersion uint64 + schema *TableSchema + preSchema *TableSchema ) // the tableInfo maybe nil if the DDL is `drop database` if ddl.TableInfo != nil && ddl.TableInfo.TableInfo != nil { schema = newTableSchema(ddl.TableInfo) - database = ddl.TableInfo.TableName.Schema - table = ddl.TableInfo.TableName.Table - schemaVersion = ddl.TableInfo.UpdateTS + } + if !ddl.IsBootstrap { + // `PreTableInfo` may not exist for some DDL, such as `create table` + if ddl.PreTableInfo != nil && ddl.PreTableInfo.TableInfo != nil { + preSchema = newTableSchema(ddl.PreTableInfo) + } } msg := &message{ - Version: defaultVersion, - Database: database, - Table: table, - Type: DDLType, - CommitTs: ddl.CommitTs, - BuildTs: time.Now().UnixMilli(), - SQL: ddl.Query, - TableSchema: schema, - SchemaVersion: schemaVersion, + Version: defaultVersion, + Type: DDLType, + CommitTs: ddl.CommitTs, + BuildTs: time.Now().UnixMilli(), + SQL: ddl.Query, + TableSchema: schema, + PreTableSchema: preSchema, } if ddl.IsBootstrap { msg.Type = BootstrapType @@ -406,7 +435,7 @@ func newDMLMessage( ) (*message, error) { m := &message{ Version: defaultVersion, - Database: event.Table.Schema, + Schema: event.Table.Schema, Table: event.Table.Table, CommitTs: event.CommitTs, BuildTs: time.Now().UnixMilli(), diff --git a/pkg/sink/codec/simple/message_test.go b/pkg/sink/codec/simple/message_test.go index b7e17ea4816..0251e861279 100644 --- a/pkg/sink/codec/simple/message_test.go +++ b/pkg/sink/codec/simple/message_test.go @@ -35,6 +35,9 @@ func TestNewTableSchema(t *testing.T) { );` tableInfo := helper.DDL2Event(sql).TableInfo want := &TableSchema{ + Database: tableInfo.TableName.Schema, + Table: tableInfo.TableName.Table, + Version: tableInfo.UpdateTS, Columns: []*columnSchema{ { ID: 1, @@ -120,6 +123,9 @@ func TestNewTableSchema(t *testing.T) { );` tableInfo = helper.DDL2Event(sql).TableInfo want = &TableSchema{ + Database: tableInfo.TableName.Schema, + Table: tableInfo.TableName.Table, + Version: tableInfo.UpdateTS, Columns: []*columnSchema{ { ID: 1, @@ -235,6 +241,9 @@ func TestNewTableSchema(t *testing.T) { tgen tinyint AS (t+1))` // 38 tableInfo = helper.DDL2Event(sql).TableInfo want = &TableSchema{ + Database: tableInfo.TableName.Schema, + Table: tableInfo.TableName.Table, + Version: tableInfo.UpdateTS, Columns: []*columnSchema{ { ID: 1,