diff --git a/pkg/sink/codec/simple/avro.go b/pkg/sink/codec/simple/avro.go index 51ab82e6bea..22399d3e410 100644 --- a/pkg/sink/codec/simple/avro.go +++ b/pkg/sink/codec/simple/avro.go @@ -18,13 +18,10 @@ import ( "sync" "time" - "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/types" "github.com/pingcap/tidb/pkg/util/rowcodec" "github.com/pingcap/tiflow/cdc/model" - cerror "github.com/pingcap/tiflow/pkg/errors" - "go.uber.org/zap" ) func newTableSchemaMap(tableInfo *model.TableInfo) interface{} { @@ -41,7 +38,7 @@ func newTableSchemaMap(tableInfo *model.TableInfo) interface{} { for _, col := range idx.Columns { columns = append(columns, col.Name.O) colInfo := tableInfo.Columns[col.Offset] - // An index is not null when all columns of aer not null + // An index is not null when all columns of are not null if !mysql.HasNotNullFlag(colInfo.GetFlag()) { index["nullable"] = true } @@ -215,6 +212,13 @@ var ( }, } + // rowMapPool return map for each row + rowMapPool = sync.Pool{ + New: func() any { + return make(map[string]interface{}) + }, + } + // dmlPayloadHolderPool return holder for the payload dmlPayloadHolderPool = sync.Pool{ New: func() any { @@ -283,8 +287,6 @@ func (a *avroMarshaller) newDMLMessageMap( old := a.collectColumns(event.PreColumns, event.ColInfos, onlyHandleKey) m["old"] = old m["type"] = string(DMLTypeUpdate) - } else { - log.Panic("invalid event type, this should not hit", zap.Any("event", event)) } m = map[string]interface{}{ @@ -306,11 +308,11 @@ func recycleMap(m map[string]interface{}) { payload := holder["payload"].(map[string]interface{}) eventMap := payload["com.pingcap.simple.avro.DML"].(map[string]interface{}) - checksumMap := eventMap["com.pingcap.simple.avro.Checksum"] - if checksumMap != nil { - holder := checksumMap.(map[string]interface{}) - clear(holder) - genericMapPool.Put(holder) + checksum := eventMap["checksum"] + if checksum != nil { + checksum := checksum.(map[string]interface{}) + clear(checksum) + genericMapPool.Put(checksum) } dataMap := eventMap["data"] @@ -321,6 +323,8 @@ func recycleMap(m map[string]interface{}) { clear(colMap) genericMapPool.Put(col) } + clear(dataMap) + rowMapPool.Put(dataMap) } oldDataMap := eventMap["old"] @@ -331,6 +335,8 @@ func recycleMap(m map[string]interface{}) { clear(colMap) genericMapPool.Put(col) } + clear(oldDataMap) + rowMapPool.Put(oldDataMap) } holder["payload"] = nil dmlPayloadHolderPool.Put(holder) @@ -341,18 +347,17 @@ func recycleMap(m map[string]interface{}) { func (a *avroMarshaller) collectColumns( columns []*model.Column, columnInfos []rowcodec.ColInfo, onlyHandleKey bool, ) map[string]interface{} { - result := make(map[string]interface{}, len(columns)) + result := rowMapPool.Get().(map[string]interface{}) for idx, col := range columns { - if col == nil { - continue - } - if onlyHandleKey && !col.Flag.IsHandleKey() { - continue + if col != nil { + if onlyHandleKey && !col.Flag.IsHandleKey() { + continue + } + value, avroType := a.encodeValue4Avro(col.Value, columnInfos[idx].Ft) + holder := genericMapPool.Get().(map[string]interface{}) + holder[avroType] = value + result[col.Name] = holder } - value, avroType := a.encodeValue4Avro(col.Value, columnInfos[idx].Ft) - holder := genericMapPool.Get().(map[string]interface{}) - holder[avroType] = value - result[col.Name] = holder } return map[string]interface{}{ @@ -446,16 +451,9 @@ func newTableSchemaFromAvroNative(native map[string]interface{}) *TableSchema { } } -func newMessageFromAvroNative(native interface{}, m *message) error { - rawValues, ok := native.(map[string]interface{})["com.pingcap.simple.avro.Message"].(map[string]interface{}) - if !ok { - return cerror.ErrDecodeFailed.GenWithStack("cannot convert the avro message to map") - } - - rawPayload, ok := rawValues["payload"].(map[string]interface{}) - if !ok { - return cerror.ErrDecodeFailed.GenWithStack("cannot convert the avro payload to map") - } +func newMessageFromAvroNative(native interface{}, m *message) { + rawValues := native.(map[string]interface{})["com.pingcap.simple.avro.Message"].(map[string]interface{}) + rawPayload := rawValues["payload"].(map[string]interface{}) rawMessage := rawPayload["com.pingcap.simple.avro.Watermark"] if rawMessage != nil { @@ -464,7 +462,7 @@ func newMessageFromAvroNative(native interface{}, m *message) error { m.Type = MessageTypeWatermark m.CommitTs = uint64(rawValues["commitTs"].(int64)) m.BuildTs = rawValues["buildTs"].(int64) - return nil + return } rawMessage = rawPayload["com.pingcap.simple.avro.Bootstrap"] @@ -474,7 +472,7 @@ func newMessageFromAvroNative(native interface{}, m *message) error { m.Type = MessageTypeBootstrap m.BuildTs = rawValues["buildTs"].(int64) m.TableSchema = newTableSchemaFromAvroNative(rawValues["tableSchema"].(map[string]interface{})) - return nil + return } rawMessage = rawPayload["com.pingcap.simple.avro.DDL"] @@ -499,7 +497,7 @@ func newMessageFromAvroNative(native interface{}, m *message) error { rawPreTableSchema = rawPreTableSchema["com.pingcap.simple.avro.TableSchema"].(map[string]interface{}) m.PreTableSchema = newTableSchemaFromAvroNative(rawPreTableSchema) } - return nil + return } rawValues = rawPayload["com.pingcap.simple.avro.DML"].(map[string]interface{}) @@ -522,7 +520,6 @@ func newMessageFromAvroNative(native interface{}, m *message) error { m.Checksum = newChecksum(rawValues) m.Data = newDataMap(rawValues["data"]) m.Old = newDataMap(rawValues["old"]) - return nil } func newChecksum(raw map[string]interface{}) *checksum { diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go index 9d9e832b9b9..21fa13ff004 100644 --- a/pkg/sink/codec/simple/decoder.go +++ b/pkg/sink/codec/simple/decoder.go @@ -69,10 +69,6 @@ func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*Decode } m, err := newMarshaller(config) - if err != nil { - return nil, errors.Trace(err) - } - return &Decoder{ config: config, marshaller: m, @@ -82,21 +78,17 @@ func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*Decode memo: newMemoryTableInfoProvider(), cachedMessages: list.New(), - }, nil + }, errors.Trace(err) } // AddKeyValue add the received key and values to the Decoder, -func (d *Decoder) AddKeyValue(_, value []byte) error { +func (d *Decoder) AddKeyValue(_, value []byte) (err error) { if d.value != nil { - return cerror.ErrDecodeFailed.GenWithStack( + return cerror.ErrCodecDecode.GenWithStack( "Decoder value already exists, not consumed yet") } - value, err := common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, value) - if err != nil { - return err - } - d.value = value - return nil + d.value, err = common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, value) + return err } // HasNext returns whether there is any event need to be consumed @@ -166,15 +158,10 @@ func (d *Decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } event, err := buildRowChangedEvent(d.msg, tableInfo, d.config.EnableRowChecksum) - if err != nil { - return nil, err - } - event.Table = &model.TableName{ - Schema: tableInfo.TableName.Schema, - Table: tableInfo.TableName.Table, - } d.msg = nil - return event, nil + + log.Debug("row changed event assembled", zap.Any("event", event)) + return event, err } func (d *Decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) (*model.RowChangedEvent, error) { @@ -205,9 +192,14 @@ func (d *Decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) ( func (d *Decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowChangedEvent, error) { tableInfo := d.memo.Read(m.Schema, m.Table, m.SchemaVersion) if tableInfo == nil { - return nil, cerror.ErrCodecDecode.GenWithStack( - "cannot found the table info, schema: %s, table: %s, version: %d", - m.Schema, m.Table, m.SchemaVersion) + log.Debug("table info not found for the event, "+ + "the consumer should cache this event temporarily, and update the tableInfo after it's received", + zap.String("schema", d.msg.Schema), + zap.String("table", d.msg.Table), + zap.Uint64("version", d.msg.SchemaVersion)) + d.cachedMessages.PushBack(d.msg) + d.msg = nil + return nil, nil } fieldTypeMap := make(map[string]*types.FieldType, len(tableInfo.Columns)) @@ -219,6 +211,7 @@ func (d *Decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh Version: defaultVersion, Schema: m.Schema, Table: m.Table, + TableID: m.TableID, Type: m.Type, CommitTs: m.CommitTs, SchemaVersion: m.SchemaVersion, @@ -255,13 +248,7 @@ func (d *Decoder) buildData( col := holder.Types[i] value := holder.Values[i] - fieldType, ok := fieldTypeMap[col.Name()] - if !ok { - log.Panic("cannot found the field type", - zap.String("schema", d.msg.Schema), - zap.String("table", d.msg.Table), - zap.String("column", col.Name())) - } + fieldType := fieldTypeMap[col.Name()] result[col.Name()] = encodeValue(value, fieldType, timezone) } return result @@ -273,10 +260,7 @@ func (d *Decoder) NextDDLEvent() (*model.DDLEvent, error) { return nil, cerror.ErrCodecDecode.GenWithStack( "no message found when decode DDL event") } - ddl, err := newDDLEvent(d.msg) - if err != nil { - return nil, err - } + ddl := newDDLEvent(d.msg) d.msg = nil d.memo.Write(ddl.TableInfo) @@ -288,14 +272,12 @@ func (d *Decoder) NextDDLEvent() (*model.DDLEvent, error) { if err != nil { return nil, err } - if event == nil { - ele = ele.Next() - continue - } - d.CachedRowChangedEvents = append(d.CachedRowChangedEvents, event) next := ele.Next() - d.cachedMessages.Remove(ele) + if event != nil { + d.CachedRowChangedEvents = append(d.CachedRowChangedEvents, event) + d.cachedMessages.Remove(ele) + } ele = next } return ddl, nil diff --git a/pkg/sink/codec/simple/encoder.go b/pkg/sink/codec/simple/encoder.go index 0be52dd5080..7712c79060a 100644 --- a/pkg/sink/codec/simple/encoder.go +++ b/pkg/sink/codec/simple/encoder.go @@ -117,11 +117,11 @@ func (e *encoder) AppendRowChangedEvent( // Build implement the RowEventEncoder interface func (e *encoder) Build() []*common.Message { - if len(e.messages) == 0 { - return nil + var result []*common.Message + if len(e.messages) != 0 { + result = e.messages + e.messages = nil } - result := e.messages - e.messages = nil return result } @@ -134,10 +134,7 @@ func (e *encoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) { value, err = common.Compress(e.config.ChangefeedID, e.config.LargeMessageHandle.LargeMessageHandleCompression, value) - if err != nil { - return nil, err - } - return common.NewResolvedMsg(config.ProtocolSimple, nil, value, ts), nil + return common.NewResolvedMsg(config.ProtocolSimple, nil, value, ts), err } // EncodeDDLEvent implement the DDLEventBatchEncoder interface @@ -186,15 +183,11 @@ func NewBuilder(ctx context.Context, config *common.Config) (*builder, error) { } m, err := newMarshaller(config) - if err != nil { - return nil, errors.Trace(err) - } - return &builder{ config: config, claimCheck: claimCheck, marshaller: m, - }, nil + }, errors.Trace(err) } // Build implement the RowEventEncoderBuilder interface diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index 89e7dfc98db..5cc83a9adb6 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -154,11 +154,67 @@ func TestEncodeDDLSequence(t *testing.T) { helper := entry.NewSchemaTestHelper(t) defer helper.Close() + dropDBEvent := helper.DDL2Event(`DROP DATABASE IF EXISTS test`) + createDBDDLEvent := helper.DDL2Event(`CREATE DATABASE IF NOT EXISTS test`) helper.Tk().MustExec("use test") + createTableDDLEvent := helper.DDL2Event("CREATE TABLE `TBL1` (`id` INT PRIMARY KEY AUTO_INCREMENT,`value` VARCHAR(255),`payload` VARCHAR(2000),`a` INT)") + + addColumnDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` ADD COLUMN `nn` INT") + + dropColumnDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` DROP COLUMN `nn`") + + changeColumnDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` CHANGE COLUMN `value` `value2` VARCHAR(512)") + + modifyColumnDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` MODIFY COLUMN `value2` VARCHAR(512) FIRST") + + setDefaultDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` ALTER COLUMN `payload` SET DEFAULT _UTF8MB4'a'") + + dropDefaultDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` ALTER COLUMN `payload` DROP DEFAULT") + + autoIncrementDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` AUTO_INCREMENT = 5") + + modifyColumnNullDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` MODIFY COLUMN `a` INT NULL") + + modifyColumnNotNullDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` MODIFY COLUMN `a` INT NOT NULL") + + addIndexDDLEvent := helper.DDL2Event("CREATE INDEX `idx_a` ON `TBL1` (`a`)") + + renameIndexDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` RENAME INDEX `idx_a` TO `new_idx_a`") + + indexVisibilityDDLEvent := helper.DDL2Event("ALTER TABLE TBL1 ALTER INDEX `new_idx_a` INVISIBLE") + + dropIndexDDLEvent := helper.DDL2Event("DROP INDEX `new_idx_a` ON `TBL1`") + + truncateTableDDLEvent := helper.DDL2Event("TRUNCATE TABLE TBL1") + + multiSchemaChangeDDLEvent := helper.DDL2Event("ALTER TABLE TBL1 ADD COLUMN `new_col` INT, ADD INDEX `idx_new_col` (`a`)") + + multiSchemaChangeDropDDLEvent := helper.DDL2Event("ALTER TABLE TBL1 DROP COLUMN `new_col`, DROP INDEX `idx_new_col`") + + renameTableDDLEvent := helper.DDL2Event("RENAME TABLE TBL1 TO TBL2") + + helper.Tk().MustExec("set @@tidb_allow_remove_auto_inc = 1") + renameColumnDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 CHANGE COLUMN `id` `id2` INT") + + partitionTableDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 PARTITION BY RANGE (id2) (PARTITION p0 VALUES LESS THAN (10), PARTITION p1 VALUES LESS THAN (20))") + + addPartitionDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 ADD PARTITION (PARTITION p2 VALUES LESS THAN (30))") + + dropPartitionDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 DROP PARTITION p2") + + truncatePartitionDDLevent := helper.DDL2Event("ALTER TABLE TBL2 TRUNCATE PARTITION p1") + + reorganizePartitionDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 REORGANIZE PARTITION p1 INTO (PARTITION p3 VALUES LESS THAN (40))") + + removePartitionDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 REMOVE PARTITIONING") + + alterCharsetCollateDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin") + + dropTableDDLEvent := helper.DDL2Event("DROP TABLE TBL2") + ctx := context.Background() codecConfig := common.NewConfig(config.ProtocolSimple) - for _, format := range []common.EncodingFormatType{ common.EncodingFormatAvro, common.EncodingFormatJSON, @@ -178,9 +234,7 @@ func TestEncodeDDLSequence(t *testing.T) { dec, err := NewDecoder(ctx, codecConfig, nil) require.NoError(t, err) - createTableDDLEvent := helper.DDL2Event( - "CREATE TABLE `TBL1` (`id` INT PRIMARY KEY AUTO_INCREMENT,`value` VARCHAR(255),`payload` VARCHAR(2000),`a` INT)") - m, err := enc.EncodeDDLEvent(createTableDDLEvent) + m, err := enc.EncodeDDLEvent(dropDBEvent) require.NoError(t, err) err = dec.AddKeyValue(m.Key, m.Value) @@ -190,6 +244,36 @@ func TestEncodeDDLSequence(t *testing.T) { require.NoError(t, err) require.True(t, hasNext) require.Equal(t, model.MessageTypeDDL, messageType) + require.Equal(t, DDLTypeQuery, dec.msg.Type) + + _, err = dec.NextDDLEvent() + require.NoError(t, err) + + m, err = enc.EncodeDDLEvent(createDBDDLEvent) + require.NoError(t, err) + + err = dec.AddKeyValue(m.Key, m.Value) + require.NoError(t, err) + + messageType, hasNext, err = dec.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, messageType) + require.Equal(t, DDLTypeQuery, dec.msg.Type) + + _, err = dec.NextDDLEvent() + require.NoError(t, err) + + m, err = enc.EncodeDDLEvent(createTableDDLEvent) + require.NoError(t, err) + + err = dec.AddKeyValue(m.Key, m.Value) + require.NoError(t, err) + + messageType, hasNext, err = dec.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, messageType) require.Equal(t, DDLTypeCreate, dec.msg.Type) event, err := dec.NextDDLEvent() @@ -197,7 +281,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Len(t, event.TableInfo.Indices, 1) require.Len(t, event.TableInfo.Columns, 4) - addColumnDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` ADD COLUMN `nn` INT") m, err = enc.EncodeDDLEvent(addColumnDDLEvent) require.NoError(t, err) @@ -213,7 +296,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Len(t, event.TableInfo.Indices, 1) require.Len(t, event.TableInfo.Columns, 5) - dropColumnDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` DROP COLUMN `nn`") m, err = enc.EncodeDDLEvent(dropColumnDDLEvent) require.NoError(t, err) @@ -229,7 +311,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Len(t, event.TableInfo.Indices, 1) require.Len(t, event.TableInfo.Columns, 4) - changeColumnDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` CHANGE COLUMN `value` `value2` VARCHAR(512)") m, err = enc.EncodeDDLEvent(changeColumnDDLEvent) require.NoError(t, err) @@ -245,7 +326,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Len(t, event.TableInfo.Indices, 1) require.Len(t, event.TableInfo.Columns, 4) - modifyColumnDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` MODIFY COLUMN `value2` VARCHAR(512) FIRST") m, err = enc.EncodeDDLEvent(modifyColumnDDLEvent) require.NoError(t, err) @@ -258,11 +338,9 @@ func TestEncodeDDLSequence(t *testing.T) { event, err = dec.NextDDLEvent() require.NoError(t, err) - require.Equal(t, 1, len(event.TableInfo.Indices)) + require.Equal(t, 1, len(event.TableInfo.Indices), string(format), compressionType) require.Equal(t, 4, len(event.TableInfo.Columns)) - setDefaultDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` ALTER COLUMN `payload` SET DEFAULT _UTF8MB4'a'") - m, err = enc.EncodeDDLEvent(setDefaultDDLEvent) require.NoError(t, err) @@ -283,7 +361,6 @@ func TestEncodeDDLSequence(t *testing.T) { } } - dropDefaultDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` ALTER COLUMN `payload` DROP DEFAULT") m, err = enc.EncodeDDLEvent(dropDefaultDDLEvent) require.NoError(t, err) @@ -304,7 +381,6 @@ func TestEncodeDDLSequence(t *testing.T) { } } - autoIncrementDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` AUTO_INCREMENT = 5") m, err = enc.EncodeDDLEvent(autoIncrementDDLEvent) require.NoError(t, err) @@ -320,7 +396,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - modifyColumnNullDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` MODIFY COLUMN `a` INT NULL") m, err = enc.EncodeDDLEvent(modifyColumnNullDDLEvent) require.NoError(t, err) @@ -341,7 +416,6 @@ func TestEncodeDDLSequence(t *testing.T) { } } - modifyColumnNotNullDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` MODIFY COLUMN `a` INT NOT NULL") m, err = enc.EncodeDDLEvent(modifyColumnNotNullDDLEvent) require.NoError(t, err) @@ -362,7 +436,6 @@ func TestEncodeDDLSequence(t *testing.T) { } } - addIndexDDLEvent := helper.DDL2Event("CREATE INDEX `idx_a` ON `TBL1` (`a`)") m, err = enc.EncodeDDLEvent(addIndexDDLEvent) require.NoError(t, err) @@ -378,7 +451,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 2, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - renameIndexDDLEvent := helper.DDL2Event("ALTER TABLE `TBL1` RENAME INDEX `idx_a` TO `new_idx_a`") m, err = enc.EncodeDDLEvent(renameIndexDDLEvent) require.NoError(t, err) @@ -406,7 +478,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.True(t, hasNewIndex) require.True(t, noOldIndex) - indexVisibilityDDLEvent := helper.DDL2Event("ALTER TABLE TBL1 ALTER INDEX `new_idx_a` INVISIBLE") m, err = enc.EncodeDDLEvent(indexVisibilityDDLEvent) require.NoError(t, err) @@ -422,7 +493,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 2, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - dropIndexDDLEvent := helper.DDL2Event("DROP INDEX `new_idx_a` ON `TBL1`") m, err = enc.EncodeDDLEvent(dropIndexDDLEvent) require.NoError(t, err) @@ -438,7 +508,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - truncateTableDDLEvent := helper.DDL2Event("TRUNCATE TABLE TBL1") m, err = enc.EncodeDDLEvent(truncateTableDDLEvent) require.NoError(t, err) @@ -454,7 +523,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - multiSchemaChangeDDLEvent := helper.DDL2Event("ALTER TABLE TBL1 ADD COLUMN `new_col` INT, ADD INDEX `idx_new_col` (`a`)") m, err = enc.EncodeDDLEvent(multiSchemaChangeDDLEvent) require.NoError(t, err) @@ -470,7 +538,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 2, len(event.TableInfo.Indices)) require.Equal(t, 5, len(event.TableInfo.Columns)) - multiSchemaChangeDropDDLEvent := helper.DDL2Event("ALTER TABLE TBL1 DROP COLUMN `new_col`, DROP INDEX `idx_new_col`") m, err = enc.EncodeDDLEvent(multiSchemaChangeDropDDLEvent) require.NoError(t, err) @@ -486,7 +553,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - renameTableDDLEvent := helper.DDL2Event("RENAME TABLE TBL1 TO TBL2") m, err = enc.EncodeDDLEvent(renameTableDDLEvent) require.NoError(t, err) @@ -502,8 +568,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - helper.Tk().MustExec("set @@tidb_allow_remove_auto_inc = 1") - renameColumnDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 CHANGE COLUMN `id` `id2` INT") m, err = enc.EncodeDDLEvent(renameColumnDDLEvent) require.NoError(t, err) @@ -519,8 +583,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - partitionTableDDLEvent := helper.DDL2Event( - "ALTER TABLE TBL2 PARTITION BY RANGE (id2) (PARTITION p0 VALUES LESS THAN (10), PARTITION p1 VALUES LESS THAN (20))") m, err = enc.EncodeDDLEvent(partitionTableDDLEvent) require.NoError(t, err) @@ -536,7 +598,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - addPartitionDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 ADD PARTITION (PARTITION p2 VALUES LESS THAN (30))") m, err = enc.EncodeDDLEvent(addPartitionDDLEvent) require.NoError(t, err) @@ -552,7 +613,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - dropPartitionDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 DROP PARTITION p2") m, err = enc.EncodeDDLEvent(dropPartitionDDLEvent) require.NoError(t, err) @@ -568,7 +628,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - truncatePartitionDDLevent := helper.DDL2Event("ALTER TABLE TBL2 TRUNCATE PARTITION p1") m, err = enc.EncodeDDLEvent(truncatePartitionDDLevent) require.NoError(t, err) @@ -584,8 +643,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - reorganizePartitionDDLEvent := helper.DDL2Event( - "ALTER TABLE TBL2 REORGANIZE PARTITION p1 INTO (PARTITION p3 VALUES LESS THAN (40))") m, err = enc.EncodeDDLEvent(reorganizePartitionDDLEvent) require.NoError(t, err) @@ -601,7 +658,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - removePartitionDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 REMOVE PARTITIONING") m, err = enc.EncodeDDLEvent(removePartitionDDLEvent) require.NoError(t, err) @@ -617,7 +673,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - alterCharsetCollateDDLEvent := helper.DDL2Event("ALTER TABLE TBL2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin") m, err = enc.EncodeDDLEvent(alterCharsetCollateDDLEvent) require.NoError(t, err) @@ -633,7 +688,6 @@ func TestEncodeDDLSequence(t *testing.T) { require.Equal(t, 1, len(event.TableInfo.Indices)) require.Equal(t, 4, len(event.TableInfo.Columns)) - dropTableDDLEvent := helper.DDL2Event("DROP TABLE TBL2") m, err = enc.EncodeDDLEvent(dropTableDDLEvent) require.NoError(t, err) @@ -658,6 +712,16 @@ func TestEncodeDDLEvent(t *testing.T) { helper := entry.NewSchemaTestHelperWithReplicaConfig(t, replicaConfig) defer helper.Close() + createTableSQL := `create table test.t(id int primary key, name varchar(255) not null, gender enum('male', 'female'), email varchar(255) null, key idx_name_email(name, email))` + createTableDDLEvent := helper.DDL2Event(createTableSQL) + + insertEvent := helper.DML2Event(`insert into test.t values (1, "jack", "male", "jack@abc.com")`, "test", "t") + + renameTableDDLEvent := helper.DDL2Event(`rename table test.t to test.abc`) + + insertEvent2 := helper.DML2Event(`insert into test.abc values (2, "anna", "female", "anna@abc.com")`, "test", "abc") + helper.Tk().MustExec("drop table test.abc") + ctx := context.Background() codecConfig := common.NewConfig(config.ProtocolSimple) codecConfig.EnableRowChecksum = true @@ -679,10 +743,6 @@ func TestEncodeDDLEvent(t *testing.T) { dec, err := NewDecoder(ctx, codecConfig, nil) require.NoError(t, err) - createTableSQL := `create table test.t(id int primary key, - name varchar(255) not null, gender enum('male', 'female'), - email varchar(255) not null, key idx_name_email(name, email))` - createTableDDLEvent := helper.DDL2Event(createTableSQL) m, err := enc.EncodeDDLEvent(createTableDDLEvent) require.NoError(t, err) @@ -694,6 +754,7 @@ func TestEncodeDDLEvent(t *testing.T) { require.True(t, hasNext) require.Equal(t, model.MessageTypeDDL, messageType) require.NotEqual(t, 0, dec.msg.BuildTs) + require.True(t, dec.msg.TableSchema.Indexes[0].Nullable) columnSchemas := dec.msg.TableSchema.Columns sortedColumns := make([]*timodel.ColumnInfo, len(createTableDDLEvent.TableInfo.Columns)) @@ -725,7 +786,6 @@ func TestEncodeDDLEvent(t *testing.T) { createTableDDLEvent.TableInfo.TableName.Table, createTableDDLEvent.TableInfo.UpdateTS) require.NotNil(t, item) - insertEvent := helper.DML2Event(`insert into test.t values (1, "jack", "male", "jack@abc.com")`, "test", "t") err = enc.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) require.NoError(t, err) @@ -748,7 +808,6 @@ func TestEncodeDDLEvent(t *testing.T) { require.Equal(t, decodedRow.TableInfo.GetTableName(), insertEvent.TableInfo.GetTableName()) require.Nil(t, decodedRow.PreColumns) - renameTableDDLEvent := helper.DDL2Event(`rename table test.t to test.abc`) m, err = enc.EncodeDDLEvent(renameTableDDLEvent) require.NoError(t, err) @@ -777,7 +836,6 @@ func TestEncodeDDLEvent(t *testing.T) { renameTableDDLEvent.TableInfo.TableName.Table, renameTableDDLEvent.TableInfo.UpdateTS) require.NotNil(t, item) - insertEvent2 := helper.DML2Event(`insert into test.abc values (2, "anna", "female", "anna@abc.com")`, "test", "abc") err = enc.AppendRowChangedEvent(context.Background(), "", insertEvent2, func() {}) require.NoError(t, err) @@ -799,8 +857,6 @@ func TestEncodeDDLEvent(t *testing.T) { require.Equal(t, insertEvent2.TableInfo.GetSchemaName(), decodedRow.TableInfo.GetSchemaName()) require.Equal(t, insertEvent2.TableInfo.GetTableName(), decodedRow.TableInfo.GetTableName()) require.Nil(t, decodedRow.PreColumns) - - helper.Tk().MustExec("drop table test.abc") } } } @@ -1041,10 +1097,11 @@ func TestEncodeBootstrapEvent(t *testing.T) { defer helper.Close() sql := `create table test.t( - id int primary key, + id int, name varchar(255) not null, age int, email varchar(255) not null, + primary key(id, name), key idx_name_email(name, email))` ddlEvent := helper.DDL2Event(sql) ddlEvent.IsBootstrap = true @@ -1092,7 +1149,7 @@ func TestEncodeBootstrapEvent(t *testing.T) { // Bootstrap event doesn't have query require.Equal(t, "", event.Query) require.Equal(t, len(ddlEvent.TableInfo.Columns), len(event.TableInfo.Columns)) - require.Equal(t, len(ddlEvent.TableInfo.Indices)+1, len(event.TableInfo.Indices)) + require.Equal(t, len(ddlEvent.TableInfo.Indices), len(event.TableInfo.Indices)) item := dec.memo.Read(ddlEvent.TableInfo.TableName.Schema, ddlEvent.TableInfo.TableName.Table, ddlEvent.TableInfo.UpdateTS) @@ -1254,18 +1311,30 @@ func TestDMLMessageTooLarge(t *testing.T) { _, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) codecConfig := common.NewConfig(config.ProtocolSimple) - codecConfig.MaxMessageBytes = 100 + codecConfig.MaxMessageBytes = 50 + for _, format := range []common.EncodingFormatType{ common.EncodingFormatAvro, common.EncodingFormatJSON, } { codecConfig.EncodingFormat = format - b, err := NewBuilder(context.Background(), codecConfig) - require.NoError(t, err) - enc := b.Build() - err = enc.AppendRowChangedEvent(context.Background(), "", insertEvent, func() {}) - require.ErrorIs(t, err, errors.ErrMessageTooLarge) + for _, handle := range []string{ + config.LargeMessageHandleOptionNone, + config.LargeMessageHandleOptionHandleKeyOnly, + config.LargeMessageHandleOptionClaimCheck, + } { + codecConfig.LargeMessageHandle.LargeMessageHandleOption = handle + if handle == config.LargeMessageHandleOptionClaimCheck { + codecConfig.LargeMessageHandle.ClaimCheckStorageURI = "file:///tmp/simple-claim-check" + } + b, err := NewBuilder(context.Background(), codecConfig) + require.NoError(t, err) + enc := b.Build() + + err = enc.AppendRowChangedEvent(context.Background(), "", insertEvent, func() {}) + require.ErrorIs(t, err, errors.ErrMessageTooLarge, string(format), handle) + } } } @@ -1275,6 +1344,16 @@ func TestLargerMessageHandleClaimCheck(t *testing.T) { ctx := context.Background() codecConfig := common.NewConfig(config.ProtocolSimple) codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionClaimCheck + + codecConfig.LargeMessageHandle.ClaimCheckStorageURI = "unsupported:///" + b, err := NewBuilder(ctx, codecConfig) + require.Error(t, err) + require.Nil(t, b) + + badDec, err := NewDecoder(ctx, codecConfig, nil) + require.Error(t, err) + require.Nil(t, badDec) + codecConfig.LargeMessageHandle.ClaimCheckStorageURI = "file:///tmp/simple-claim-check" for _, format := range []common.EncodingFormatType{ common.EncodingFormatAvro, @@ -1289,7 +1368,7 @@ func TestLargerMessageHandleClaimCheck(t *testing.T) { codecConfig.MaxMessageBytes = config.DefaultMaxMessageBytes codecConfig.LargeMessageHandle.LargeMessageHandleCompression = compressionType - b, err := NewBuilder(ctx, codecConfig) + b, err = NewBuilder(ctx, codecConfig) require.NoError(t, err) enc := b.Build() @@ -1364,6 +1443,17 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) { ctx := context.Background() codecConfig := common.NewConfig(config.ProtocolSimple) codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly + + badDec, err := NewDecoder(ctx, codecConfig, nil) + require.Error(t, err) + require.Nil(t, badDec) + + events := []*model.RowChangedEvent{ + insertEvent, + updateEvent, + deleteEvent, + } + for _, format := range []common.EncodingFormatType{ common.EncodingFormatJSON, common.EncodingFormatAvro, @@ -1384,27 +1474,9 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) { dec, err := NewDecoder(ctx, codecConfig, db) require.NoError(t, err) - m, err := enc.EncodeDDLEvent(ddlEvent) - require.NoError(t, err) - - err = dec.AddKeyValue(m.Key, m.Value) - require.NoError(t, err) - - messageType, hasNext, err := dec.HasNext() - require.NoError(t, err) - require.True(t, hasNext) - require.Equal(t, model.MessageTypeDDL, messageType) - - _, err = dec.NextDDLEvent() - require.NoError(t, err) - enc.(*encoder).config.MaxMessageBytes = 500 dec.config.MaxMessageBytes = 500 - for _, event := range []*model.RowChangedEvent{ - insertEvent, - updateEvent, - deleteEvent, - } { + for _, event := range events { err = enc.AppendRowChangedEvent(ctx, "", event, func() {}) require.NoError(t, err) @@ -1460,6 +1532,25 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) { } } + decodedRow, err := dec.NextRowChangedEvent() + require.NoError(t, err) + require.Nil(t, decodedRow) + } + + enc.(*encoder).config.MaxMessageBytes = config.DefaultMaxMessageBytes + dec.config.MaxMessageBytes = config.DefaultMaxMessageBytes + m, err := enc.EncodeDDLEvent(ddlEvent) + require.NoError(t, err) + + err = dec.AddKeyValue(m.Key, m.Value) + require.NoError(t, err) + + messageType, hasNext, err := dec.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, messageType) + + for _, event := range events { mock.ExpectQuery("SELECT @@global.time_zone"). WillReturnRows(mock.NewRows([]string{""}).AddRow("SYSTEM")) @@ -1476,8 +1567,14 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) { mock.ExpectQuery("select * from test.t where t = 127"). WillReturnRows(mock.NewRows(names).AddRow(values...)) - decodedRow, err := dec.NextRowChangedEvent() - require.NoError(t, err) + } + _, err = dec.NextDDLEvent() + require.NoError(t, err) + + decodedRows := dec.GetCachedEvents() + for idx, decodedRow := range decodedRows { + event := events[idx] + require.Equal(t, decodedRow.CommitTs, event.CommitTs) require.Equal(t, decodedRow.TableInfo.GetSchemaName(), event.TableInfo.GetSchemaName()) require.Equal(t, decodedRow.TableInfo.GetTableName(), event.TableInfo.GetTableName()) @@ -1525,3 +1622,33 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) { } } } + +func TestDecoder(t *testing.T) { + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolSimple) + decoder, err := NewDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + require.NotNil(t, decoder) + + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.False(t, hasNext) + require.Equal(t, model.MessageTypeUnknown, messageType) + + ddl, err := decoder.NextDDLEvent() + require.ErrorIs(t, err, errors.ErrCodecDecode) + require.Nil(t, ddl) + + decoder.msg = new(message) + checkpoint, err := decoder.NextResolvedEvent() + require.ErrorIs(t, err, errors.ErrCodecDecode) + require.Equal(t, uint64(0), checkpoint) + + event, err := decoder.NextRowChangedEvent() + require.ErrorIs(t, err, errors.ErrCodecDecode) + require.Nil(t, event) + + decoder.value = []byte("invalid") + err = decoder.AddKeyValue(nil, nil) + require.ErrorIs(t, err, errors.ErrCodecDecode) +} diff --git a/pkg/sink/codec/simple/marshaller.go b/pkg/sink/codec/simple/marshaller.go index 999c58976b9..dcdc6306bb3 100644 --- a/pkg/sink/codec/simple/marshaller.go +++ b/pkg/sink/codec/simple/marshaller.go @@ -51,13 +51,8 @@ func newMarshaller(config *common.Config) (marshaller, error) { result = newJSONMarshaller(config) case common.EncodingFormatAvro: result, err = newAvroMarshaller(config, string(avroSchemaBytes)) - if err != nil { - return nil, errors.Trace(err) - } - default: - return nil, errors.New("unknown encoding format") } - return result, nil + return result, errors.Trace(err) } type jsonMarshaller struct { @@ -74,10 +69,7 @@ func newJSONMarshaller(config *common.Config) *jsonMarshaller { func (m *jsonMarshaller) MarshalCheckpoint(ts uint64) ([]byte, error) { msg := newResolvedMessage(ts) result, err := json.Marshal(msg) - if err != nil { - return nil, errors.WrapError(errors.ErrEncodeFailed, err) - } - return result, nil + return result, errors.WrapError(errors.ErrEncodeFailed, err) } // MarshalDDLEvent implement the marshaller interface @@ -89,10 +81,7 @@ func (m *jsonMarshaller) MarshalDDLEvent(event *model.DDLEvent) ([]byte, error) msg = newDDLMessage(event) } value, err := json.Marshal(msg) - if err != nil { - return nil, errors.WrapError(errors.ErrEncodeFailed, err) - } - return value, nil + return value, errors.WrapError(errors.ErrEncodeFailed, err) } // MarshalRowChangedEvent implement the marshaller interface @@ -102,10 +91,7 @@ func (m *jsonMarshaller) MarshalRowChangedEvent( ) ([]byte, error) { msg := m.newDMLMessage(event, handleKeyOnly, claimCheckFileName) value, err := json.Marshal(msg) - if err != nil { - return nil, errors.WrapError(errors.ErrEncodeFailed, err) - } - return value, nil + return value, errors.WrapError(errors.ErrEncodeFailed, err) } // Unmarshal implement the marshaller interface @@ -120,28 +106,17 @@ type avroMarshaller struct { func newAvroMarshaller(config *common.Config, schema string) (*avroMarshaller, error) { codec, err := goavro.NewCodec(schema) - if err != nil { - return nil, errors.Trace(err) - } return &avroMarshaller{ codec: codec, config: config, - }, nil -} - -// Marshal implement the marshaller interface -func (m *avroMarshaller) Marshal(v any) ([]byte, error) { - return m.codec.BinaryFromNative(nil, v) + }, errors.Trace(err) } // MarshalCheckpoint implement the marshaller interface func (m *avroMarshaller) MarshalCheckpoint(ts uint64) ([]byte, error) { msg := newResolvedMessageMap(ts) result, err := m.codec.BinaryFromNative(nil, msg) - if err != nil { - return nil, errors.WrapError(errors.ErrEncodeFailed, err) - } - return result, nil + return result, errors.WrapError(errors.ErrEncodeFailed, err) } // MarshalDDLEvent implement the marshaller interface @@ -153,10 +128,7 @@ func (m *avroMarshaller) MarshalDDLEvent(event *model.DDLEvent) ([]byte, error) msg = newDDLMessageMap(event) } value, err := m.codec.BinaryFromNative(nil, msg) - if err != nil { - return nil, errors.WrapError(errors.ErrEncodeFailed, err) - } - return value, nil + return value, errors.WrapError(errors.ErrEncodeFailed, err) } // MarshalRowChangedEvent implement the marshaller interface @@ -166,23 +138,13 @@ func (m *avroMarshaller) MarshalRowChangedEvent( ) ([]byte, error) { msg := m.newDMLMessageMap(event, handleKeyOnly, claimCheckFileName) value, err := m.codec.BinaryFromNative(nil, msg) - if err != nil { - return nil, errors.WrapError(errors.ErrEncodeFailed, err) - } - recycleMap(msg) - return value, nil + return value, errors.WrapError(errors.ErrEncodeFailed, err) } // Unmarshal implement the marshaller interface func (m *avroMarshaller) Unmarshal(data []byte, v any) error { native, _, err := m.codec.NativeFromBinary(data) - if err != nil { - return errors.Trace(err) - } - err = newMessageFromAvroNative(native, v.(*message)) - if err != nil { - return errors.Trace(err) - } - return nil + newMessageFromAvroNative(native, v.(*message)) + return errors.Trace(err) } diff --git a/pkg/sink/codec/simple/message.go b/pkg/sink/codec/simple/message.go index 332c87941a7..be112b1d3db 100644 --- a/pkg/sink/codec/simple/message.go +++ b/pkg/sink/codec/simple/message.go @@ -158,7 +158,7 @@ func newColumnSchema(col *timodel.ColumnInfo) *columnSchema { // newTiColumnInfo uses columnSchema and IndexSchema to construct a tidb column info. func newTiColumnInfo( column *columnSchema, colID int64, indexes []*IndexSchema, -) (*timodel.ColumnInfo, error) { +) *timodel.ColumnInfo { col := new(timodel.ColumnInfo) col.ID = colID col.Name = timodel.NewCIStr(column.Name) @@ -194,10 +194,6 @@ func newTiColumnInfo( default: } } - err := col.SetDefaultValue(defaultValue) - if err != nil { - return nil, cerror.WrapError(cerror.ErrDecodeFailed, err) - } for _, index := range indexes { if index.Primary { @@ -211,7 +207,11 @@ func newTiColumnInfo( } } - return col, nil + err := col.SetDefaultValue(defaultValue) + if err != nil { + log.Panic("set default value failed", zap.Any("column", col), zap.Any("default", defaultValue)) + } + return col } // IndexSchema is the schema of the index. @@ -233,7 +233,7 @@ func newIndexSchema(index *timodel.IndexInfo, columns []*timodel.ColumnInfo) *In for _, col := range index.Columns { indexSchema.Columns = append(indexSchema.Columns, col.Name.O) colInfo := columns[col.Offset] - // An index is not null when all columns of aer not null + // An index is not null when all columns of are not null if !mysql.HasNotNullFlag(colInfo.GetFlag()) { indexSchema.Nullable = true } @@ -316,7 +316,7 @@ func newTableSchema(tableInfo *model.TableInfo) *TableSchema { } // newTableInfo converts from TableSchema to TableInfo. -func newTableInfo(m *TableSchema) (*model.TableInfo, error) { +func newTableInfo(m *TableSchema) *model.TableInfo { var ( database string table string @@ -343,45 +343,32 @@ func newTableInfo(m *TableSchema) (*model.TableInfo, error) { TableID: tableID, }, TableInfo: tidbTableInfo, - }, nil + } } nextMockID := int64(100) for _, col := range m.Columns { - tiCol, err := newTiColumnInfo(col, nextMockID, m.Indexes) + tiCol := newTiColumnInfo(col, nextMockID, m.Indexes) nextMockID += 100 - if err != nil { - return nil, err - } tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol) } for _, idx := range m.Indexes { index := newTiIndexInfo(idx) tidbTableInfo.Indices = append(tidbTableInfo.Indices, index) } - info := model.WrapTableInfo(100, database, schemaVersion, tidbTableInfo) - - return info, nil + return model.WrapTableInfo(100, database, schemaVersion, tidbTableInfo) } // newDDLEvent converts from message to DDLEvent. -func newDDLEvent(msg *message) (*model.DDLEvent, error) { +func newDDLEvent(msg *message) *model.DDLEvent { var ( tableInfo *model.TableInfo preTableInfo *model.TableInfo - err error ) - tableInfo, err = newTableInfo(msg.TableSchema) - if err != nil { - return nil, err - } - + tableInfo = newTableInfo(msg.TableSchema) if msg.PreTableSchema != nil { - preTableInfo, err = newTableInfo(msg.PreTableSchema) - if err != nil { - return nil, err - } + preTableInfo = newTableInfo(msg.PreTableSchema) } return &model.DDLEvent{ StartTs: msg.CommitTs, @@ -389,7 +376,7 @@ func newDDLEvent(msg *message) (*model.DDLEvent, error) { TableInfo: tableInfo, PreTableInfo: preTableInfo, Query: msg.SQL, - }, nil + } } // buildRowChangedEvent converts from message to RowChangedEvent. @@ -399,6 +386,7 @@ func buildRowChangedEvent( result := &model.RowChangedEvent{ CommitTs: msg.CommitTs, TableInfo: tableInfo, + Table: &tableInfo.TableName, } result.Columns = decodeColumns(msg.Data, tableInfo) @@ -476,17 +464,16 @@ func adjustTimestampValue(column *model.Column, flag types.FieldType) { if flag.GetType() != mysql.TypeTimestamp { return } - if column.Value == nil { - return - } - var ts string - switch v := column.Value.(type) { - case map[string]string: - ts = v["value"] - case map[string]interface{}: - ts = v["value"].(string) + if column.Value != nil { + var ts string + switch v := column.Value.(type) { + case map[string]string: + ts = v["value"] + case map[string]interface{}: + ts = v["value"].(string) + } + column.Value = ts } - column.Value = ts } func decodeColumns( @@ -624,10 +611,7 @@ func (a *jsonMarshaller) newDMLMessage( m.Type = DMLTypeUpdate m.Data = a.formatColumns(event.Columns, event.TableInfo.Columns, onlyHandleKey) m.Old = a.formatColumns(event.PreColumns, event.TableInfo.Columns, onlyHandleKey) - } else { - log.Panic("invalid event type, this should not hit", zap.Any("event", event)) } - if a.config.EnableRowChecksum && event.Checksum != nil { m.Checksum = &checksum{ Version: event.Checksum.Version, @@ -645,14 +629,13 @@ func (a *jsonMarshaller) formatColumns( ) map[string]interface{} { result := make(map[string]interface{}, len(columns)) for idx, col := range columns { - if col == nil { - continue - } - if onlyHandleKey && !col.Flag.IsHandleKey() { - continue + if col != nil { + if onlyHandleKey && !col.Flag.IsHandleKey() { + continue + } + value := encodeValue(col.Value, &colInfos[idx].FieldType, a.config.TimeZone.String()) + result[col.Name] = value } - value := encodeValue(col.Value, &colInfos[idx].FieldType, a.config.TimeZone.String()) - result[col.Name] = value } return result } @@ -697,7 +680,6 @@ func (a *avroMarshaller) encodeValue4Avro( default: log.Panic("unexpected type for avro value", zap.Any("value", value)) } - return value, "" } @@ -708,6 +690,7 @@ func encodeValue( return nil } + var err error switch ft.GetType() { case mysql.TypeBit: switch v := value.(type) { @@ -732,27 +715,26 @@ func encodeValue( switch v := value.(type) { case []uint8: data := string(v) - enum, err := tiTypes.ParseEnumName(ft.GetElems(), data, ft.GetCollate()) - if err != nil { - log.Panic("parse enum name failed", - zap.Any("elems", ft.GetElems()), zap.String("name", data), zap.Error(err)) - } - return enum.Value + var enum tiTypes.Enum + enum, err = tiTypes.ParseEnumName(ft.GetElems(), data, ft.GetCollate()) + value = enum.Value } case mysql.TypeSet: switch v := value.(type) { case []uint8: data := string(v) - set, err := tiTypes.ParseSetName(ft.GetElems(), data, ft.GetCollate()) - if err != nil { - log.Panic("parse set name failed", - zap.Any("elems", ft.GetElems()), zap.String("name", data), zap.Error(err)) - } - return set.Value + var set tiTypes.Set + set, err = tiTypes.ParseSetName(ft.GetElems(), data, ft.GetCollate()) + value = set.Value } default: } + if err != nil { + log.Panic("parse enum / set name failed", + zap.Any("elems", ft.GetElems()), zap.Any("name", value), zap.Error(err)) + } + var result string switch v := value.(type) { case int64: @@ -811,13 +793,6 @@ func decodeColumn(value interface{}, fieldType *types.FieldType) *model.Column { // json encoding, set is encoded as `string`, bit encoded as `string` case string: value, err = strconv.ParseUint(v, 10, 64) - if err != nil { - return nil - } - case []uint8: - value = common.MustBinaryLiteralToInt(v) - case uint64: - value = v case int64: value = uint64(v) } @@ -825,9 +800,6 @@ func decodeColumn(value interface{}, fieldType *types.FieldType) *model.Column { switch v := value.(type) { case string: value, err = strconv.ParseInt(v, 10, 64) - if err != nil { - return nil - } default: value = v } @@ -837,9 +809,6 @@ func decodeColumn(value interface{}, fieldType *types.FieldType) *model.Column { value, err = strconv.ParseInt(v, 10, 64) if err != nil { value, err = strconv.ParseUint(v, 10, 64) - if err != nil { - return nil - } } case map[string]interface{}: value = uint64(v["value"].(int64)) @@ -850,9 +819,6 @@ func decodeColumn(value interface{}, fieldType *types.FieldType) *model.Column { switch v := value.(type) { case string: value, err = strconv.ParseFloat(v, 32) - if err != nil { - return nil - } default: value = v } @@ -860,9 +826,6 @@ func decodeColumn(value interface{}, fieldType *types.FieldType) *model.Column { switch v := value.(type) { case string: value, err = strconv.ParseFloat(v, 64) - if err != nil { - return nil - } default: value = v } @@ -872,13 +835,14 @@ func decodeColumn(value interface{}, fieldType *types.FieldType) *model.Column { switch v := value.(type) { case string: value, err = strconv.ParseUint(v, 10, 64) - if err != nil { - return nil - } } default: } + if err != nil { + return nil + } + result.Value = value return result } diff --git a/tests/integration_tests/kafka_simple_claim_check/conf/changefeed.toml b/tests/integration_tests/kafka_simple_claim_check/conf/changefeed.toml index 296d4b7522c..526f41a8532 100644 --- a/tests/integration_tests/kafka_simple_claim_check/conf/changefeed.toml +++ b/tests/integration_tests/kafka_simple_claim_check/conf/changefeed.toml @@ -1,3 +1,7 @@ +[integrity] +integrity-check-level = "correctness" +corruption-handle-level = "error" + [sink.kafka-config.large-message-handle] large-message-handle-compression = "snappy" large-message-handle-option = "claim-check" diff --git a/tests/integration_tests/kafka_simple_claim_check/run.sh b/tests/integration_tests/kafka_simple_claim_check/run.sh index ae75482d0c2..4fc1f089d18 100644 --- a/tests/integration_tests/kafka_simple_claim_check/run.sh +++ b/tests/integration_tests/kafka_simple_claim_check/run.sh @@ -19,6 +19,10 @@ function run() { start_tidb_cluster --workdir $WORK_DIR cd $WORK_DIR + + # upstream tidb cluster enable row level checksum + run_sql "set global tidb_enable_row_level_checksum=true" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY TOPIC_NAME="kafka-simple-claim-check-$RANDOM" diff --git a/tests/integration_tests/kafka_simple_claim_check_avro/conf/changefeed.toml b/tests/integration_tests/kafka_simple_claim_check_avro/conf/changefeed.toml index a7fdbec764c..3d9314b55bd 100644 --- a/tests/integration_tests/kafka_simple_claim_check_avro/conf/changefeed.toml +++ b/tests/integration_tests/kafka_simple_claim_check_avro/conf/changefeed.toml @@ -1,3 +1,7 @@ +[integrity] +integrity-check-level = "correctness" +corruption-handle-level = "error" + [sink.kafka-config.large-message-handle] large-message-handle-compression = "snappy" large-message-handle-option = "claim-check" diff --git a/tests/integration_tests/kafka_simple_claim_check_avro/run.sh b/tests/integration_tests/kafka_simple_claim_check_avro/run.sh index 3bbba9a4ba2..8466b83188e 100644 --- a/tests/integration_tests/kafka_simple_claim_check_avro/run.sh +++ b/tests/integration_tests/kafka_simple_claim_check_avro/run.sh @@ -19,6 +19,10 @@ function run() { start_tidb_cluster --workdir $WORK_DIR cd $WORK_DIR + + # upstream tidb cluster enable row level checksum + run_sql "set global tidb_enable_row_level_checksum=true" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY TOPIC_NAME="kafka-simple-claim-check-avro-$RANDOM" diff --git a/tests/integration_tests/kafka_simple_handle_key_only/conf/changefeed.toml b/tests/integration_tests/kafka_simple_handle_key_only/conf/changefeed.toml index 409c304b39e..318a83de162 100644 --- a/tests/integration_tests/kafka_simple_handle_key_only/conf/changefeed.toml +++ b/tests/integration_tests/kafka_simple_handle_key_only/conf/changefeed.toml @@ -1,3 +1,7 @@ +[integrity] +integrity-check-level = "correctness" +corruption-handle-level = "error" + [sink] send-bootstrap-interval-in-sec = 0 send-bootstrap-in-msg-count = 0 diff --git a/tests/integration_tests/kafka_simple_handle_key_only/run.sh b/tests/integration_tests/kafka_simple_handle_key_only/run.sh index 5b8d6492145..5d15bb9df68 100644 --- a/tests/integration_tests/kafka_simple_handle_key_only/run.sh +++ b/tests/integration_tests/kafka_simple_handle_key_only/run.sh @@ -19,6 +19,10 @@ function run() { start_tidb_cluster --workdir $WORK_DIR cd $WORK_DIR + + # upstream tidb cluster enable row level checksum + run_sql "set global tidb_enable_row_level_checksum=true" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY TOPIC_NAME="simple-handle-key-only-$RANDOM" diff --git a/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/changefeed.toml b/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/changefeed.toml index 409c304b39e..318a83de162 100644 --- a/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/changefeed.toml +++ b/tests/integration_tests/kafka_simple_handle_key_only_avro/conf/changefeed.toml @@ -1,3 +1,7 @@ +[integrity] +integrity-check-level = "correctness" +corruption-handle-level = "error" + [sink] send-bootstrap-interval-in-sec = 0 send-bootstrap-in-msg-count = 0 diff --git a/tests/integration_tests/kafka_simple_handle_key_only_avro/run.sh b/tests/integration_tests/kafka_simple_handle_key_only_avro/run.sh index c3c0e40f42f..ec475d821bc 100644 --- a/tests/integration_tests/kafka_simple_handle_key_only_avro/run.sh +++ b/tests/integration_tests/kafka_simple_handle_key_only_avro/run.sh @@ -19,6 +19,10 @@ function run() { start_tidb_cluster --workdir $WORK_DIR cd $WORK_DIR + + # upstream tidb cluster enable row level checksum + run_sql "set global tidb_enable_row_level_checksum=true" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY TOPIC_NAME="simple-handle-key-only-avro-$RANDOM"