From 93c8afa5e628c016f16b712c9775d0a7798ce267 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 22 Nov 2023 01:02:42 -0600 Subject: [PATCH] This is an automated cherry-pick of #10123 Signed-off-by: ti-chi-bot --- cdc/sink/codec/canal/canal_encoder_test.go | 115 ++- cdc/sink/codec/canal/canal_entry.go | 55 +- cdc/sink/codec/canal/canal_entry_test.go | 82 +- .../codec/canal/canal_json_decoder_test.go | 18 +- cdc/sink/codec/canal/canal_json_encoder.go | 57 ++ cdc/sink/codec/canal/canal_json_message.go | 51 + cdc/sink/codec/canal/canal_test_util.go | 398 +++++++- cdc/sinkv2/eventsink/mq/mq_dml_sink_test.go | 23 +- cdc/sinkv2/eventsink/mq/worker_test.go | 135 ++- .../canal_json_row_event_encoder_test.go | 769 ++++++++++++++ .../canal_json_txn_event_encoder_test.go | 145 +++ pkg/sink/codec/canal/type_test.go | 960 ++++++++++++++++++ pkg/sink/codec/utils/field_types.go | 82 ++ 13 files changed, 2820 insertions(+), 70 deletions(-) create mode 100644 pkg/sink/codec/canal/canal_json_row_event_encoder_test.go create mode 100644 pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go create mode 100644 pkg/sink/codec/canal/type_test.go create mode 100644 pkg/sink/codec/utils/field_types.go diff --git a/cdc/sink/codec/canal/canal_encoder_test.go b/cdc/sink/codec/canal/canal_encoder_test.go index b07cee9eeb0..8f6c885a630 100644 --- a/cdc/sink/codec/canal/canal_encoder_test.go +++ b/cdc/sink/codec/canal/canal_encoder_test.go @@ -19,6 +19,10 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/tidb/parser/mysql" +<<<<<<< HEAD:cdc/sink/codec/canal/canal_encoder_test.go +======= + "github.com/pingcap/tiflow/cdc/entry" +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_encoder_test.go "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec/common" "github.com/pingcap/tiflow/pkg/config" @@ -26,14 +30,87 @@ import ( "github.com/stretchr/testify/require" ) +var ( + rowCases = [][]*model.RowChangedEvent{ + {{ + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + Columns: []*model.Column{{ + Name: "col1", + Type: mysql.TypeVarchar, + Value: []byte("aa"), + }}, + }}, + { + { + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + Columns: []*model.Column{{ + Name: "col1", + Type: mysql.TypeVarchar, + Value: []byte("aa"), + }}, + }, + { + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, + }, + }, + } + + ddlCases = [][]*model.DDLEvent{ + {{ + CommitTs: 1, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "a", Table: "b", + }, + }, + Query: "create table a", + Type: 1, + }}, + { + { + CommitTs: 2, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "a", Table: "b", + }, + }, + Query: "create table b", + Type: 3, + }, + { + CommitTs: 3, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "a", Table: "b", + }, + }, + Query: "create table c", + Type: 3, + }, + }, + } +) + func TestCanalBatchEncoder(t *testing.T) { - t.Parallel() - s := defaultCanalBatchTester - for _, cs := range s.rowCases { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(10) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + for _, cs := range rowCases { encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) for _, row := range cs { + _, _, colInfo := tableInfo.GetRowColInfos() + row.TableInfo = tableInfo + row.ColInfos = colInfo err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) - require.Nil(t, err) + require.NoError(t, err) } res := encoder.Build() @@ -41,7 +118,6 @@ func TestCanalBatchEncoder(t *testing.T) { require.Nil(t, res) continue } - require.Len(t, res, 1) require.Nil(t, res[0].Key) require.Equal(t, len(cs), res[0].GetRowsCount()) @@ -56,33 +132,36 @@ func TestCanalBatchEncoder(t *testing.T) { require.Equal(t, len(cs), len(messages.GetMessages())) } - for _, cs := range s.ddlCases { + for _, cs := range ddlCases { encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) for _, ddl := range cs { msg, err := encoder.EncodeDDLEvent(ddl) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, msg) require.Nil(t, msg.Key) packet := &canal.Packet{} err = proto.Unmarshal(msg.Value, packet) - require.Nil(t, err) + require.NoError(t, err) require.Equal(t, canal.PacketType_MESSAGES, packet.GetType()) messages := &canal.Messages{} err = proto.Unmarshal(packet.GetBody(), messages) - require.Nil(t, err) + require.NoError(t, err) require.Equal(t, 1, len(messages.GetMessages())) - require.Nil(t, err) + require.NoError(t, err) } } } func TestCanalAppendRowChangedEventWithCallback(t *testing.T) { - encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) - require.NotNil(t, encoder) + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() - count := 0 + sql := `create table test.t(a varchar(10) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() row := &model.RowChangedEvent{ CommitTs: 1, Table: &model.TableName{Schema: "a", Table: "b"}, @@ -91,8 +170,18 @@ func TestCanalAppendRowChangedEventWithCallback(t *testing.T) { Type: mysql.TypeVarchar, Value: []byte("aa"), }}, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_encoder_test.go +======= + TableInfo: tableInfo, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_encoder_test.go } + encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) + require.NotNil(t, encoder) + + count := 0 + tests := []struct { row *model.RowChangedEvent callback func() diff --git a/cdc/sink/codec/canal/canal_entry.go b/cdc/sink/codec/canal/canal_entry.go index e99762f2495..63bc9a7528d 100644 --- a/cdc/sink/codec/canal/canal_entry.go +++ b/cdc/sink/codec/canal/canal_entry.go @@ -18,16 +18,25 @@ import ( "math" "reflect" "strconv" - "strings" "github.com/golang/protobuf/proto" // nolint:staticcheck "github.com/pingcap/errors" mm "github.com/pingcap/tidb/parser/model" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go "github.com/pingcap/tidb/types" +======= +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec/internal" cerror "github.com/pingcap/tiflow/pkg/errors" +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go +======= + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/pingcap/tiflow/pkg/sink/codec/internal" + "github.com/pingcap/tiflow/pkg/sink/codec/utils" +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go canal "github.com/pingcap/tiflow/proto/canal" "golang.org/x/text/encoding" "golang.org/x/text/encoding/charmap" @@ -78,7 +87,7 @@ func (b *canalEntryBuilder) buildHeader(commitTs uint64, schema string, table st // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java#L276-L1147 // all value will be represented in string type // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L760-L855 -func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.JavaSQLType) (result string, err error) { +func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (result string, err error) { // value would be nil, if no value insert for the column. if value == nil { return "", nil @@ -96,20 +105,15 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.Jav case string: result = v case []byte: - // JavaSQLTypeVARCHAR / JavaSQLTypeCHAR / JavaSQLTypeBLOB / JavaSQLTypeCLOB / - // special handle for text and blob // see https://github.com/alibaba/canal/blob/9f6021cf36f78cc8ac853dcf37a1769f359b868b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L801 - switch javaType { - // for normal text - case internal.JavaSQLTypeVARCHAR, internal.JavaSQLTypeCHAR, internal.JavaSQLTypeCLOB: - result = string(v) - default: - // JavaSQLTypeBLOB + if isBinary { decoded, err := b.bytesDecoder.Bytes(v) if err != nil { return "", err } result = string(decoded) + } else { + result = string(v) } default: result = fmt.Sprintf("%v", v) @@ -119,21 +123,27 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.Jav // build the Column in the canal RowData // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872 +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated bool) (*canal.Column, error) { mysqlType := getMySQLType(c) javaType, err := getJavaSQLType(c, mysqlType) +======= +func (b *canalEntryBuilder) buildColumn(c *model.Column, columnInfo *timodel.ColumnInfo, updated bool) (*canal.Column, error) { + mysqlType := utils.GetMySQLType(columnInfo, b.config.ContentCompatible) + javaType, err := getJavaSQLType(c.Value, c.Type, c.Flag) +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go if err != nil { return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } - value, err := b.formatValue(c.Value, javaType) + value, err := b.formatValue(c.Value, c.Flag.IsBinary()) if err != nil { return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } canalColumn := &canal.Column{ SqlType: int32(javaType), - Name: colName, + Name: c.Name, IsKey: c.Flag.IsPrimaryKey(), Updated: updated, IsNullPresent: &canal.Column_IsNull{IsNull: c.Value == nil}, @@ -150,7 +160,16 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey if column == nil { continue } +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go c, err := b.buildColumn(column, column.Name, !e.IsDelete()) +======= + columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID) + if !ok { + return nil, cerror.ErrCanalEncodeFailed.GenWithStack( + "column info not found for column id: %d", e.ColInfos[idx].ID) + } + c, err := b.buildColumn(column, columnInfo, !e.IsDelete()) +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go if err != nil { return nil, errors.Trace(err) } @@ -166,7 +185,16 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey if onlyHandleKeyColumns && !column.Flag.IsHandleKey() { continue } +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go c, err := b.buildColumn(column, column.Name, !e.IsDelete()) +======= + columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID) + if !ok { + return nil, cerror.ErrCanalEncodeFailed.GenWithStack( + "column info not found for column id: %d", e.ColInfos[idx].ID) + } + c, err := b.buildColumn(column, columnInfo, !e.IsDelete()) +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go if err != nil { return nil, errors.Trace(err) } @@ -373,6 +401,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result internal.JavaSQLT return javaType, nil } +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go // when encoding the canal format, for unsigned mysql type, add `unsigned` keyword. // it should have the form `t unsigned`, such as `int unsigned` @@ -407,3 +436,5 @@ func getMySQLType(c *model.Column) string { return mysqlType } +======= +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go diff --git a/cdc/sink/codec/canal/canal_entry_test.go b/cdc/sink/codec/canal/canal_entry_test.go index 14ec07087cd..eac4e190484 100644 --- a/cdc/sink/codec/canal/canal_entry_test.go +++ b/cdc/sink/codec/canal/canal_entry_test.go @@ -19,13 +19,22 @@ import ( "github.com/golang/protobuf/proto" mm "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry_test.go "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec/internal" +======= + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/pingcap/tiflow/pkg/sink/codec/internal" +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry_test.go canal "github.com/pingcap/tiflow/proto/canal" "github.com/stretchr/testify/require" "golang.org/x/text/encoding/charmap" ) +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry_test.go func TestGetMySQLTypeAndJavaSQLType(t *testing.T) { t.Parallel() canalEntryBuilder := newCanalEntryBuilder() @@ -55,18 +64,43 @@ func TestConvertEntry(t *testing.T) { func testInsert(t *testing.T) { testCaseInsert := &model.RowChangedEvent{ +======= +func TestInsert(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t( + id int primary key, + name varchar(32), + tiny tinyint unsigned, + comment text, + bb blob)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + event := &model.RowChangedEvent{ +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry_test.go CommitTs: 417318403368288260, Table: &model.TableName{ Schema: "cdc", Table: "person", }, + TableInfo: tableInfo, Columns: []*model.Column{ {Name: "id", Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 1}, {Name: "name", Type: mysql.TypeVarchar, Value: "Bob"}, {Name: "tiny", Type: mysql.TypeTiny, Value: 255}, {Name: "comment", Type: mysql.TypeBlob, Value: []byte("测试")}, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry_test.go {Name: "blob", Type: mysql.TypeBlob, Value: []byte("测试blob"), Flag: model.BinaryFlag}, }, +======= + {Name: "bb", Type: mysql.TypeBlob, Value: []byte("测试blob"), Flag: model.BinaryFlag}, + }, + ColInfos: colInfos, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry_test.go } builder := newCanalEntryBuilder() @@ -117,7 +151,7 @@ func testInsert(t *testing.T) { require.Nil(t, err) require.Equal(t, "测试", col.GetValue()) require.Equal(t, "text", col.GetMysqlType()) - case "blob": + case "bb": require.Equal(t, int32(internal.JavaSQLTypeBLOB), col.GetSqlType()) require.False(t, col.GetIsKey()) require.False(t, col.GetIsNull()) @@ -129,13 +163,28 @@ func testInsert(t *testing.T) { } } +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry_test.go func testUpdate(t *testing.T) { testCaseUpdate := &model.RowChangedEvent{ +======= +func TestUpdate(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(id int primary key, name varchar(32))` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + event := &model.RowChangedEvent{ +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry_test.go CommitTs: 417318403368288260, Table: &model.TableName{ - Schema: "cdc", - Table: "person", + Schema: "test", + Table: "t", }, + TableInfo: tableInfo, Columns: []*model.Column{ {Name: "id", Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 1}, {Name: "name", Type: mysql.TypeVarchar, Value: "Bob"}, @@ -144,6 +193,10 @@ func testUpdate(t *testing.T) { {Name: "id", Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 2}, {Name: "name", Type: mysql.TypeVarchar, Value: "Nancy"}, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry_test.go +======= + ColInfos: colInfos, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry_test.go } builder := newCanalEntryBuilder() entry, err := builder.fromRowEvent(testCaseUpdate, false) @@ -206,16 +259,35 @@ func testUpdate(t *testing.T) { } } +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry_test.go func testDelete(t *testing.T) { testCaseDelete := &model.RowChangedEvent{ +======= +func TestDelete(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(id int primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + event := &model.RowChangedEvent{ +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry_test.go CommitTs: 417318403368288260, Table: &model.TableName{ - Schema: "cdc", - Table: "person", + Schema: "test", + Table: "t", }, + TableInfo: tableInfo, PreColumns: []*model.Column{ {Name: "id", Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 1}, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry_test.go +======= + ColInfos: colInfos, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry_test.go } builder := newCanalEntryBuilder() diff --git a/cdc/sink/codec/canal/canal_json_decoder_test.go b/cdc/sink/codec/canal/canal_json_decoder_test.go index a6646a10f36..384cd3ba3f7 100644 --- a/cdc/sink/codec/canal/canal_json_decoder_test.go +++ b/cdc/sink/codec/canal/canal_json_decoder_test.go @@ -24,8 +24,7 @@ import ( ) func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { - t.Parallel() - + insertEvent, _, _ := newLargeEvent4Test(t) ctx := context.Background() expectedDecodedValue := collectExpectedDecodedValue(testColumnsTable) for _, encodeEnable := range []bool{false, true} { @@ -36,8 +35,17 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { }) require.NotNil(t, encoder) +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_decoder_test.go err := encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert, nil) require.Nil(t, err) +======= + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil) + require.NoError(t, err) +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_decoder_test.go messages := encoder.Build() require.Equal(t, 1, len(messages)) @@ -59,9 +67,9 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { consumed, err := decoder.NextRowChangedEvent() require.Nil(t, err) - require.Equal(t, testCaseInsert.Table, consumed.Table) + require.Equal(t, insertEvent.Table, consumed.Table) if encodeEnable && decodeEnable { - require.Equal(t, testCaseInsert.CommitTs, consumed.CommitTs) + require.Equal(t, insertEvent.CommitTs, consumed.CommitTs) } else { require.Equal(t, uint64(0), consumed.CommitTs) } @@ -71,7 +79,7 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { require.True(t, ok) require.Equal(t, expected, col.Value) - for _, item := range testCaseInsert.Columns { + for _, item := range insertEvent.Columns { if item.Name == col.Name { require.Equal(t, item.Type, col.Type) } diff --git a/cdc/sink/codec/canal/canal_json_encoder.go b/cdc/sink/codec/canal/canal_json_encoder.go index dc8782579c4..ac448e26f33 100644 --- a/cdc/sink/codec/canal/canal_json_encoder.go +++ b/cdc/sink/codec/canal/canal_json_encoder.go @@ -26,6 +26,13 @@ import ( "github.com/pingcap/tiflow/cdc/sink/codec/common" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_encoder.go +======= + "github.com/pingcap/tiflow/pkg/sink/codec" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/pingcap/tiflow/pkg/sink/codec/utils" + "github.com/pingcap/tiflow/pkg/sink/kafka/claimcheck" +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_row_event_encoder.go "go.uber.org/zap" ) @@ -59,7 +66,16 @@ func newJSONBatchEncoder(config *common.Config) codec.EventBatchEncoder { } func fillColumns( +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_encoder.go columns []*model.Column, out *jwriter.Writer, onlyHandleKeyColumns bool, builder *canalEntryBuilder, +======= + columns []*model.Column, + onlyOutputUpdatedColumn bool, + onlyHandleKeyColumn bool, + newColumnMap map[string]*model.Column, + out *jwriter.Writer, + builder *canalEntryBuilder, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_row_event_encoder.go ) error { if len(columns) == 0 { out.RawString("null") @@ -78,12 +94,16 @@ func fillColumns( } else { out.RawByte(',') } +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_encoder.go mysqlType := getMySQLType(col) javaType, err := getJavaSQLType(col, mysqlType) if err != nil { return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } value, err := builder.formatValue(col.Value, javaType) +======= + value, err := builder.formatValue(col.Value, col.Flag.IsBinary()) +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_row_event_encoder.go if err != nil { return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } @@ -112,6 +132,10 @@ func newJSONMessageForDML( onlyHandleKey = true } +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_encoder.go +======= + mysqlTypeMap := make(map[string]string, len(e.Columns)) +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_row_event_encoder.go out := &jwriter.Writer{} out.RawByte('{') { @@ -198,7 +222,16 @@ func newJSONMessageForDML( out.String(col.Name) out.RawByte(':') out.Int32(int32(javaType)) +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_encoder.go mysqlTypeMap[col.Name] = mysqlType +======= + columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID) + if !ok { + return nil, cerror.ErrCanalEncodeFailed.GenWithStack( + "cannot found the column info by the column ID: %d", e.ColInfos[idx].ID) + } + mysqlTypeMap[col.Name] = utils.GetMySQLType(columnInfo, config.ContentCompatible) +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_row_event_encoder.go } } if emptyColumn { @@ -232,22 +265,46 @@ func newJSONMessageForDML( if e.IsDelete() { out.RawString(",\"old\":null") out.RawString(",\"data\":") +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_encoder.go if err := fillColumns(e.PreColumns, out, onlyHandleKey, builder); err != nil { +======= + if err := fillColumns( + e.PreColumns, false, onlyHandleKey, nil, out, builder, + ); err != nil { +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_row_event_encoder.go return nil, err } } else if e.IsInsert() { out.RawString(",\"old\":null") out.RawString(",\"data\":") +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_encoder.go if err := fillColumns(e.Columns, out, onlyHandleKey, builder); err != nil { +======= + if err := fillColumns( + e.Columns, false, onlyHandleKey, nil, out, builder, + ); err != nil { +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_row_event_encoder.go return nil, err } } else if e.IsUpdate() { out.RawString(",\"old\":") +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_encoder.go if err := fillColumns(e.PreColumns, out, onlyHandleKey, builder); err != nil { return nil, err } out.RawString(",\"data\":") if err := fillColumns(e.Columns, out, onlyHandleKey, builder); err != nil { +======= + if err := fillColumns( + e.PreColumns, config.OnlyOutputUpdatedColumns, onlyHandleKey, newColsMap, out, builder, + ); err != nil { + return nil, err + } + out.RawString(",\"data\":") + if err := fillColumns( + e.Columns, false, onlyHandleKey, nil, out, builder, + ); err != nil { +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_row_event_encoder.go return nil, err } } else { diff --git a/cdc/sink/codec/canal/canal_json_message.go b/cdc/sink/codec/canal/canal_json_message.go index 61a326a96ac..09925dfde3d 100644 --- a/cdc/sink/codec/canal/canal_json_message.go +++ b/cdc/sink/codec/canal/canal_json_message.go @@ -18,10 +18,15 @@ import ( "strings" timodel "github.com/pingcap/tidb/parser/model" +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_message.go "github.com/pingcap/tidb/parser/types" +======= + "github.com/pingcap/tidb/parser/mysql" +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_message.go "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec/internal" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/utils" canal "github.com/pingcap/tiflow/proto/canal" ) @@ -212,6 +217,52 @@ func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType return result, nil } +<<<<<<< HEAD:cdc/sink/codec/canal/canal_json_message.go +======= +func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string) *model.Column { + mysqlType := utils.ExtractBasicMySQLType(mysqlTypeStr) + result := &model.Column{ + Type: mysqlType, + Name: name, + Value: value, + } + if result.Value == nil { + return result + } + + data, ok := value.(string) + if !ok { + log.Panic("canal-json encoded message should have type in `string`") + } + + if mysqlType == mysql.TypeBit || mysqlType == mysql.TypeSet { + val, err := strconv.ParseUint(data, 10, 64) + if err != nil { + log.Panic("invalid column value for bit", zap.Any("col", result), zap.Error(err)) + } + result.Value = val + return result + } + + var err error + if isBinaryMySQLType(mysqlTypeStr) { + // when encoding the `JavaSQLTypeBLOB`, use `ISO8859_1` decoder, now reverse it back. + encoder := charmap.ISO8859_1.NewEncoder() + value, err = encoder.String(data) + if err != nil { + log.Panic("invalid column value, please report a bug", zap.Any("col", result), zap.Error(err)) + } + } + + result.Value = value + return result +} + +func isBinaryMySQLType(mysqlType string) bool { + return strings.Contains(mysqlType, "blob") || strings.Contains(mysqlType, "binary") +} + +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_json_message.go func canalJSONMessage2DDLEvent(msg canalJSONMessageInterface) *model.DDLEvent { result := new(model.DDLEvent) // we lost the startTs from kafka message diff --git a/cdc/sink/codec/canal/canal_test_util.go b/cdc/sink/codec/canal/canal_test_util.go index 578653e0685..2714a6cf29b 100644 --- a/cdc/sink/codec/canal/canal_test_util.go +++ b/cdc/sink/codec/canal/canal_test_util.go @@ -14,8 +14,11 @@ package canal import ( + "testing" + mm "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec/internal" ) @@ -24,6 +27,14 @@ type testColumnTuple struct { column *model.Column expectedMySQLType string expectedJavaSQLType internal.JavaSQLType +======= + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" +) + +type testColumnTuple struct { + column *model.Column +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go // expectedEncodedValue is expected by encoding expectedEncodedValue string @@ -35,171 +46,273 @@ type testColumnTuple struct { var ( testColumnsTable = []*testColumnTuple{ { +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go &model.Column{Name: "tinyint", Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Type: mysql.TypeTiny, Value: int64(127)}, "tinyint", internal.JavaSQLTypeTINYINT, "127", "127", +======= + &model.Column{Name: "t", Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Type: mysql.TypeTiny, Value: int64(127)}, + "127", "127", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "tinyint unsigned", Type: mysql.TypeTiny, Value: uint64(127), + Name: "tu1", Type: mysql.TypeTiny, Value: uint64(127), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "tinyint unsigned", internal.JavaSQLTypeTINYINT, "127", "127", +======= + "127", "127", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "tinyint unsigned 2", Type: mysql.TypeTiny, Value: uint64(128), + Name: "tu2", Type: mysql.TypeTiny, Value: uint64(128), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "tinyint unsigned", internal.JavaSQLTypeSMALLINT, "128", "128", +======= + "128", "128", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "tinyint unsigned 3", Type: mysql.TypeTiny, Value: "0", + Name: "tu3", Type: mysql.TypeTiny, Value: "0", Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "tinyint unsigned", internal.JavaSQLTypeTINYINT, "0", "0", +======= + "0", "0", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "tinyint unsigned 4", Type: mysql.TypeTiny, Value: nil, + Name: "tu4", Type: mysql.TypeTiny, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "tinyint unsigned", internal.JavaSQLTypeTINYINT, "", nil, }, { &model.Column{Name: "smallint", Type: mysql.TypeShort, Value: int64(32767)}, "smallint", internal.JavaSQLTypeSMALLINT, "32767", "32767", +======= + "", nil, + }, + + { + &model.Column{Name: "s", Type: mysql.TypeShort, Value: int64(32767)}, + "32767", "32767", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "smallint unsigned", Type: mysql.TypeShort, Value: uint64(32767), + Name: "su1", Type: mysql.TypeShort, Value: uint64(32767), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "smallint unsigned", internal.JavaSQLTypeSMALLINT, "32767", "32767", +======= + "32767", "32767", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "smallint unsigned 2", Type: mysql.TypeShort, Value: uint64(32768), + Name: "su2", Type: mysql.TypeShort, Value: uint64(32768), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "smallint unsigned", internal.JavaSQLTypeINTEGER, "32768", "32768", +======= + "32768", "32768", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "smallint unsigned 3", Type: mysql.TypeShort, Value: "0", + Name: "su3", Type: mysql.TypeShort, Value: "0", Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "smallint unsigned", internal.JavaSQLTypeSMALLINT, "0", "0", +======= + "0", "0", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "smallint unsigned 4", Type: mysql.TypeShort, Value: nil, + Name: "su4", Type: mysql.TypeShort, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "smallint unsigned", internal.JavaSQLTypeSMALLINT, "", nil, }, { &model.Column{Name: "mediumint", Type: mysql.TypeInt24, Value: int64(8388607)}, "mediumint", internal.JavaSQLTypeINTEGER, "8388607", "8388607", +======= + "", nil, + }, + + { + &model.Column{Name: "m", Type: mysql.TypeInt24, Value: int64(8388607)}, + "8388607", "8388607", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "mediumint unsigned", Type: mysql.TypeInt24, Value: uint64(8388607), + Name: "mu1", Type: mysql.TypeInt24, Value: uint64(8388607), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "mediumint unsigned", internal.JavaSQLTypeINTEGER, "8388607", "8388607", +======= + "8388607", "8388607", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "mediumint unsigned 2", Type: mysql.TypeInt24, Value: uint64(8388608), + Name: "mu2", Type: mysql.TypeInt24, Value: uint64(8388608), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "mediumint unsigned", internal.JavaSQLTypeINTEGER, "8388608", "8388608", +======= + "8388608", "8388608", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "mediumint unsigned 3", Type: mysql.TypeInt24, Value: "0", + Name: "mu3", Type: mysql.TypeInt24, Value: "0", Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "mediumint unsigned", internal.JavaSQLTypeINTEGER, "0", "0", +======= + "0", "0", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "mediumint unsigned 4", Type: mysql.TypeInt24, Value: nil, + Name: "mu4", Type: mysql.TypeInt24, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "mediumint unsigned", internal.JavaSQLTypeINTEGER, "", nil, }, { &model.Column{Name: "int", Type: mysql.TypeLong, Value: int64(2147483647)}, "int", internal.JavaSQLTypeINTEGER, "2147483647", "2147483647", +======= + "", nil, + }, + + { + &model.Column{Name: "i", Type: mysql.TypeLong, Value: int64(2147483647)}, + "2147483647", "2147483647", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "int unsigned", Type: mysql.TypeLong, Value: uint64(2147483647), + Name: "iu1", Type: mysql.TypeLong, Value: uint64(2147483647), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "int unsigned", internal.JavaSQLTypeINTEGER, "2147483647", "2147483647", +======= + "2147483647", "2147483647", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "int unsigned 2", Type: mysql.TypeLong, Value: uint64(2147483648), + Name: "iu2", Type: mysql.TypeLong, Value: uint64(2147483648), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "int unsigned", internal.JavaSQLTypeBIGINT, "2147483648", "2147483648", +======= + "2147483648", "2147483648", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "int unsigned 3", Type: mysql.TypeLong, Value: "0", + Name: "iu3", Type: mysql.TypeLong, Value: "0", Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "int unsigned", internal.JavaSQLTypeINTEGER, "0", "0", +======= + "0", "0", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "int unsigned 4", Type: mysql.TypeLong, Value: nil, + Name: "iu4", Type: mysql.TypeLong, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "int unsigned", internal.JavaSQLTypeINTEGER, "", nil, }, { &model.Column{Name: "bigint", Type: mysql.TypeLonglong, Value: int64(9223372036854775807)}, "bigint", internal.JavaSQLTypeBIGINT, "9223372036854775807", "9223372036854775807", +======= + "", nil, + }, + + { + &model.Column{Name: "bi", Type: mysql.TypeLonglong, Value: int64(9223372036854775807)}, + "9223372036854775807", "9223372036854775807", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "bigint unsigned", Type: mysql.TypeLonglong, Value: uint64(9223372036854775807), + Name: "biu1", Type: mysql.TypeLonglong, Value: uint64(9223372036854775807), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "bigint unsigned", internal.JavaSQLTypeBIGINT, "9223372036854775807", "9223372036854775807", +======= + "9223372036854775807", "9223372036854775807", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "bigint unsigned 2", Type: mysql.TypeLonglong, Value: uint64(9223372036854775808), + Name: "biu2", Type: mysql.TypeLonglong, Value: uint64(9223372036854775808), Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "bigint unsigned", internal.JavaSQLTypeDECIMAL, "9223372036854775808", "9223372036854775808", +======= + "9223372036854775808", "9223372036854775808", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "bigint unsigned 3", Type: mysql.TypeLonglong, Value: "0", + Name: "biu3", Type: mysql.TypeLonglong, Value: "0", Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "bigint unsigned", internal.JavaSQLTypeBIGINT, "0", "0", +======= + "0", "0", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "bigint unsigned 4", Type: mysql.TypeLonglong, Value: nil, + Name: "biu4", Type: mysql.TypeLonglong, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "bigint unsigned", internal.JavaSQLTypeBIGINT, "", nil, }, @@ -214,6 +327,22 @@ var ( { &model.Column{Name: "decimal", Type: mysql.TypeNewDecimal, Value: "2333"}, "decimal", internal.JavaSQLTypeDECIMAL, "2333", "2333", +======= + "", nil, + }, + + { + &model.Column{Name: "floatT", Type: mysql.TypeFloat, Value: 3.14}, + "3.14", "3.14", + }, + { + &model.Column{Name: "doubleT", Type: mysql.TypeDouble, Value: 2.71}, + "2.71", "2.71", + }, + { + &model.Column{Name: "decimalT", Type: mysql.TypeNewDecimal, Value: "2333"}, + "2333", "2333", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { @@ -221,52 +350,87 @@ var ( Name: "float unsigned", Type: mysql.TypeFloat, Value: 3.14, Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "float unsigned", internal.JavaSQLTypeREAL, "3.14", "3.14", +======= + "3.14", "3.14", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ Name: "double unsigned", Type: mysql.TypeDouble, Value: 2.71, Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "double unsigned", internal.JavaSQLTypeDOUBLE, "2.71", "2.71", +======= + "2.71", "2.71", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ Name: "decimal unsigned", Type: mysql.TypeNewDecimal, Value: "2333", Flag: model.UnsignedFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "decimal unsigned", internal.JavaSQLTypeDECIMAL, "2333", "2333", +======= + "2333", "2333", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, // for column value type in `[]uint8` and have `BinaryFlag`, expectedEncodedValue is dummy. { +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go &model.Column{Name: "varchar", Type: mysql.TypeVarchar, Value: []uint8("测试Varchar")}, "varchar", internal.JavaSQLTypeVARCHAR, "测试Varchar", "测试Varchar", }, { &model.Column{Name: "char", Type: mysql.TypeString, Value: []uint8("测试String")}, "char", internal.JavaSQLTypeCHAR, "测试String", "测试String", +======= + &model.Column{Name: "varcharT", Type: mysql.TypeVarchar, Value: []uint8("测试Varchar")}, + "测试Varchar", "测试Varchar", + }, + { + &model.Column{Name: "charT", Type: mysql.TypeString, Value: []uint8("测试String")}, + "测试String", "测试String", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "binary", Type: mysql.TypeString, Value: []uint8("测试Binary"), + Name: "binaryT", Type: mysql.TypeString, Value: []uint8("测试Binary"), Flag: model.BinaryFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "binary", internal.JavaSQLTypeBLOB, "测试Binary", "测试Binary", +======= + "测试Binary", "测试Binary", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "varbinary", Type: mysql.TypeVarchar, Value: []uint8("测试varbinary"), + Name: "varbinaryT", Type: mysql.TypeVarchar, Value: []uint8("测试varbinary"), Flag: model.BinaryFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "varbinary", internal.JavaSQLTypeBLOB, "测试varbinary", "测试varbinary", }, { &model.Column{Name: "tinytext", Type: mysql.TypeTinyBlob, Value: []uint8("测试Tinytext")}, "tinytext", internal.JavaSQLTypeCLOB, "测试Tinytext", "测试Tinytext", +======= + "测试varbinary", "测试varbinary", }, + { + &model.Column{Name: "tinytextT", Type: mysql.TypeTinyBlob, Value: []uint8("测试Tinytext")}, + "测试Tinytext", "测试Tinytext", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go + }, + { +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go &model.Column{Name: "text", Type: mysql.TypeBlob, Value: []uint8("测试text")}, "text", internal.JavaSQLTypeCLOB, "测试text", "测试text", }, @@ -280,34 +444,59 @@ var ( { &model.Column{Name: "longtext", Type: mysql.TypeLongBlob, Value: []uint8("测试longtext")}, "longtext", internal.JavaSQLTypeCLOB, "测试longtext", "测试longtext", +======= + &model.Column{Name: "textT", Type: mysql.TypeBlob, Value: []uint8("测试text")}, + "测试text", "测试text", + }, + { + &model.Column{Name: "mediumtextT", Type: mysql.TypeMediumBlob, Value: []uint8("测试mediumtext")}, + "测试mediumtext", "测试mediumtext", + }, + { + &model.Column{Name: "longtextT", Type: mysql.TypeLongBlob, Value: []uint8("测试longtext")}, + "测试longtext", "测试longtext", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "tinyblob", Type: mysql.TypeTinyBlob, Value: []uint8("测试tinyblob"), + Name: "tinyblobT", Type: mysql.TypeTinyBlob, Value: []uint8("测试tinyblob"), Flag: model.BinaryFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "tinyblob", internal.JavaSQLTypeBLOB, "测试tinyblob", "测试tinyblob", +======= + "测试tinyblob", "测试tinyblob", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "blob", Type: mysql.TypeBlob, Value: []uint8("测试blob"), + Name: "blobT", Type: mysql.TypeBlob, Value: []uint8("测试blob"), Flag: model.BinaryFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "blob", internal.JavaSQLTypeBLOB, "测试blob", "测试blob", +======= + "测试blob", "测试blob", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "mediumblob", Type: mysql.TypeMediumBlob, Value: []uint8("测试mediumblob"), + Name: "mediumblobT", Type: mysql.TypeMediumBlob, Value: []uint8("测试mediumblob"), Flag: model.BinaryFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "mediumblob", internal.JavaSQLTypeBLOB, "测试mediumblob", "测试mediumblob", +======= + "测试mediumblob", "测试mediumblob", +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "longblob", Type: mysql.TypeLongBlob, Value: []uint8("测试longblob"), + Name: "longblobT", Type: mysql.TypeLongBlob, Value: []uint8("测试longblob"), Flag: model.BinaryFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "longblob", internal.JavaSQLTypeBLOB, "测试longblob", "测试longblob", }, @@ -339,19 +528,57 @@ var ( { &model.Column{Name: "set", Type: mysql.TypeSet, Value: uint64(3)}, "set", internal.JavaSQLTypeBIT, "3", uint64(3), +======= + "测试longblob", "测试longblob", + }, + + { + &model.Column{Name: "dateT", Type: mysql.TypeDate, Value: "2020-02-20"}, + "2020-02-20", "2020-02-20", + }, + { + &model.Column{Name: "datetimeT", Type: mysql.TypeDatetime, Value: "2020-02-20 02:20:20"}, + "2020-02-20 02:20:20", "2020-02-20 02:20:20", + }, + { + &model.Column{Name: "timestampT", Type: mysql.TypeTimestamp, Value: "2020-02-20 10:20:20"}, + "2020-02-20 10:20:20", "2020-02-20 10:20:20", + }, + { + &model.Column{Name: "timeT", Type: mysql.TypeDuration, Value: "02:20:20"}, + "02:20:20", "02:20:20", + }, + { + &model.Column{Name: "yearT", Type: mysql.TypeYear, Value: "2020", Flag: model.UnsignedFlag}, + "2020", "2020", + }, + + { + &model.Column{Name: "enumT", Type: mysql.TypeEnum, Value: uint64(1)}, + "1", "1", + }, + { + &model.Column{Name: "setT", Type: mysql.TypeSet, Value: uint64(2)}, + "2", uint64(2), +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "bit", Type: mysql.TypeBit, Value: uint64(65), + Name: "bitT", Type: mysql.TypeBit, Value: uint64(65), Flag: model.UnsignedFlag | model.BinaryFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "bit", internal.JavaSQLTypeBIT, "65", uint64(65), +======= + "65", uint64(65), +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go }, { &model.Column{ - Name: "json", Type: mysql.TypeJSON, Value: "{\"key1\": \"value1\"}", + Name: "jsonT", Type: mysql.TypeJSON, Value: "{\"key1\": \"value1\"}", Flag: model.BinaryFlag, }, +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go "json", internal.JavaSQLTypeVARCHAR, "{\"key1\": \"value1\"}", "{\"key1\": \"value1\"}", }, } @@ -455,6 +682,12 @@ var ( PreColumns: testColumns, } +======= + "{\"key1\": \"value1\"}", "{\"key1\": \"value1\"}", + }, + } + +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go testCaseDDL = &model.DDLEvent{ CommitTs: 417318403368288260, TableInfo: &model.TableInfo{ @@ -468,11 +701,19 @@ var ( ) func collectAllColumns(groups []*testColumnTuple) []*model.Column { +<<<<<<< HEAD:cdc/sink/codec/canal/canal_test_util.go result := make([]*model.Column, 0, len(groups)) for _, item := range groups { result = append(result, item.column) } return result +======= + columns := make([]*model.Column, 0, len(groups)) + for _, item := range groups { + columns = append(columns, item.column) + } + return columns +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_test_util.go } func collectExpectedDecodedValue(columns []*testColumnTuple) map[string]interface{} { @@ -482,3 +723,104 @@ func collectExpectedDecodedValue(columns []*testColumnTuple) map[string]interfac } return result } + +func newLargeEvent4Test(t *testing.T) (*model.RowChangedEvent, *model.RowChangedEvent, *model.RowChangedEvent) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t( + t tinyint primary key, + tu1 tinyint unsigned, + tu2 tinyint unsigned, + tu3 tinyint unsigned, + tu4 tinyint unsigned, + s smallint, + su1 smallint unsigned, + su2 smallint unsigned, + su3 smallint unsigned, + su4 smallint unsigned, + m mediumint, + mu1 mediumint unsigned, + mu2 mediumint unsigned, + mu3 mediumint unsigned, + mu4 mediumint unsigned, + i int, + iu1 int unsigned, + iu2 int unsigned, + iu3 int unsigned, + iu4 int unsigned, + bi bigint, + biu1 bigint unsigned, + biu2 bigint unsigned, + biu3 bigint unsigned, + biu4 bigint unsigned, + floatT float, + doubleT double, + decimalT decimal, + floatTu float unsigned, + doubleTu double unsigned, + decimalTu decimal unsigned, + varcharT varchar(255), + charT char, + binaryT binary, + varbinaryT varbinary(255), + tinytextT tinytext, + textT text, + mediumtextT mediumtext, + longtextT longtext, + tinyblobT tinyblob, + blobT blob, + mediumblobT mediumblob, + longblobT longblob, + dateT date, + datetimeT datetime, + timestampT timestamp, + timeT time, + yearT year, + enumT enum('a', 'b', 'c'), + setT set('a', 'b', 'c'), + bitT bit(4), + jsonT json)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() + + testColumns := collectAllColumns(testColumnsTable) + + insert := &model.RowChangedEvent{ + CommitTs: 417318403368288260, + Table: &model.TableName{ + Schema: "test", + Table: "t", + }, + TableInfo: tableInfo, + Columns: testColumns, + PreColumns: nil, + ColInfos: colInfo, + } + + update := &model.RowChangedEvent{ + CommitTs: 417318403368288260, + Table: &model.TableName{ + Schema: "cdc", + Table: "person", + }, + TableInfo: tableInfo, + Columns: testColumns, + PreColumns: testColumns, + ColInfos: colInfo, + } + + deleteE := &model.RowChangedEvent{ + CommitTs: 417318403368288260, + Table: &model.TableName{ + Schema: "cdc", + Table: "person", + }, + TableInfo: tableInfo, + Columns: nil, + PreColumns: testColumns, + ColInfos: colInfo, + } + return insert, update, deleteE +} diff --git a/cdc/sinkv2/eventsink/mq/mq_dml_sink_test.go b/cdc/sinkv2/eventsink/mq/mq_dml_sink_test.go index c828258a559..6507a29f59f 100644 --- a/cdc/sinkv2/eventsink/mq/mq_dml_sink_test.go +++ b/cdc/sinkv2/eventsink/mq/mq_dml_sink_test.go @@ -20,7 +20,12 @@ import ( "testing" "time" +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/mq_dml_sink_test.go "github.com/Shopify/sarama" +======= + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tiflow/cdc/entry" +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/mq_dml_sink_test.go "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink/mq/dmlproducer" @@ -76,8 +81,6 @@ func TestNewKafkaDMLSinkFailed(t *testing.T) { } func TestWriteEvents(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -99,11 +102,27 @@ func TestWriteEvents(t *testing.T) { require.Nil(t, err) require.NotNil(t, s) + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() + tableStatus := state.TableSinkSinking row := &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/mq_dml_sink_test.go CommitTs: 1, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, +======= + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/mq_dml_sink_test.go } events := make([]*eventsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000) diff --git a/cdc/sinkv2/eventsink/mq/worker_test.go b/cdc/sinkv2/eventsink/mq/worker_test.go index adb4e2cbfd3..244a4f8a9d1 100644 --- a/cdc/sinkv2/eventsink/mq/worker_test.go +++ b/cdc/sinkv2/eventsink/mq/worker_test.go @@ -19,6 +19,11 @@ import ( "testing" "time" +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go +======= + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tiflow/cdc/entry" +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec/builder" "github.com/pingcap/tiflow/cdc/sink/codec/common" @@ -59,7 +64,13 @@ func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlpro } func TestNonBatchEncode_SendMessages(t *testing.T) { - t.Parallel() + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -72,9 +83,17 @@ func TestNonBatchEncode_SendMessages(t *testing.T) { Partition: 1, } row := &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 1, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, +======= + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go } tableStatus := state.TableSinkSinking @@ -266,7 +285,13 @@ func TestBatchEncode_Group(t *testing.T) { } func TestBatchEncode_GroupWhenTableStopping(t *testing.T) { - t.Parallel() + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() key1 := mqv1.TopicPartitionKey{ Topic: "test", @@ -286,9 +311,11 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) { { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &replicatingStatus, @@ -333,9 +360,13 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) { } func TestBatchEncode_SendMessages(t *testing.T) { +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go t.Parallel() key1 := mqv1.TopicPartitionKey{ +======= + key1 := TopicPartitionKey{ +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go Topic: "test", Partition: 1, } @@ -353,13 +384,30 @@ func TestBatchEncode_SendMessages(t *testing.T) { defer cancel() worker, p := newBatchEncodeWorker(ctx, t) defer worker.close() + + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() + events := []mqEvent{ { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 1, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, +======= + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go }, Callback: func() {}, SinkState: &tableStatus, @@ -369,9 +417,17 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 2, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, +======= + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go }, Callback: func() {}, SinkState: &tableStatus, @@ -381,9 +437,17 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 3, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}}, +======= + CommitTs: 3, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "cc"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go }, Callback: func() {}, SinkState: &tableStatus, @@ -393,9 +457,17 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 2, Table: &model.TableName{Schema: "aa", Table: "bb"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, +======= + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go }, Callback: func() {}, SinkState: &tableStatus, @@ -405,9 +477,17 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 2, Table: &model.TableName{Schema: "aaa", Table: "bbb"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, +======= + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go }, Callback: func() {}, SinkState: &tableStatus, @@ -417,9 +497,17 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 3, Table: &model.TableName{Schema: "aaa", Table: "bbb"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, +======= + CommitTs: 3, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go }, Callback: func() {}, SinkState: &tableStatus, @@ -477,9 +565,13 @@ func TestBatchEncodeWorker_Abort(t *testing.T) { } func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) { +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go t.Parallel() key1 := mqv1.TopicPartitionKey{ +======= + key1 := TopicPartitionKey{ +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go Topic: "test", Partition: 1, } @@ -493,13 +585,30 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) { defer worker.close() replicatingStatus := state.TableSinkSinking stoppedStatus := state.TableSinkStopping + + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() + events := []mqEvent{ { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 1, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, +======= + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go }, Callback: func() {}, SinkState: &replicatingStatus, @@ -509,9 +618,17 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) { { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 2, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, +======= + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go }, Callback: func() {}, SinkState: &replicatingStatus, @@ -521,9 +638,17 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) { { rowEvent: &eventsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ +<<<<<<< HEAD:cdc/sinkv2/eventsink/mq/worker_test.go CommitTs: 3, Table: &model.TableName{Schema: "a", Table: "b"}, Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}}, +======= + CommitTs: 3, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}}, + ColInfos: colInfo, +>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):cdc/sink/dmlsink/mq/worker_test.go }, Callback: func() {}, SinkState: &stoppedStatus, diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go new file mode 100644 index 00000000000..ef48961636c --- /dev/null +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -0,0 +1,769 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package canal + +import ( + "context" + "database/sql" + "encoding/json" + "testing" + + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/compression" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/stretchr/testify/require" + "golang.org/x/text/encoding/charmap" +) + +func TestBuildCanalJSONRowEventEncoder(t *testing.T) { + t.Parallel() + cfg := common.NewConfig(config.ProtocolCanalJSON) + + builder, err := NewJSONRowEventEncoderBuilder(context.Background(), cfg) + require.NoError(t, err) + encoder, ok := builder.Build().(*JSONRowEventEncoder) + require.True(t, ok) + require.NotNil(t, encoder.config) +} + +func TestNewCanalJSONMessage4DML(t *testing.T) { + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + + encoder, ok := builder.Build().(*JSONRowEventEncoder) + require.True(t, ok) + + insertEvent, updateEvent, deleteEvent := newLargeEvent4Test(t) + data, err := newJSONMessageForDML(encoder.builder, insertEvent, encoder.config, false, "") + require.NoError(t, err) + + var msg canalJSONMessageInterface = &JSONMessage{} + err = json.Unmarshal(data, msg) + require.NoError(t, err) + + jsonMsg, ok := msg.(*JSONMessage) + require.True(t, ok) + require.NotNil(t, jsonMsg.Data) + require.Nil(t, jsonMsg.Old) + require.Equal(t, "INSERT", jsonMsg.EventType) + require.Equal(t, convertToCanalTs(insertEvent.CommitTs), jsonMsg.ExecutionTime) + require.Equal(t, "test", jsonMsg.Schema) + require.Equal(t, "t", jsonMsg.Table) + require.False(t, jsonMsg.IsDDL) + + for _, col := range insertEvent.Columns { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } + + // check data is enough + obtainedDataMap := jsonMsg.getData() + require.NotNil(t, obtainedDataMap) + + for _, item := range testColumnsTable { + obtainedValue, ok := obtainedDataMap[item.column.Name] + require.True(t, ok) + if !item.column.Flag.IsBinary() { + require.Equal(t, item.expectedEncodedValue, obtainedValue) + continue + } + + // for `Column.Value` is nil, which mean's it is nullable, set the value to `""` + if obtainedValue == nil { + require.Equal(t, "", item.expectedEncodedValue) + continue + } + + if bytes, ok := item.column.Value.([]byte); ok { + expectedValue, err := charmap.ISO8859_1.NewDecoder().Bytes(bytes) + require.NoError(t, err) + require.Equal(t, string(expectedValue), obtainedValue) + continue + } + + require.Equal(t, item.expectedEncodedValue, obtainedValue) + } + + data, err = newJSONMessageForDML(encoder.builder, updateEvent, encoder.config, false, "") + require.NoError(t, err) + + jsonMsg = &JSONMessage{} + err = json.Unmarshal(data, jsonMsg) + require.NoError(t, err) + + require.NotNil(t, jsonMsg.Data) + require.NotNil(t, jsonMsg.Old) + require.Equal(t, "UPDATE", jsonMsg.EventType) + + for _, col := range updateEvent.Columns { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } + for _, col := range updateEvent.PreColumns { + require.Contains(t, jsonMsg.Old[0], col.Name) + } + + data, err = newJSONMessageForDML(encoder.builder, deleteEvent, encoder.config, false, "") + require.NoError(t, err) + + jsonMsg = &JSONMessage{} + err = json.Unmarshal(data, jsonMsg) + require.NoError(t, err) + require.NotNil(t, jsonMsg.Data) + require.Nil(t, jsonMsg.Old) + require.Equal(t, "DELETE", jsonMsg.EventType) + + for _, col := range deleteEvent.PreColumns { + require.Contains(t, jsonMsg.Data[0], col.Name) + } + + codecConfig = &common.Config{DeleteOnlyHandleKeyColumns: true} + data, err = newJSONMessageForDML(encoder.builder, deleteEvent, codecConfig, false, "") + require.NoError(t, err) + + jsonMsg = &JSONMessage{} + err = json.Unmarshal(data, jsonMsg) + require.NoError(t, err) + require.NotNil(t, jsonMsg.Data) + require.Nil(t, jsonMsg.Old) + + for _, col := range deleteEvent.PreColumns { + if col.Flag.IsHandleKey() { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } else { + require.NotContains(t, jsonMsg.Data[0], col.Name) + require.NotContains(t, jsonMsg.SQLType, col.Name) + require.NotContains(t, jsonMsg.MySQLType, col.Name) + } + } + + codecConfig = common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = true + codecConfig.OnlyOutputUpdatedColumns = true + + builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + + encoder, ok = builder.Build().(*JSONRowEventEncoder) + require.True(t, ok) + data, err = newJSONMessageForDML(encoder.builder, updateEvent, encoder.config, false, "") + require.NoError(t, err) + + withExtension := &canalJSONMessageWithTiDBExtension{} + err = json.Unmarshal(data, withExtension) + require.NoError(t, err) + + require.NotNil(t, withExtension.Extensions) + require.Equal(t, updateEvent.CommitTs, withExtension.Extensions.CommitTs) + + encoder, ok = builder.Build().(*JSONRowEventEncoder) + require.True(t, ok) + data, err = newJSONMessageForDML(encoder.builder, updateEvent, encoder.config, false, "") + require.NoError(t, err) + + withExtension = &canalJSONMessageWithTiDBExtension{} + err = json.Unmarshal(data, withExtension) + require.NoError(t, err) + require.Equal(t, 0, len(withExtension.JSONMessage.Old[0])) + + require.NotNil(t, withExtension.Extensions) + require.Equal(t, updateEvent.CommitTs, withExtension.Extensions.CommitTs) +} + +func TestCanalJSONCompressionE2E(t *testing.T) { + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = true + codecConfig.LargeMessageHandle.LargeMessageHandleCompression = compression.LZ4 + + ctx := context.Background() + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + insertEvent, _, _ := newLargeEvent4Test(t) + + // encode normal row changed event + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) + require.NoError(t, err) + + message := encoder.Build()[0] + + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, messageType, model.MessageTypeRow) + + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.Equal(t, decodedEvent.CommitTs, insertEvent.CommitTs) + require.Equal(t, decodedEvent.Table.Schema, insertEvent.Table.Schema) + require.Equal(t, decodedEvent.Table.Table, insertEvent.Table.Table) + + // encode DDL event + message, err = encoder.EncodeDDLEvent(testCaseDDL) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + messageType, hasNext, err = decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, messageType, model.MessageTypeDDL) + + decodedDDL, err := decoder.NextDDLEvent() + require.NoError(t, err) + + require.Equal(t, decodedDDL.Query, testCaseDDL.Query) + require.Equal(t, decodedDDL.CommitTs, testCaseDDL.CommitTs) + require.Equal(t, decodedDDL.TableInfo.TableName.Schema, testCaseDDL.TableInfo.TableName.Schema) + require.Equal(t, decodedDDL.TableInfo.TableName.Table, testCaseDDL.TableInfo.TableName.Table) + + // encode checkpoint event + waterMark := uint64(2333) + message, err = encoder.EncodeCheckpointEvent(waterMark) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + messageType, hasNext, err = decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, messageType, model.MessageTypeResolved) + + decodedWatermark, err := decoder.NextResolvedEvent() + require.NoError(t, err) + require.Equal(t, decodedWatermark, waterMark) +} + +func TestCanalJSONClaimCheckE2E(t *testing.T) { + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = true + codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionClaimCheck + codecConfig.LargeMessageHandle.LargeMessageHandleCompression = compression.Snappy + codecConfig.LargeMessageHandle.ClaimCheckStorageURI = "file:///tmp/canal-json-claim-check" + codecConfig.MaxMessageBytes = 500 + ctx := context.Background() + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + insertEvent, _, _ := newLargeEvent4Test(t) + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) + require.NoError(t, err) + + // this is a large message, should be delivered to the external storage. + claimCheckLocationMessage := encoder.Build()[0] + + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(claimCheckLocationMessage.Key, claimCheckLocationMessage.Value) + require.NoError(t, err) + + messageType, ok, err := decoder.HasNext() + require.NoError(t, err) + require.Equal(t, messageType, model.MessageTypeRow) + require.True(t, ok) + + decodedLargeEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + + require.Equal(t, insertEvent.CommitTs, decodedLargeEvent.CommitTs) + require.Equal(t, insertEvent.Table, decodedLargeEvent.Table) + require.Equal(t, insertEvent.PreColumns, decodedLargeEvent.PreColumns) + + decodedColumns := make(map[string]*model.Column, len(decodedLargeEvent.Columns)) + for _, column := range decodedLargeEvent.Columns { + decodedColumns[column.Name] = column + } + + expectedValue := collectExpectedDecodedValue(testColumnsTable) + for _, column := range insertEvent.Columns { + decodedColumn, ok := decodedColumns[column.Name] + require.True(t, ok) + require.Equal(t, column.Type, decodedColumn.Type) + require.Equal(t, expectedValue[column.Name], decodedColumn.Value) + } +} + +func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) { + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = true + codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly + codecConfig.LargeMessageHandle.LargeMessageHandleCompression = compression.LZ4 + codecConfig.MaxMessageBytes = 500 + + ctx := context.Background() + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + insertEvent, _, _ := newLargeEvent4Test(t) + err = encoder.AppendRowChangedEvent(context.Background(), "", insertEvent, func() {}) + require.NoError(t, err) + + message := encoder.Build()[0] + + decoder, err := NewBatchDecoder(context.Background(), codecConfig, &sql.DB{}) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + messageType, ok, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, messageType, model.MessageTypeRow) + + handleKeyOnlyMessage := decoder.(*batchDecoder).msg.(*canalJSONMessageWithTiDBExtension) + require.True(t, handleKeyOnlyMessage.Extensions.OnlyHandleKey) + + for _, col := range insertEvent.Columns { + if col.Flag.IsHandleKey() { + require.Contains(t, handleKeyOnlyMessage.Data[0], col.Name) + require.Contains(t, handleKeyOnlyMessage.SQLType, col.Name) + require.Contains(t, handleKeyOnlyMessage.MySQLType, col.Name) + } else { + require.NotContains(t, handleKeyOnlyMessage.Data[0], col.Name) + require.NotContains(t, handleKeyOnlyMessage.SQLType, col.Name) + require.NotContains(t, handleKeyOnlyMessage.MySQLType, col.Name) + } + } +} + +func TestNewCanalJSONMessageFromDDL(t *testing.T) { + t.Parallel() + + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + ctx := context.Background() + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build().(*JSONRowEventEncoder) + + message := encoder.newJSONMessageForDDL(testCaseDDL) + require.NotNil(t, message) + + msg, ok := message.(*JSONMessage) + require.True(t, ok) + require.Equal(t, convertToCanalTs(testCaseDDL.CommitTs), msg.ExecutionTime) + require.True(t, msg.IsDDL) + require.Equal(t, "cdc", msg.Schema) + require.Equal(t, "person", msg.Table) + require.Equal(t, testCaseDDL.Query, msg.Query) + require.Equal(t, "CREATE", msg.EventType) + + codecConfig.EnableTiDBExtension = true + builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + + encoder = builder.Build().(*JSONRowEventEncoder) + message = encoder.newJSONMessageForDDL(testCaseDDL) + require.NotNil(t, message) + + withExtension, ok := message.(*canalJSONMessageWithTiDBExtension) + require.True(t, ok) + + require.NotNil(t, withExtension.Extensions) + require.Equal(t, testCaseDDL.CommitTs, withExtension.Extensions.CommitTs) +} + +func TestBatching(t *testing.T) { + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + require.NotNil(t, encoder) + + _, updateEvent, _ := newLargeEvent4Test(t) + updateCase := *updateEvent + for i := 1; i <= 1000; i++ { + ts := uint64(i) + updateCase.CommitTs = ts + err := encoder.AppendRowChangedEvent(context.Background(), "", &updateCase, nil) + require.NoError(t, err) + + if i%100 == 0 { + msgs := encoder.Build() + require.NotNil(t, msgs) + require.Len(t, msgs, 100) + + for j := range msgs { + require.Equal(t, 1, msgs[j].GetRowsCount()) + + var msg JSONMessage + err := json.Unmarshal(msgs[j].Value, &msg) + require.NoError(t, err) + require.Equal(t, "UPDATE", msg.EventType) + } + } + } + + require.Len(t, encoder.(*JSONRowEventEncoder).messages, 0) +} + +func TestEncodeCheckpointEvent(t *testing.T) { + t.Parallel() + + ctx := context.Background() + var watermark uint64 = 2333 + for _, enable := range []bool{false, true} { + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = enable + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + + encoder := builder.Build() + + msg, err := encoder.EncodeCheckpointEvent(watermark) + require.NoError(t, err) + + if !enable { + require.Nil(t, msg) + continue + } + + require.NotNil(t, msg) + + ctx := context.Background() + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(msg.Key, msg.Value) + require.NoError(t, err) + + ty, hasNext, err := decoder.HasNext() + require.NoError(t, err) + if enable { + require.True(t, hasNext) + require.Equal(t, model.MessageTypeResolved, ty) + consumed, err := decoder.NextResolvedEvent() + require.NoError(t, err) + require.Equal(t, watermark, consumed) + } else { + require.False(t, hasNext) + require.Equal(t, model.MessageTypeUnknown, ty) + } + + ty, hasNext, err = decoder.HasNext() + require.NoError(t, err) + require.False(t, hasNext) + require.Equal(t, model.MessageTypeUnknown, ty) + } +} + +func TestCheckpointEventValueMarshal(t *testing.T) { + t.Parallel() + + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = true + + ctx := context.Background() + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + + encoder := builder.Build() + var watermark uint64 = 1024 + msg, err := encoder.EncodeCheckpointEvent(watermark) + require.NoError(t, err) + require.NotNil(t, msg) + + // Unmarshal from the data we have encoded. + jsonMsg := canalJSONMessageWithTiDBExtension{ + &JSONMessage{}, + &tidbExtension{}, + } + err = json.Unmarshal(msg.Value, &jsonMsg) + require.NoError(t, err) + require.Equal(t, watermark, jsonMsg.Extensions.WatermarkTs) + // Hack the build time. + // Otherwise, the timing will be inconsistent. + jsonMsg.BuildTime = 1469579899 + rawBytes, err := json.MarshalIndent(jsonMsg, "", " ") + require.NoError(t, err) + + // No commit ts will be output. + expectedJSON := `{ + "id": 0, + "database": "", + "table": "", + "pkNames": null, + "isDdl": false, + "type": "TIDB_WATERMARK", + "es": 0, + "ts": 1469579899, + "sql": "", + "sqlType": null, + "mysqlType": null, + "data": null, + "old": null, + "_tidb": { + "watermarkTs": 1024 + } +}` + require.Equal(t, expectedJSON, string(rawBytes)) +} + +func TestDDLEventWithExtensionValueMarshal(t *testing.T) { + t.Parallel() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + encoder := &JSONRowEventEncoder{ + builder: newCanalEntryBuilder(codecConfig), + config: &common.Config{EnableTiDBExtension: true}, + } + require.NotNil(t, encoder) + + message := encoder.newJSONMessageForDDL(testCaseDDL) + require.NotNil(t, message) + + msg, ok := message.(*canalJSONMessageWithTiDBExtension) + require.True(t, ok) + // Hack the build time. + // Otherwise, the timing will be inconsistent. + msg.BuildTime = 1469579899 + rawBytes, err := json.MarshalIndent(msg, "", " ") + require.NoError(t, err) + + // No watermark ts will be output. + expectedJSON := `{ + "id": 0, + "database": "cdc", + "table": "person", + "pkNames": null, + "isDdl": true, + "type": "CREATE", + "es": 1591943372224, + "ts": 1469579899, + "sql": "create table person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))", + "sqlType": null, + "mysqlType": null, + "data": null, + "old": null, + "_tidb": { + "commitTs": 417318403368288260 + } +}` + require.Equal(t, expectedJSON, string(rawBytes)) +} + +func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = true + ctx := context.Background() + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + count := 0 + row := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{ + Name: "a", + Type: mysql.TypeVarchar, + Value: []byte("aa"), + }}, + ColInfos: colInfos, + } + + tests := []struct { + row *model.RowChangedEvent + callback func() + }{ + { + row: row, + callback: func() { + count += 1 + }, + }, + { + row: row, + callback: func() { + count += 2 + }, + }, + { + row: row, + callback: func() { + count += 3 + }, + }, + { + row: row, + callback: func() { + count += 4 + }, + }, + { + row: row, + callback: func() { + count += 5 + }, + }, + } + + // Empty build makes sure that the callback build logic not broken. + msgs := encoder.Build() + require.Len(t, msgs, 0, "no message should be built and no panic") + + // Append the events. + for _, test := range tests { + err := encoder.AppendRowChangedEvent(context.Background(), "", test.row, test.callback) + require.NoError(t, err) + } + require.Equal(t, 0, count, "nothing should be called") + + msgs = encoder.Build() + require.Len(t, msgs, 5, "expected 5 messages") + msgs[0].Callback() + require.Equal(t, 1, count, "expected one callback be called") + msgs[1].Callback() + require.Equal(t, 3, count, "expected one callback be called") + msgs[2].Callback() + require.Equal(t, 6, count, "expected one callback be called") + msgs[3].Callback() + require.Equal(t, 10, count, "expected one callback be called") + msgs[4].Callback() + require.Equal(t, 15, count, "expected one callback be called") +} + +func TestMaxMessageBytes(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + // the size of `testEvent` after being encoded by canal-json is 200 + testEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{ + Name: "a", + Type: mysql.TypeVarchar, + Value: []byte("aa"), + }}, + ColInfos: colInfos, + } + + ctx := context.Background() + topic := "" + + // the test message length is smaller than max-message-bytes + maxMessageBytes := 300 + codecConfig := common.NewConfig(config.ProtocolCanalJSON).WithMaxMessageBytes(maxMessageBytes) + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + err = encoder.AppendRowChangedEvent(ctx, topic, testEvent, nil) + require.NoError(t, err) + + // the test message length is larger than max-message-bytes + codecConfig = codecConfig.WithMaxMessageBytes(100) + + builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + + encoder = builder.Build() + err = encoder.AppendRowChangedEvent(ctx, topic, testEvent, nil) + require.Error(t, err, cerror.ErrMessageTooLarge) +} + +func TestCanalJSONContentCompatibleE2E(t *testing.T) { + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = true + codecConfig.ContentCompatible = true + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + + encoder := builder.Build() + + insertEvent, _, _ := newLargeEvent4Test(t) + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) + require.NoError(t, err) + + message := encoder.Build()[0] + + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, messageType, model.MessageTypeRow) + + decodedEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.Equal(t, decodedEvent.CommitTs, insertEvent.CommitTs) + require.Equal(t, decodedEvent.Table.Schema, insertEvent.Table.Schema) + require.Equal(t, decodedEvent.Table.Table, insertEvent.Table.Table) + + obtainedColumns := make(map[string]*model.Column, len(decodedEvent.Columns)) + for _, column := range decodedEvent.Columns { + obtainedColumns[column.Name] = column + } + + expectedValue := collectExpectedDecodedValue(testColumnsTable) + for _, actual := range insertEvent.Columns { + obtained, ok := obtainedColumns[actual.Name] + require.True(t, ok) + require.Equal(t, actual.Type, obtained.Type) + require.Equal(t, expectedValue[actual.Name], obtained.Value) + } +} diff --git a/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go new file mode 100644 index 00000000000..dfbae44c58e --- /dev/null +++ b/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go @@ -0,0 +1,145 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package canal + +import ( + "testing" + + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/stretchr/testify/require" +) + +func TestBuildCanalJSONTxnEventEncoder(t *testing.T) { + t.Parallel() + cfg := common.NewConfig(config.ProtocolCanalJSON) + + builder := NewJSONTxnEventEncoderBuilder(cfg) + encoder, ok := builder.Build().(*JSONTxnEventEncoder) + require.True(t, ok) + require.NotNil(t, encoder.config) +} + +func TestCanalJSONTxnEventEncoderMaxMessageBytes(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + // the size of `testEvent` after being encoded by canal-json is 200 + testEvent := &model.SingleTableTxn{ + Table: &model.TableName{Schema: "test", Table: "t"}, + Rows: []*model.RowChangedEvent{ + { + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{ + Name: "col1", + Type: mysql.TypeVarchar, + Value: []byte("aa"), + }}, + ColInfos: colInfos, + }, + }, + } + + // the test message length is smaller than max-message-bytes + maxMessageBytes := 300 + cfg := common.NewConfig(config.ProtocolCanalJSON).WithMaxMessageBytes(maxMessageBytes) + encoder := NewJSONTxnEventEncoderBuilder(cfg).Build() + err := encoder.AppendTxnEvent(testEvent, nil) + require.Nil(t, err) + + // the test message length is larger than max-message-bytes + cfg = cfg.WithMaxMessageBytes(100) + encoder = NewJSONTxnEventEncoderBuilder(cfg).Build() + err = encoder.AppendTxnEvent(testEvent, nil) + require.NotNil(t, err) +} + +func TestCanalJSONAppendTxnEventEncoderWithCallback(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + cfg := common.NewConfig(config.ProtocolCanalJSON) + encoder := NewJSONTxnEventEncoderBuilder(cfg).Build() + require.NotNil(t, encoder) + + count := 0 + + txn := &model.SingleTableTxn{ + Table: &model.TableName{Schema: "test", Table: "t"}, + Rows: []*model.RowChangedEvent{ + { + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{ + Name: "a", + Type: mysql.TypeVarchar, + Value: []byte("aa"), + }}, + ColInfos: colInfos, + }, + { + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{ + Name: "a", + Type: mysql.TypeVarchar, + Value: []byte("bb"), + }}, + ColInfos: colInfos, + }, + }, + } + + // Empty build makes sure that the callback build logic not broken. + msgs := encoder.Build() + require.Len(t, msgs, 0, "no message should be built and no panic") + + // Append the events. + callback := func() { + count++ + } + err := encoder.AppendTxnEvent(txn, callback) + require.Nil(t, err) + require.Equal(t, 0, count, "nothing should be called") + + msgs = encoder.Build() + require.Len(t, msgs, 1, "expected one message") + msgs[0].Callback() + require.Equal(t, 1, count, "expected one callback be called") + // Assert the build reset all the internal states. + require.Nil(t, encoder.(*JSONTxnEventEncoder).txnSchema) + require.Nil(t, encoder.(*JSONTxnEventEncoder).txnTable) + require.Nil(t, encoder.(*JSONTxnEventEncoder).callback) + require.Equal(t, 0, encoder.(*JSONTxnEventEncoder).batchSize) + require.Equal(t, 0, encoder.(*JSONTxnEventEncoder).valueBuf.Len()) +} diff --git a/pkg/sink/codec/canal/type_test.go b/pkg/sink/codec/canal/type_test.go new file mode 100644 index 00000000000..864a6faa84a --- /dev/null +++ b/pkg/sink/codec/canal/type_test.go @@ -0,0 +1,960 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package canal + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/sink/codec/internal" + "github.com/pingcap/tiflow/pkg/sink/codec/utils" + "github.com/stretchr/testify/require" +) + +func TestGetMySQLType4IntTypes(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t1 ( + a int primary key, + b tinyint, + c smallint, + d mediumint, + e bigint)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) + + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "int", mysqlType) + // mysql type with the default type length + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "int(11)", mysqlType) + + flag := tableInfo.ColumnsFlag[colInfos[0].ID] + javaType, err := getJavaSQLType(int64(2147483647), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "tinyint", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "tinyint(4)", mysqlType) + + flag = tableInfo.ColumnsFlag[colInfos[1].ID] + javaType, err = getJavaSQLType(int64(127), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeTINYINT, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "smallint", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "smallint(6)", mysqlType) + + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + javaType, err = getJavaSQLType(int64(32767), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[3].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "mediumint", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "mediumint(9)", mysqlType) + javaType, err = getJavaSQLType(int64(8388607), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[4].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "bigint", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "bigint(20)", mysqlType) + javaType, err = getJavaSQLType(int64(9223372036854775807), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) + + sql = `create table test.t2 ( + a int unsigned primary key, + b tinyint unsigned, + c smallint unsigned, + d mediumint unsigned, + e bigint unsigned)` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[0].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "int unsigned", mysqlType) + // mysql type with the default type length + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "int(10) unsigned", mysqlType) + + javaType, err = getJavaSQLType(uint64(2147483647), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + javaType, err = getJavaSQLType(uint64(2147483648), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "tinyint unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "tinyint(3) unsigned", mysqlType) + + javaType, err = getJavaSQLType(uint64(127), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeTINYINT, javaType) + javaType, err = getJavaSQLType(uint64(128), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeTINYINT, javaType) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeTINYINT, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "smallint unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "smallint(5) unsigned", mysqlType) + javaType, err = getJavaSQLType(uint64(32767), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) + javaType, err = getJavaSQLType(uint64(32768), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[3].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "mediumint unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "mediumint(8) unsigned", mysqlType) + javaType, err = getJavaSQLType(uint64(8388607), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + javaType, err = getJavaSQLType(uint64(8388608), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[4].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "bigint unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "bigint(20) unsigned", mysqlType) + javaType, err = getJavaSQLType(uint64(9223372036854775807), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) + javaType, err = getJavaSQLType(uint64(9223372036854775808), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeDECIMAL, javaType) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) + + sql = `create table test.t3 ( + a int(10) primary key, + b tinyint(3) , + c smallint(5), + d mediumint(8), + e bigint(19))` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "int", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "int(10)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "tinyint", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "tinyint(3)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "smallint", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "smallint(5)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "mediumint", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "mediumint(8)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "bigint", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "bigint(19)", mysqlType) + + sql = `create table test.t4 ( + a int(10) unsigned primary key, + b tinyint(3) unsigned, + c smallint(5) unsigned, + d mediumint(8) unsigned, + e bigint(19) unsigned)` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "int unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "int(10) unsigned", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "tinyint unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "tinyint(3) unsigned", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "smallint unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "smallint(5) unsigned", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "mediumint unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "mediumint(8) unsigned", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "bigint unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "bigint(19) unsigned", mysqlType) + + sql = `create table test.t5 ( + a int zerofill primary key, + b tinyint zerofill, + c smallint unsigned zerofill, + d mediumint zerofill, + e bigint zerofill)` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "int unsigned zerofill", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "int(10) unsigned zerofill", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "tinyint unsigned zerofill", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "tinyint(3) unsigned zerofill", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "smallint unsigned zerofill", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "smallint(5) unsigned zerofill", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "mediumint unsigned zerofill", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "mediumint(8) unsigned zerofill", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "bigint unsigned zerofill", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "bigint(20) unsigned zerofill", mysqlType) + + sql = `create table test.t6( + a int primary key, + b bit, + c bit(3), + d bool)` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "bit", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "bit(1)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "bit", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "bit(3)", mysqlType) + javaType, err = getJavaSQLType(uint64(65), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeBIT, javaType) + + // bool is identical to tinyint in the TiDB. + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "tinyint", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "tinyint(1)", mysqlType) +} + +func TestGetMySQLType4FloatType(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t1( + a int primary key, + b float, + c double)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "float", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "float", mysqlType) + javaType, err := getJavaSQLType(3.14, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeREAL, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "double", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "double", mysqlType) + javaType, err = getJavaSQLType(2.71, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeDOUBLE, javaType) + + sql = `create table test.t2(a int primary key, b float(10, 3), c float(10))` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "float", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "float(10,3)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "float", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "float", mysqlType) + + sql = `create table test.t3(a int primary key, b double(20, 3))` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "double", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "double(20,3)", mysqlType) + + sql = `create table test.t4( + a int primary key, + b float unsigned, + c double unsigned, + d float zerofill, + e double zerofill)` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "float unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "float unsigned", mysqlType) + javaType, err = getJavaSQLType(3.14, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeREAL, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "double unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "double unsigned", mysqlType) + javaType, err = getJavaSQLType(2.71, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeDOUBLE, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "float unsigned zerofill", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "float unsigned zerofill", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "double unsigned zerofill", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "double unsigned zerofill", mysqlType) +} + +func TestGetMySQLType4Decimal(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t1(a int primary key, b decimal, c numeric)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "decimal", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "decimal(10,0)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "decimal", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "decimal(10,0)", mysqlType) + + sql = `create table test.t2(a int primary key, b decimal(5), c decimal(5, 2))` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "decimal", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "decimal(5,0)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "decimal", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "decimal(5,2)", mysqlType) + javaType, err := getJavaSQLType("2333", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeDECIMAL, javaType) + + sql = `create table test.t3(a int primary key, b decimal unsigned, c decimal zerofill)` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "decimal unsigned", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "decimal(10,0) unsigned", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "decimal unsigned zerofill", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "decimal(10,0) unsigned zerofill", mysqlType) + javaType, err = getJavaSQLType("2333", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeDECIMAL, javaType) +} + +func TestGetMySQLType4TimeTypes(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t1(a int primary key, b time, c time(3))` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "time", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "time", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "time", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "time(3)", mysqlType) + javaType, err := getJavaSQLType("02:20:20", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeTIME) + + sql = `create table test.t2(a int primary key, b datetime, c datetime(3))` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "datetime", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "datetime", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "datetime", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "datetime(3)", mysqlType) + javaType, err = getJavaSQLType("2020-02-20 02:20:20", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeTIMESTAMP) + + sql = `create table test.t3(a int primary key, b timestamp, c timestamp(3))` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "timestamp", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "timestamp", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "timestamp", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "timestamp(3)", mysqlType) + javaType, err = getJavaSQLType("2020-02-20 02:20:20", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeTIMESTAMP) + + sql = `create table test.t4(a int primary key, b date)` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "date", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "date", mysqlType) + javaType, err = getJavaSQLType("2020-02-20", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeDATE) + + sql = `create table test.t5(a int primary key, b year, c year(4))` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "year", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "year(4)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "year", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "year(4)", mysqlType) + javaType, err = getJavaSQLType("2020", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeVARCHAR) +} + +func TestGetMySQLType4Char(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a int primary key, b char, c char(123))` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "char", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "char(1)", mysqlType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "char", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "char(123)", mysqlType) + javaType, err := getJavaSQLType([]uint8("测试char"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeCHAR) + + sql = `create table test.t1(a int primary key, b varchar(123))` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "varchar", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "varchar(123)", mysqlType) + javaType, err = getJavaSQLType([]uint8("测试varchar"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeVARCHAR) +} + +func TestGetMySQLType4TextTypes(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t1(a int primary key, b text, c tinytext, d mediumtext, e longtext)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "text", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "text", mysqlType) + javaType, err := getJavaSQLType([]uint8("测试text"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeCLOB) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "tinytext", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "tinytext", mysqlType) + javaType, err = getJavaSQLType([]uint8("测试tinytext"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeCLOB) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[3].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "mediumtext", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "mediumtext", mysqlType) + javaType, err = getJavaSQLType([]uint8("测试mediumtext"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeCLOB) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[4].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "longtext", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "longtext", mysqlType) + javaType, err = getJavaSQLType([]uint8("测试longtext"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeCLOB) +} + +func TestGetMySQLType4BinaryType(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t1(a int primary key, b binary, c binary(10))` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "binary", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "binary(1)", mysqlType) + javaType, err := getJavaSQLType([]uint8("测试binary"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeBLOB) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "binary", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "binary(10)", mysqlType) + + sql = `create table test.t2(a int primary key, b varbinary(23))` + job = helper.DDL2Job(sql) + tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos = tableInfo.GetRowColInfos() + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "varbinary", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "varbinary(23)", mysqlType) + javaType, err = getJavaSQLType([]uint8("测试varbinary"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeBLOB, javaType) +} + +func TestGetMySQLType4BlobType(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t1(a int primary key, b blob, c tinyblob, d mediumblob, e longblob)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "blob", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "blob", mysqlType) + javaType, err := getJavaSQLType([]uint8("测试blob"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeBLOB) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "tinyblob", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "tinyblob", mysqlType) + javaType, err = getJavaSQLType([]uint8("测试tinyblob"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeBLOB) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[3].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "mediumblob", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "mediumblob", mysqlType) + javaType, err = getJavaSQLType([]uint8("测试mediumblob"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeBLOB) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[4].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "longblob", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "longblob", mysqlType) + javaType, err = getJavaSQLType([]uint8("测试longblob"), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, javaType, internal.JavaSQLTypeBLOB) +} + +func TestGetMySQLType4EnumAndSet(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a int primary key, b enum('a', 'b', 'c'), c set('a', 'b', 'c'))` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[1].ID] + + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "enum", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "enum('a','b','c')", mysqlType) + + javaType, err := getJavaSQLType(uint64(1), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) + + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + + mysqlType = utils.GetMySQLType(columnInfo, false) + require.Equal(t, "set", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "set('a','b','c')", mysqlType) + + javaType, err = getJavaSQLType(uint64(2), columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeBIT, javaType) +} + +func TestGetMySQLType4JSON(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a int primary key, b json)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[1].ID] + mysqlType := utils.GetMySQLType(columnInfo, false) + require.Equal(t, "json", mysqlType) + mysqlType = utils.GetMySQLType(columnInfo, true) + require.Equal(t, "json", mysqlType) + + javaType, err := getJavaSQLType("{\"key1\": \"value1\"}", columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeVARCHAR, javaType) + + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) + require.NoError(t, err) + require.Equal(t, internal.JavaSQLTypeVARCHAR, javaType) +} diff --git a/pkg/sink/codec/utils/field_types.go b/pkg/sink/codec/utils/field_types.go new file mode 100644 index 00000000000..d327b4b5fcf --- /dev/null +++ b/pkg/sink/codec/utils/field_types.go @@ -0,0 +1,82 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "strings" + + "github.com/pingcap/tidb/parser/charset" + timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/types" + "github.com/pingcap/tiflow/cdc/model" +) + +// SetBinChsClnFlag set the binary charset flag. +func SetBinChsClnFlag(ft *types.FieldType) *types.FieldType { + ft.SetCharset(charset.CharsetBin) + ft.SetCollate(charset.CollationBin) + ft.AddFlag(mysql.BinaryFlag) + return ft +} + +// SetUnsigned set the unsigned flag. +func SetUnsigned(ft *types.FieldType) *types.FieldType { + ft.SetFlag(uint(model.UnsignedFlag)) + return ft +} + +// SetElems set the elems to the ft +func SetElems(ft *types.FieldType, elems []string) *types.FieldType { + ft.SetElems(elems) + return ft +} + +// when encoding the canal format, for unsigned mysql type, add `unsigned` keyword. +// it should have the form `t unsigned`, such as `int unsigned` +func withUnsigned4MySQLType(mysqlType string, unsigned bool) string { + if unsigned && mysqlType != "bit" && mysqlType != "year" { + return mysqlType + " unsigned" + } + return mysqlType +} + +func withZerofill4MySQLType(mysqlType string, zerofill bool) string { + if zerofill && !strings.HasPrefix(mysqlType, "year") { + return mysqlType + " zerofill" + } + return mysqlType +} + +// GetMySQLType get the mysql type from column info +func GetMySQLType(columnInfo *timodel.ColumnInfo, fullType bool) string { + if !fullType { + result := types.TypeToStr(columnInfo.GetType(), columnInfo.GetCharset()) + result = withUnsigned4MySQLType(result, mysql.HasUnsignedFlag(columnInfo.GetFlag())) + result = withZerofill4MySQLType(result, mysql.HasZerofillFlag(columnInfo.GetFlag())) + return result + } + return columnInfo.GetTypeDesc() +} + +// ExtractBasicMySQLType return the mysql type +func ExtractBasicMySQLType(mysqlType string) byte { + for i := 0; i < len(mysqlType); i++ { + if mysqlType[i] == '(' || mysqlType[i] == ' ' { + return types.StrToType(mysqlType[:i]) + } + } + + return types.StrToType(mysqlType) +}