From 5921050d905001cddb3fdbe22949d121869243d4 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 22 Nov 2023 01:02:42 -0600 Subject: [PATCH] codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123) close pingcap/tiflow#10122 --- cdc/model/schema_storage.go | 3 - cdc/model/sink.go | 12 - cdc/sink/dmlsink/mq/mq_dml_sink_test.go | 22 +- cdc/sink/dmlsink/mq/worker_test.go | 139 ++-- pkg/sink/codec/canal/canal_encoder_test.go | 115 +++- pkg/sink/codec/canal/canal_entry.go | 72 +- pkg/sink/codec/canal/canal_entry_test.go | 68 +- .../codec/canal/canal_json_decoder_test.go | 11 +- pkg/sink/codec/canal/canal_json_message.go | 15 +- .../canal/canal_json_row_event_encoder.go | 27 +- .../canal_json_row_event_encoder_test.go | 134 ++-- .../canal_json_txn_event_encoder_test.go | 65 +- pkg/sink/codec/canal/canal_test_util.go | 418 +++++------- pkg/sink/codec/canal/type_test.go | 640 ++++++++++-------- pkg/sink/codec/utils/field_types.go | 58 +- 15 files changed, 922 insertions(+), 877 deletions(-) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index a7421672df4..a4c50b09f1c 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -211,9 +211,6 @@ func (ti *TableInfo) initColumnsFlag() { if mysql.HasUnsignedFlag(colInfo.GetFlag()) { flag.SetIsUnsigned() } - if mysql.HasZerofillFlag(colInfo.GetFlag()) { - flag.SetZeroFill() - } ti.ColumnsFlag[colInfo.ID] = flag } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 53cf876b49e..81dda4ff07a 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -81,20 +81,8 @@ const ( NullableFlag // UnsignedFlag means the column stores an unsigned integer UnsignedFlag - // ZerofillFlag means the column is zerofill - ZerofillFlag ) -// SetZeroFill sets ZerofillFlag -func (b *ColumnFlagType) SetZeroFill() { - (*util.Flag)(b).Add(util.Flag(ZerofillFlag)) -} - -// IsZerofill shows whether ZerofillFlag is set -func (b *ColumnFlagType) IsZerofill() bool { - return (*util.Flag)(b).HasAll(util.Flag(ZerofillFlag)) -} - // SetIsBinary sets BinaryFlag func (b *ColumnFlagType) SetIsBinary() { (*util.Flag)(b).Add(util.Flag(BinaryFlag)) diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink_test.go b/cdc/sink/dmlsink/mq/mq_dml_sink_test.go index 2e36cfbef74..c297a3079cd 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink_test.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink_test.go @@ -21,8 +21,7 @@ import ( "time" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/parser/types" - "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" @@ -60,8 +59,6 @@ func TestNewKafkaDMLSinkFailed(t *testing.T) { } func TestWriteEvents(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -84,12 +81,21 @@ func TestWriteEvents(t *testing.T) { require.NotNil(t, s) defer s.Close() + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() + tableStatus := state.TableSinkSinking row := &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}}, + ColInfos: colInfo, } events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000) diff --git a/cdc/sink/dmlsink/mq/worker_test.go b/cdc/sink/dmlsink/mq/worker_test.go index 7b0303ddb53..c31db5d074d 100644 --- a/cdc/sink/dmlsink/mq/worker_test.go +++ b/cdc/sink/dmlsink/mq/worker_test.go @@ -20,8 +20,7 @@ import ( "time" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" @@ -64,7 +63,13 @@ func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlpro } func TestNonBatchEncode_SendMessages(t *testing.T) { - t.Parallel() + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -77,10 +82,11 @@ func TestNonBatchEncode_SendMessages(t *testing.T) { Partition: 1, } row := &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}}, + ColInfos: colInfo, } tableStatus := state.TableSinkSinking @@ -258,7 +264,13 @@ func TestBatchEncode_Group(t *testing.T) { } func TestBatchEncode_GroupWhenTableStopping(t *testing.T) { - t.Parallel() + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() key1 := TopicPartitionKey{ Topic: "test", @@ -278,9 +290,11 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) { { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &replicatingStatus, @@ -325,8 +339,6 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) { } func TestBatchEncode_SendMessages(t *testing.T) { - t.Parallel() - key1 := TopicPartitionKey{ Topic: "test", Partition: 1, @@ -345,14 +357,24 @@ func TestBatchEncode_SendMessages(t *testing.T) { defer cancel() worker, p := newBatchEncodeWorker(ctx, t) defer worker.close() + + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() + events := []mqEvent{ { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &tableStatus, @@ -362,10 +384,11 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 2, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &tableStatus, @@ -375,10 +398,11 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 3, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 3, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "cc"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &tableStatus, @@ -388,10 +412,11 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 2, - Table: &model.TableName{Schema: "aa", Table: "bb"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &tableStatus, @@ -401,10 +426,11 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 2, - Table: &model.TableName{Schema: "aaa", Table: "bbb"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &tableStatus, @@ -414,10 +440,11 @@ func TestBatchEncode_SendMessages(t *testing.T) { { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 3, - Table: &model.TableName{Schema: "aaa", Table: "bbb"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 3, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &tableStatus, @@ -475,8 +502,6 @@ func TestBatchEncodeWorker_Abort(t *testing.T) { } func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) { - t.Parallel() - key1 := TopicPartitionKey{ Topic: "test", Partition: 1, @@ -491,14 +516,24 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) { defer worker.close() replicatingStatus := state.TableSinkSinking stoppedStatus := state.TableSinkStopping + + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() + events := []mqEvent{ { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &replicatingStatus, @@ -508,10 +543,11 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) { { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 2, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &replicatingStatus, @@ -521,10 +557,11 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) { { rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: &model.RowChangedEvent{ - CommitTs: 3, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}}, - ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}}, + CommitTs: 3, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}}, + ColInfos: colInfo, }, Callback: func() {}, SinkState: &stoppedStatus, diff --git a/pkg/sink/codec/canal/canal_encoder_test.go b/pkg/sink/codec/canal/canal_encoder_test.go index b7036bfb30a..b56cbf9a730 100644 --- a/pkg/sink/codec/canal/canal_encoder_test.go +++ b/pkg/sink/codec/canal/canal_encoder_test.go @@ -19,8 +19,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -28,14 +27,87 @@ import ( "github.com/stretchr/testify/require" ) +var ( + rowCases = [][]*model.RowChangedEvent{ + {{ + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + Columns: []*model.Column{{ + Name: "col1", + Type: mysql.TypeVarchar, + Value: []byte("aa"), + }}, + }}, + { + { + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + Columns: []*model.Column{{ + Name: "col1", + Type: mysql.TypeVarchar, + Value: []byte("aa"), + }}, + }, + { + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, + }, + }, + } + + ddlCases = [][]*model.DDLEvent{ + {{ + CommitTs: 1, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "a", Table: "b", + }, + }, + Query: "create table a", + Type: 1, + }}, + { + { + CommitTs: 2, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "a", Table: "b", + }, + }, + Query: "create table b", + Type: 3, + }, + { + CommitTs: 3, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "a", Table: "b", + }, + }, + Query: "create table c", + Type: 3, + }, + }, + } +) + func TestCanalBatchEncoder(t *testing.T) { - t.Parallel() - s := defaultCanalBatchTester - for _, cs := range s.rowCases { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(10) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + for _, cs := range rowCases { encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) for _, row := range cs { + _, _, colInfo := tableInfo.GetRowColInfos() + row.TableInfo = tableInfo + row.ColInfos = colInfo err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) - require.Nil(t, err) + require.NoError(t, err) } res := encoder.Build() @@ -43,7 +115,6 @@ func TestCanalBatchEncoder(t *testing.T) { require.Nil(t, res) continue } - require.Len(t, res, 1) require.Nil(t, res[0].Key) require.Equal(t, len(cs), res[0].GetRowsCount()) @@ -58,33 +129,36 @@ func TestCanalBatchEncoder(t *testing.T) { require.Equal(t, len(cs), len(messages.GetMessages())) } - for _, cs := range s.ddlCases { + for _, cs := range ddlCases { encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) for _, ddl := range cs { msg, err := encoder.EncodeDDLEvent(ddl) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, msg) require.Nil(t, msg.Key) packet := &canal.Packet{} err = proto.Unmarshal(msg.Value, packet) - require.Nil(t, err) + require.NoError(t, err) require.Equal(t, canal.PacketType_MESSAGES, packet.GetType()) messages := &canal.Messages{} err = proto.Unmarshal(packet.GetBody(), messages) - require.Nil(t, err) + require.NoError(t, err) require.Equal(t, 1, len(messages.GetMessages())) - require.Nil(t, err) + require.NoError(t, err) } } } func TestCanalAppendRowChangedEventWithCallback(t *testing.T) { - encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) - require.NotNil(t, encoder) + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() - count := 0 + sql := `create table test.t(a varchar(10) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() row := &model.RowChangedEvent{ CommitTs: 1, Table: &model.TableName{Schema: "a", Table: "b"}, @@ -93,12 +167,15 @@ func TestCanalAppendRowChangedEventWithCallback(t *testing.T) { Type: mysql.TypeVarchar, Value: []byte("aa"), }}, - ColInfos: []rowcodec.ColInfo{{ - ID: 1, - Ft: types.NewFieldType(mysql.TypeVarchar), - }}, + TableInfo: tableInfo, + ColInfos: colInfo, } + encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) + require.NotNil(t, encoder) + + count := 0 + tests := []struct { row *model.RowChangedEvent callback func() diff --git a/pkg/sink/codec/canal/canal_entry.go b/pkg/sink/codec/canal/canal_entry.go index cfea78e8d04..852c9184581 100644 --- a/pkg/sink/codec/canal/canal_entry.go +++ b/pkg/sink/codec/canal/canal_entry.go @@ -18,18 +18,17 @@ import ( "math" "reflect" "strconv" - "strings" "github.com/golang/protobuf/proto" // nolint:staticcheck "github.com/pingcap/errors" mm "github.com/pingcap/tidb/parser/model" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/internal" + "github.com/pingcap/tiflow/pkg/sink/codec/utils" canal "github.com/pingcap/tiflow/proto/canal" "golang.org/x/text/encoding" "golang.org/x/text/encoding/charmap" @@ -82,7 +81,7 @@ func (b *canalEntryBuilder) buildHeader(commitTs uint64, schema string, table st // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java#L276-L1147 // all value will be represented in string type // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L760-L855 -func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.JavaSQLType) (result string, err error) { +func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (result string, err error) { // value would be nil, if no value insert for the column. if value == nil { return "", nil @@ -100,20 +99,15 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.Jav case string: result = v case []byte: - // JavaSQLTypeVARCHAR / JavaSQLTypeCHAR / JavaSQLTypeBLOB / JavaSQLTypeCLOB / - // special handle for text and blob // see https://github.com/alibaba/canal/blob/9f6021cf36f78cc8ac853dcf37a1769f359b868b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L801 - switch javaType { - // for normal text - case internal.JavaSQLTypeVARCHAR, internal.JavaSQLTypeCHAR, internal.JavaSQLTypeCLOB: - result = string(v) - default: - // JavaSQLTypeBLOB + if isBinary { decoded, err := b.bytesDecoder.Bytes(v) if err != nil { return "", err } result = string(decoded) + } else { + result = string(v) } default: result = fmt.Sprintf("%v", v) @@ -123,21 +117,21 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.Jav // build the Column in the canal RowData // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872 -func (b *canalEntryBuilder) buildColumn(c *model.Column, colInfo rowcodec.ColInfo, colName string, updated bool) (*canal.Column, error) { - mysqlType := getMySQLType(colInfo.Ft, c.Flag, b.config.ContentCompatible) +func (b *canalEntryBuilder) buildColumn(c *model.Column, columnInfo *timodel.ColumnInfo, updated bool) (*canal.Column, error) { + mysqlType := utils.GetMySQLType(columnInfo, b.config.ContentCompatible) javaType, err := getJavaSQLType(c.Value, c.Type, c.Flag) if err != nil { return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } - value, err := b.formatValue(c.Value, javaType) + value, err := b.formatValue(c.Value, c.Flag.IsBinary()) if err != nil { return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } canalColumn := &canal.Column{ SqlType: int32(javaType), - Name: colName, + Name: c.Name, IsKey: c.Flag.IsPrimaryKey(), Updated: updated, IsNullPresent: &canal.Column_IsNull{IsNull: c.Value == nil}, @@ -154,7 +148,12 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey if column == nil { continue } - c, err := b.buildColumn(column, e.ColInfos[idx], column.Name, !e.IsDelete()) + columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID) + if !ok { + return nil, cerror.ErrCanalEncodeFailed.GenWithStack( + "column info not found for column id: %d", e.ColInfos[idx].ID) + } + c, err := b.buildColumn(column, columnInfo, !e.IsDelete()) if err != nil { return nil, errors.Trace(err) } @@ -170,7 +169,12 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey if onlyHandleKeyColumns && !column.Flag.IsHandleKey() { continue } - c, err := b.buildColumn(column, e.ColInfos[idx], column.Name, !e.IsDelete()) + columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID) + if !ok { + return nil, cerror.ErrCanalEncodeFailed.GenWithStack( + "column info not found for column id: %d", e.ColInfos[idx].ID) + } + c, err := b.buildColumn(column, columnInfo, !e.IsDelete()) if err != nil { return nil, errors.Trace(err) } @@ -368,35 +372,3 @@ func getJavaSQLType(value interface{}, tp byte, flag model.ColumnFlagType) (resu return javaType, nil } - -// when encoding the canal format, for unsigned mysql type, add `unsigned` keyword. -// it should have the form `t unsigned`, such as `int unsigned` -func withUnsigned4MySQLType(mysqlType string, unsigned bool) string { - if unsigned && mysqlType != "bit" && mysqlType != "year" { - return mysqlType + " unsigned" - } - return mysqlType -} - -func withZerofill4MySQLType(mysqlType string, zerofill bool) string { - if zerofill && - !strings.HasPrefix(mysqlType, "bit") && - !strings.HasPrefix(mysqlType, "year") { - return mysqlType + " zerofill" - } - return mysqlType -} - -func getMySQLType(fieldType *types.FieldType, flag model.ColumnFlagType, fullType bool) string { - if !fullType { - result := types.TypeToStr(fieldType.GetType(), fieldType.GetCharset()) - result = withUnsigned4MySQLType(result, flag.IsUnsigned()) - result = withZerofill4MySQLType(result, flag.IsZerofill()) - - return result - } - - result := fieldType.InfoSchemaStr() - result = withZerofill4MySQLType(result, flag.IsZerofill()) - return result -} diff --git a/pkg/sink/codec/canal/canal_entry_test.go b/pkg/sink/codec/canal/canal_entry_test.go index 4da25b53ec6..25d833f3c50 100644 --- a/pkg/sink/codec/canal/canal_entry_test.go +++ b/pkg/sink/codec/canal/canal_entry_test.go @@ -19,39 +19,46 @@ import ( "github.com/golang/protobuf/proto" mm "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/internal" - "github.com/pingcap/tiflow/pkg/sink/codec/utils" canal "github.com/pingcap/tiflow/proto/canal" "github.com/stretchr/testify/require" "golang.org/x/text/encoding/charmap" ) func TestInsert(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t( + id int primary key, + name varchar(32), + tiny tinyint unsigned, + comment text, + bb blob)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + event := &model.RowChangedEvent{ CommitTs: 417318403368288260, Table: &model.TableName{ Schema: "cdc", Table: "person", }, + TableInfo: tableInfo, Columns: []*model.Column{ {Name: "id", Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 1}, {Name: "name", Type: mysql.TypeVarchar, Value: "Bob"}, {Name: "tiny", Type: mysql.TypeTiny, Value: 255}, {Name: "comment", Type: mysql.TypeBlob, Value: []byte("测试")}, - {Name: "blob", Type: mysql.TypeBlob, Value: []byte("测试blob"), Flag: model.BinaryFlag}, - }, - ColInfos: []rowcodec.ColInfo{ - {ID: 1, IsPKHandle: true, Ft: utils.SetFlag(types.NewFieldType(mysql.TypeLong), uint(model.PrimaryKeyFlag))}, - {ID: 2, Ft: types.NewFieldType(mysql.TypeVarchar)}, - {ID: 3, Ft: types.NewFieldType(mysql.TypeTiny)}, - {ID: 4, Ft: utils.NewTextFieldType(mysql.TypeBlob)}, - {ID: 5, Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeBlob))}, + {Name: "bb", Type: mysql.TypeBlob, Value: []byte("测试blob"), Flag: model.BinaryFlag}, }, + ColInfos: colInfos, } codecConfig := common.NewConfig(config.ProtocolCanalJSON) @@ -103,7 +110,7 @@ func TestInsert(t *testing.T) { require.NoError(t, err) require.Equal(t, "测试", col.GetValue()) require.Equal(t, "text", col.GetMysqlType()) - case "blob": + case "bb": require.Equal(t, int32(internal.JavaSQLTypeBLOB), col.GetSqlType()) require.False(t, col.GetIsKey()) require.False(t, col.GetIsNull()) @@ -116,12 +123,22 @@ func TestInsert(t *testing.T) { } func TestUpdate(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(id int primary key, name varchar(32))` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + event := &model.RowChangedEvent{ CommitTs: 417318403368288260, Table: &model.TableName{ - Schema: "cdc", - Table: "person", + Schema: "test", + Table: "t", }, + TableInfo: tableInfo, Columns: []*model.Column{ {Name: "id", Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 1}, {Name: "name", Type: mysql.TypeVarchar, Value: "Bob"}, @@ -130,10 +147,7 @@ func TestUpdate(t *testing.T) { {Name: "id", Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 2}, {Name: "name", Type: mysql.TypeVarchar, Value: "Nancy"}, }, - ColInfos: []rowcodec.ColInfo{ - {ID: 1, IsPKHandle: true, Ft: utils.SetFlag(types.NewFieldType(mysql.TypeLong), uint(model.PrimaryKeyFlag))}, - {ID: 2, Ft: types.NewFieldType(mysql.TypeVarchar)}, - }, + ColInfos: colInfos, } codecConfig := common.NewConfig(config.ProtocolCanalJSON) builder := newCanalEntryBuilder(codecConfig) @@ -198,18 +212,26 @@ func TestUpdate(t *testing.T) { } func TestDelete(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(id int primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + event := &model.RowChangedEvent{ CommitTs: 417318403368288260, Table: &model.TableName{ - Schema: "cdc", - Table: "person", + Schema: "test", + Table: "t", }, + TableInfo: tableInfo, PreColumns: []*model.Column{ {Name: "id", Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 1}, }, - ColInfos: []rowcodec.ColInfo{ - {ID: 1, IsPKHandle: true, Ft: utils.SetFlag(types.NewFieldType(mysql.TypeLong), uint(model.PrimaryKeyFlag))}, - }, + ColInfos: colInfos, } codecConfig := common.NewConfig(config.ProtocolCanalJSON) builder := newCanalEntryBuilder(codecConfig) diff --git a/pkg/sink/codec/canal/canal_json_decoder_test.go b/pkg/sink/codec/canal/canal_json_decoder_test.go index 1150c2add5f..d95446f42f3 100644 --- a/pkg/sink/codec/canal/canal_json_decoder_test.go +++ b/pkg/sink/codec/canal/canal_json_decoder_test.go @@ -24,8 +24,7 @@ import ( ) func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { - t.Parallel() - + insertEvent, _, _ := newLargeEvent4Test(t) ctx := context.Background() expectedDecodedValue := collectExpectedDecodedValue(testColumnsTable) for _, encodeEnable := range []bool{false, true} { @@ -37,7 +36,7 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { require.NoError(t, err) encoder := builder.Build() - err = encoder.AppendRowChangedEvent(ctx, "", testCaseInsert, nil) + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil) require.NoError(t, err) messages := encoder.Build() @@ -60,9 +59,9 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { consumed, err := decoder.NextRowChangedEvent() require.NoError(t, err) - require.Equal(t, testCaseInsert.Table, consumed.Table) + require.Equal(t, insertEvent.Table, consumed.Table) if encodeEnable && decodeEnable { - require.Equal(t, testCaseInsert.CommitTs, consumed.CommitTs) + require.Equal(t, insertEvent.CommitTs, consumed.CommitTs) } else { require.Equal(t, uint64(0), consumed.CommitTs) } @@ -72,7 +71,7 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { require.True(t, ok) require.Equal(t, expected, col.Value) - for _, item := range testCaseInsert.Columns { + for _, item := range insertEvent.Columns { if item.Name == col.Name { require.Equal(t, item.Type, col.Type) } diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index 64bb02aec30..566ac88495d 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -21,9 +21,9 @@ import ( "github.com/pingcap/log" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/utils" canal "github.com/pingcap/tiflow/proto/canal" "go.uber.org/zap" "golang.org/x/text/encoding/charmap" @@ -213,19 +213,8 @@ func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType return result, nil } -func extractBasicMySQLType(mysqlType string) string { - for i := 0; i < len(mysqlType); i++ { - if mysqlType[i] == '(' || mysqlType[i] == ' ' { - return mysqlType[:i] - } - } - return mysqlType -} - func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string) *model.Column { - mysqlTypeStr = extractBasicMySQLType(mysqlTypeStr) - mysqlType := types.StrToType(mysqlTypeStr) - + mysqlType := utils.ExtractBasicMySQLType(mysqlTypeStr) result := &model.Column{ Type: mysqlType, Name: name, diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index ff606752d18..bbe4c8a1b90 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -26,7 +26,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec" "github.com/pingcap/tiflow/pkg/sink/codec/common" - "github.com/pingcap/tiflow/pkg/sink/codec/internal" + "github.com/pingcap/tiflow/pkg/sink/codec/utils" "github.com/pingcap/tiflow/pkg/sink/kafka/claimcheck" "go.uber.org/zap" ) @@ -38,7 +38,6 @@ func fillColumns( newColumnMap map[string]*model.Column, out *jwriter.Writer, builder *canalEntryBuilder, - javaTypeMap map[string]internal.JavaSQLType, ) error { if len(columns) == 0 { out.RawString("null") @@ -61,11 +60,7 @@ func fillColumns( } else { out.RawByte(',') } - javaType, ok := javaTypeMap[col.Name] - if !ok { - return cerror.ErrCanalEncodeFailed.GenWithStack("java type is not found for column %s", col.Name) - } - value, err := builder.formatValue(col.Value, javaType) + value, err := builder.formatValue(col.Value, col.Flag.IsBinary()) if err != nil { return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } @@ -98,8 +93,6 @@ func newJSONMessageForDML( } mysqlTypeMap := make(map[string]string, len(e.Columns)) - javaTypeMap := make(map[string]internal.JavaSQLType, len(e.Columns)) - out := &jwriter.Writer{} out.RawByte('{') { @@ -185,8 +178,12 @@ func newJSONMessageForDML( out.String(col.Name) out.RawByte(':') out.Int32(int32(javaType)) - javaTypeMap[col.Name] = javaType - mysqlTypeMap[col.Name] = getMySQLType(e.ColInfos[idx].Ft, col.Flag, config.ContentCompatible) + columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID) + if !ok { + return nil, cerror.ErrCanalEncodeFailed.GenWithStack( + "cannot found the column info by the column ID: %d", e.ColInfos[idx].ID) + } + mysqlTypeMap[col.Name] = utils.GetMySQLType(columnInfo, config.ContentCompatible) } } if emptyColumn { @@ -221,7 +218,7 @@ func newJSONMessageForDML( out.RawString(",\"old\":null") out.RawString(",\"data\":") if err := fillColumns( - e.PreColumns, false, onlyHandleKey, nil, out, builder, javaTypeMap, + e.PreColumns, false, onlyHandleKey, nil, out, builder, ); err != nil { return nil, err } @@ -229,7 +226,7 @@ func newJSONMessageForDML( out.RawString(",\"old\":null") out.RawString(",\"data\":") if err := fillColumns( - e.Columns, false, onlyHandleKey, nil, out, builder, javaTypeMap, + e.Columns, false, onlyHandleKey, nil, out, builder, ); err != nil { return nil, err } @@ -243,13 +240,13 @@ func newJSONMessageForDML( } out.RawString(",\"old\":") if err := fillColumns( - e.PreColumns, config.OnlyOutputUpdatedColumns, onlyHandleKey, newColsMap, out, builder, javaTypeMap, + e.PreColumns, config.OnlyOutputUpdatedColumns, onlyHandleKey, newColsMap, out, builder, ); err != nil { return nil, err } out.RawString(",\"data\":") if err := fillColumns( - e.Columns, false, onlyHandleKey, nil, out, builder, javaTypeMap, + e.Columns, false, onlyHandleKey, nil, out, builder, ); err != nil { return nil, err } diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index 9fcf0fb6c90..ef48961636c 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -20,8 +20,7 @@ import ( "testing" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/compression" "github.com/pingcap/tiflow/pkg/config" @@ -43,8 +42,6 @@ func TestBuildCanalJSONRowEventEncoder(t *testing.T) { } func TestNewCanalJSONMessage4DML(t *testing.T) { - t.Parallel() - ctx := context.Background() codecConfig := common.NewConfig(config.ProtocolCanalJSON) builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) @@ -53,7 +50,8 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { encoder, ok := builder.Build().(*JSONRowEventEncoder) require.True(t, ok) - data, err := newJSONMessageForDML(encoder.builder, testCaseInsert, encoder.config, false, "") + insertEvent, updateEvent, deleteEvent := newLargeEvent4Test(t) + data, err := newJSONMessageForDML(encoder.builder, insertEvent, encoder.config, false, "") require.NoError(t, err) var msg canalJSONMessageInterface = &JSONMessage{} @@ -65,12 +63,12 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.NotNil(t, jsonMsg.Data) require.Nil(t, jsonMsg.Old) require.Equal(t, "INSERT", jsonMsg.EventType) - require.Equal(t, convertToCanalTs(testCaseInsert.CommitTs), jsonMsg.ExecutionTime) - require.Equal(t, "cdc", jsonMsg.Schema) - require.Equal(t, "person", jsonMsg.Table) + require.Equal(t, convertToCanalTs(insertEvent.CommitTs), jsonMsg.ExecutionTime) + require.Equal(t, "test", jsonMsg.Schema) + require.Equal(t, "t", jsonMsg.Table) require.False(t, jsonMsg.IsDDL) - for _, col := range testCaseInsert.Columns { + for _, col := range insertEvent.Columns { require.Contains(t, jsonMsg.Data[0], col.Name) require.Contains(t, jsonMsg.SQLType, col.Name) require.Contains(t, jsonMsg.MySQLType, col.Name) @@ -104,7 +102,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Equal(t, item.expectedEncodedValue, obtainedValue) } - data, err = newJSONMessageForDML(encoder.builder, testCaseUpdate, encoder.config, false, "") + data, err = newJSONMessageForDML(encoder.builder, updateEvent, encoder.config, false, "") require.NoError(t, err) jsonMsg = &JSONMessage{} @@ -115,16 +113,16 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.NotNil(t, jsonMsg.Old) require.Equal(t, "UPDATE", jsonMsg.EventType) - for _, col := range testCaseUpdate.Columns { + for _, col := range updateEvent.Columns { require.Contains(t, jsonMsg.Data[0], col.Name) require.Contains(t, jsonMsg.SQLType, col.Name) require.Contains(t, jsonMsg.MySQLType, col.Name) } - for _, col := range testCaseUpdate.PreColumns { + for _, col := range updateEvent.PreColumns { require.Contains(t, jsonMsg.Old[0], col.Name) } - data, err = newJSONMessageForDML(encoder.builder, testCaseDelete, encoder.config, false, "") + data, err = newJSONMessageForDML(encoder.builder, deleteEvent, encoder.config, false, "") require.NoError(t, err) jsonMsg = &JSONMessage{} @@ -134,12 +132,12 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Nil(t, jsonMsg.Old) require.Equal(t, "DELETE", jsonMsg.EventType) - for _, col := range testCaseDelete.PreColumns { + for _, col := range deleteEvent.PreColumns { require.Contains(t, jsonMsg.Data[0], col.Name) } codecConfig = &common.Config{DeleteOnlyHandleKeyColumns: true} - data, err = newJSONMessageForDML(encoder.builder, testCaseDelete, codecConfig, false, "") + data, err = newJSONMessageForDML(encoder.builder, deleteEvent, codecConfig, false, "") require.NoError(t, err) jsonMsg = &JSONMessage{} @@ -148,7 +146,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.NotNil(t, jsonMsg.Data) require.Nil(t, jsonMsg.Old) - for _, col := range testCaseDelete.PreColumns { + for _, col := range deleteEvent.PreColumns { if col.Flag.IsHandleKey() { require.Contains(t, jsonMsg.Data[0], col.Name) require.Contains(t, jsonMsg.SQLType, col.Name) @@ -169,7 +167,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { encoder, ok = builder.Build().(*JSONRowEventEncoder) require.True(t, ok) - data, err = newJSONMessageForDML(encoder.builder, testCaseUpdate, encoder.config, false, "") + data, err = newJSONMessageForDML(encoder.builder, updateEvent, encoder.config, false, "") require.NoError(t, err) withExtension := &canalJSONMessageWithTiDBExtension{} @@ -177,11 +175,11 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.NoError(t, err) require.NotNil(t, withExtension.Extensions) - require.Equal(t, testCaseUpdate.CommitTs, withExtension.Extensions.CommitTs) + require.Equal(t, updateEvent.CommitTs, withExtension.Extensions.CommitTs) encoder, ok = builder.Build().(*JSONRowEventEncoder) require.True(t, ok) - data, err = newJSONMessageForDML(encoder.builder, testCaseUpdate, encoder.config, false, "") + data, err = newJSONMessageForDML(encoder.builder, updateEvent, encoder.config, false, "") require.NoError(t, err) withExtension = &canalJSONMessageWithTiDBExtension{} @@ -190,12 +188,10 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Equal(t, 0, len(withExtension.JSONMessage.Old[0])) require.NotNil(t, withExtension.Extensions) - require.Equal(t, testCaseUpdate.CommitTs, withExtension.Extensions.CommitTs) + require.Equal(t, updateEvent.CommitTs, withExtension.Extensions.CommitTs) } func TestCanalJSONCompressionE2E(t *testing.T) { - t.Parallel() - codecConfig := common.NewConfig(config.ProtocolCanalJSON) codecConfig.EnableTiDBExtension = true codecConfig.LargeMessageHandle.LargeMessageHandleCompression = compression.LZ4 @@ -205,8 +201,10 @@ func TestCanalJSONCompressionE2E(t *testing.T) { require.NoError(t, err) encoder := builder.Build() + insertEvent, _, _ := newLargeEvent4Test(t) + // encode normal row changed event - err = encoder.AppendRowChangedEvent(ctx, "", testCaseInsert, func() {}) + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) require.NoError(t, err) message := encoder.Build()[0] @@ -224,9 +222,9 @@ func TestCanalJSONCompressionE2E(t *testing.T) { decodedEvent, err := decoder.NextRowChangedEvent() require.NoError(t, err) - require.Equal(t, decodedEvent.CommitTs, testCaseInsert.CommitTs) - require.Equal(t, decodedEvent.Table.Schema, testCaseInsert.Table.Schema) - require.Equal(t, decodedEvent.Table.Table, testCaseInsert.Table.Table) + require.Equal(t, decodedEvent.CommitTs, insertEvent.CommitTs) + require.Equal(t, decodedEvent.Table.Schema, insertEvent.Table.Schema) + require.Equal(t, decodedEvent.Table.Table, insertEvent.Table.Table) // encode DDL event message, err = encoder.EncodeDDLEvent(testCaseDDL) @@ -267,8 +265,6 @@ func TestCanalJSONCompressionE2E(t *testing.T) { } func TestCanalJSONClaimCheckE2E(t *testing.T) { - t.Parallel() - codecConfig := common.NewConfig(config.ProtocolCanalJSON) codecConfig.EnableTiDBExtension = true codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionClaimCheck @@ -281,7 +277,8 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) { require.NoError(t, err) encoder := builder.Build() - err = encoder.AppendRowChangedEvent(ctx, "", testCaseInsert, func() {}) + insertEvent, _, _ := newLargeEvent4Test(t) + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) require.NoError(t, err) // this is a large message, should be delivered to the external storage. @@ -301,9 +298,9 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) { decodedLargeEvent, err := decoder.NextRowChangedEvent() require.NoError(t, err) - require.Equal(t, testCaseInsert.CommitTs, decodedLargeEvent.CommitTs) - require.Equal(t, testCaseInsert.Table, decodedLargeEvent.Table) - require.Equal(t, testCaseInsert.PreColumns, decodedLargeEvent.PreColumns) + require.Equal(t, insertEvent.CommitTs, decodedLargeEvent.CommitTs) + require.Equal(t, insertEvent.Table, decodedLargeEvent.Table) + require.Equal(t, insertEvent.PreColumns, decodedLargeEvent.PreColumns) decodedColumns := make(map[string]*model.Column, len(decodedLargeEvent.Columns)) for _, column := range decodedLargeEvent.Columns { @@ -311,7 +308,7 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) { } expectedValue := collectExpectedDecodedValue(testColumnsTable) - for _, column := range testCaseInsert.Columns { + for _, column := range insertEvent.Columns { decodedColumn, ok := decodedColumns[column.Name] require.True(t, ok) require.Equal(t, column.Type, decodedColumn.Type) @@ -320,8 +317,6 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) { } func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) { - t.Parallel() - codecConfig := common.NewConfig(config.ProtocolCanalJSON) codecConfig.EnableTiDBExtension = true codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly @@ -334,7 +329,8 @@ func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) { require.NoError(t, err) encoder := builder.Build() - err = encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert, func() {}) + insertEvent, _, _ := newLargeEvent4Test(t) + err = encoder.AppendRowChangedEvent(context.Background(), "", insertEvent, func() {}) require.NoError(t, err) message := encoder.Build()[0] @@ -353,7 +349,7 @@ func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) { handleKeyOnlyMessage := decoder.(*batchDecoder).msg.(*canalJSONMessageWithTiDBExtension) require.True(t, handleKeyOnlyMessage.Extensions.OnlyHandleKey) - for _, col := range testCaseInsert.Columns { + for _, col := range insertEvent.Columns { if col.Flag.IsHandleKey() { require.Contains(t, handleKeyOnlyMessage.Data[0], col.Name) require.Contains(t, handleKeyOnlyMessage.SQLType, col.Name) @@ -404,8 +400,6 @@ func TestNewCanalJSONMessageFromDDL(t *testing.T) { } func TestBatching(t *testing.T) { - t.Parallel() - ctx := context.Background() codecConfig := common.NewConfig(config.ProtocolCanalJSON) builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) @@ -413,7 +407,8 @@ func TestBatching(t *testing.T) { encoder := builder.Build() require.NotNil(t, encoder) - updateCase := *testCaseUpdate + _, updateEvent, _ := newLargeEvent4Test(t) + updateCase := *updateEvent for i := 1; i <= 1000; i++ { ts := uint64(i) updateCase.CommitTs = ts @@ -586,6 +581,15 @@ func TestDDLEventWithExtensionValueMarshal(t *testing.T) { } func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) codecConfig.EnableTiDBExtension = true ctx := context.Background() @@ -596,19 +600,15 @@ func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) { count := 0 row := &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, Columns: []*model.Column{{ - Name: "col1", + Name: "a", Type: mysql.TypeVarchar, Value: []byte("aa"), }}, - ColInfos: []rowcodec.ColInfo{ - { - ID: 0, - Ft: types.NewFieldType(mysql.TypeVarchar), - }, - }, + ColInfos: colInfos, } tests := []struct { @@ -673,21 +673,26 @@ func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) { } func TestMaxMessageBytes(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() + // the size of `testEvent` after being encoded by canal-json is 200 testEvent := &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, Columns: []*model.Column{{ - Name: "col1", + Name: "a", Type: mysql.TypeVarchar, Value: []byte("aa"), }}, - ColInfos: []rowcodec.ColInfo{ - { - ID: 0, - Ft: types.NewFieldType(mysql.TypeVarchar), - }, - }, + ColInfos: colInfos, } ctx := context.Background() @@ -716,8 +721,6 @@ func TestMaxMessageBytes(t *testing.T) { } func TestCanalJSONContentCompatibleE2E(t *testing.T) { - t.Parallel() - ctx := context.Background() codecConfig := common.NewConfig(config.ProtocolCanalJSON) codecConfig.EnableTiDBExtension = true @@ -728,7 +731,8 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { encoder := builder.Build() - err = encoder.AppendRowChangedEvent(ctx, "", testCaseInsert, func() {}) + insertEvent, _, _ := newLargeEvent4Test(t) + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) require.NoError(t, err) message := encoder.Build()[0] @@ -746,9 +750,9 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { decodedEvent, err := decoder.NextRowChangedEvent() require.NoError(t, err) - require.Equal(t, decodedEvent.CommitTs, testCaseInsert.CommitTs) - require.Equal(t, decodedEvent.Table.Schema, testCaseInsert.Table.Schema) - require.Equal(t, decodedEvent.Table.Table, testCaseInsert.Table.Table) + require.Equal(t, decodedEvent.CommitTs, insertEvent.CommitTs) + require.Equal(t, decodedEvent.Table.Schema, insertEvent.Table.Schema) + require.Equal(t, decodedEvent.Table.Table, insertEvent.Table.Table) obtainedColumns := make(map[string]*model.Column, len(decodedEvent.Columns)) for _, column := range decodedEvent.Columns { @@ -756,7 +760,7 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { } expectedValue := collectExpectedDecodedValue(testColumnsTable) - for _, actual := range testCaseInsert.Columns { + for _, actual := range insertEvent.Columns { obtained, ok := obtainedColumns[actual.Name] require.True(t, ok) require.Equal(t, actual.Type, obtained.Type) diff --git a/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go index cf7c767329a..dfbae44c58e 100644 --- a/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go @@ -17,8 +17,7 @@ import ( "testing" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -36,26 +35,29 @@ func TestBuildCanalJSONTxnEventEncoder(t *testing.T) { } func TestCanalJSONTxnEventEncoderMaxMessageBytes(t *testing.T) { - t.Parallel() + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() // the size of `testEvent` after being encoded by canal-json is 200 testEvent := &model.SingleTableTxn{ - Table: &model.TableName{Schema: "a", Table: "b"}, + Table: &model.TableName{Schema: "test", Table: "t"}, Rows: []*model.RowChangedEvent{ { - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, Columns: []*model.Column{{ Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa"), }}, - ColInfos: []rowcodec.ColInfo{ - { - ID: 1, - Ft: types.NewFieldType(mysql.TypeVarchar), - }, - }, + ColInfos: colInfos, }, }, } @@ -75,7 +77,14 @@ func TestCanalJSONTxnEventEncoderMaxMessageBytes(t *testing.T) { } func TestCanalJSONAppendTxnEventEncoderWithCallback(t *testing.T) { - t.Parallel() + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t(a varchar(255) primary key)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + + _, _, colInfos := tableInfo.GetRowColInfos() cfg := common.NewConfig(config.ProtocolCanalJSON) encoder := NewJSONTxnEventEncoderBuilder(cfg).Build() @@ -84,37 +93,29 @@ func TestCanalJSONAppendTxnEventEncoderWithCallback(t *testing.T) { count := 0 txn := &model.SingleTableTxn{ - Table: &model.TableName{Schema: "a", Table: "b"}, + Table: &model.TableName{Schema: "test", Table: "t"}, Rows: []*model.RowChangedEvent{ { - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, Columns: []*model.Column{{ - Name: "col1", + Name: "a", Type: mysql.TypeVarchar, Value: []byte("aa"), }}, - ColInfos: []rowcodec.ColInfo{ - { - ID: 1, - Ft: types.NewFieldType(mysql.TypeVarchar), - }, - }, + ColInfos: colInfos, }, { - CommitTs: 2, - Table: &model.TableName{Schema: "a", Table: "b"}, + CommitTs: 2, + Table: &model.TableName{Schema: "test", Table: "t"}, + TableInfo: tableInfo, Columns: []*model.Column{{ - Name: "col1", + Name: "a", Type: mysql.TypeVarchar, Value: []byte("bb"), }}, - ColInfos: []rowcodec.ColInfo{ - { - ID: 1, - Ft: types.NewFieldType(mysql.TypeVarchar), - }, - }, + ColInfos: colInfos, }, }, } diff --git a/pkg/sink/codec/canal/canal_test_util.go b/pkg/sink/codec/canal/canal_test_util.go index 7932f6cf40a..c7d3d82ba03 100644 --- a/pkg/sink/codec/canal/canal_test_util.go +++ b/pkg/sink/codec/canal/canal_test_util.go @@ -14,17 +14,16 @@ package canal import ( + "testing" + mm "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/sink/codec/utils" ) type testColumnTuple struct { - column *model.Column - colInfo rowcodec.ColInfo + column *model.Column // expectedEncodedValue is expected by encoding expectedEncodedValue string @@ -36,232 +35,184 @@ type testColumnTuple struct { var ( testColumnsTable = []*testColumnTuple{ { - &model.Column{Name: "tinyint", Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Type: mysql.TypeTiny, Value: int64(127)}, - rowcodec.ColInfo{ID: 1, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)}, + &model.Column{Name: "t", Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Type: mysql.TypeTiny, Value: int64(127)}, "127", "127", }, { &model.Column{ - Name: "tinyint unsigned", Type: mysql.TypeTiny, Value: uint64(127), + Name: "tu1", Type: mysql.TypeTiny, Value: uint64(127), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeTiny))}, "127", "127", }, { &model.Column{ - Name: "tinyint unsigned 2", Type: mysql.TypeTiny, Value: uint64(128), + Name: "tu2", Type: mysql.TypeTiny, Value: uint64(128), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeTiny))}, "128", "128", }, { &model.Column{ - Name: "tinyint unsigned 3", Type: mysql.TypeTiny, Value: "0", + Name: "tu3", Type: mysql.TypeTiny, Value: "0", Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeTiny))}, "0", "0", }, { &model.Column{ - Name: "tinyint unsigned 4", Type: mysql.TypeTiny, Value: nil, + Name: "tu4", Type: mysql.TypeTiny, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, - rowcodec.ColInfo{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeTiny))}, "", nil, }, { - &model.Column{Name: "smallint", Type: mysql.TypeShort, Value: int64(32767)}, - rowcodec.ColInfo{ID: 6, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeShort)}, + &model.Column{Name: "s", Type: mysql.TypeShort, Value: int64(32767)}, "32767", "32767", }, { &model.Column{ - Name: "smallint unsigned", Type: mysql.TypeShort, Value: uint64(32767), + Name: "su1", Type: mysql.TypeShort, Value: uint64(32767), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 7, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeShort))}, "32767", "32767", }, { &model.Column{ - Name: "smallint unsigned 2", Type: mysql.TypeShort, Value: uint64(32768), + Name: "su2", Type: mysql.TypeShort, Value: uint64(32768), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 8, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeShort))}, "32768", "32768", }, { &model.Column{ - Name: "smallint unsigned 3", Type: mysql.TypeShort, Value: "0", + Name: "su3", Type: mysql.TypeShort, Value: "0", Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 9, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeShort))}, "0", "0", }, { &model.Column{ - Name: "smallint unsigned 4", Type: mysql.TypeShort, Value: nil, + Name: "su4", Type: mysql.TypeShort, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, - rowcodec.ColInfo{ID: 10, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeShort))}, "", nil, }, { - &model.Column{Name: "mediumint", Type: mysql.TypeInt24, Value: int64(8388607)}, - rowcodec.ColInfo{ID: 11, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeInt24)}, + &model.Column{Name: "m", Type: mysql.TypeInt24, Value: int64(8388607)}, "8388607", "8388607", }, { &model.Column{ - Name: "mediumint unsigned", Type: mysql.TypeInt24, Value: uint64(8388607), + Name: "mu1", Type: mysql.TypeInt24, Value: uint64(8388607), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 12, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeInt24))}, "8388607", "8388607", }, { &model.Column{ - Name: "mediumint unsigned 2", Type: mysql.TypeInt24, Value: uint64(8388608), + Name: "mu2", Type: mysql.TypeInt24, Value: uint64(8388608), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 13, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeInt24))}, "8388608", "8388608", }, { &model.Column{ - Name: "mediumint unsigned 3", Type: mysql.TypeInt24, Value: "0", + Name: "mu3", Type: mysql.TypeInt24, Value: "0", Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 14, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeInt24))}, "0", "0", }, { &model.Column{ - Name: "mediumint unsigned 4", Type: mysql.TypeInt24, Value: nil, + Name: "mu4", Type: mysql.TypeInt24, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, - rowcodec.ColInfo{ID: 15, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeInt24))}, "", nil, }, { - &model.Column{Name: "int", Type: mysql.TypeLong, Value: int64(2147483647)}, - rowcodec.ColInfo{ID: 16, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)}, + &model.Column{Name: "i", Type: mysql.TypeLong, Value: int64(2147483647)}, "2147483647", "2147483647", }, { &model.Column{ - Name: "int unsigned", Type: mysql.TypeLong, Value: uint64(2147483647), + Name: "iu1", Type: mysql.TypeLong, Value: uint64(2147483647), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 17, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLong))}, "2147483647", "2147483647", }, { &model.Column{ - Name: "int unsigned 2", Type: mysql.TypeLong, Value: uint64(2147483648), + Name: "iu2", Type: mysql.TypeLong, Value: uint64(2147483648), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 18, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLong))}, "2147483648", "2147483648", }, { &model.Column{ - Name: "int unsigned 3", Type: mysql.TypeLong, Value: "0", + Name: "iu3", Type: mysql.TypeLong, Value: "0", Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 19, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLong))}, "0", "0", }, { &model.Column{ - Name: "int unsigned 4", Type: mysql.TypeLong, Value: nil, + Name: "iu4", Type: mysql.TypeLong, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, - rowcodec.ColInfo{ID: 20, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLong))}, "", nil, }, { - &model.Column{Name: "bigint", Type: mysql.TypeLonglong, Value: int64(9223372036854775807)}, - rowcodec.ColInfo{ID: 21, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLonglong)}, + &model.Column{Name: "bi", Type: mysql.TypeLonglong, Value: int64(9223372036854775807)}, "9223372036854775807", "9223372036854775807", }, { &model.Column{ - Name: "bigint unsigned", Type: mysql.TypeLonglong, Value: uint64(9223372036854775807), + Name: "biu1", Type: mysql.TypeLonglong, Value: uint64(9223372036854775807), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ - ID: 22, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLonglong)), - }, "9223372036854775807", "9223372036854775807", }, { &model.Column{ - Name: "bigint unsigned 2", Type: mysql.TypeLonglong, Value: uint64(9223372036854775808), + Name: "biu2", Type: mysql.TypeLonglong, Value: uint64(9223372036854775808), Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ - ID: 23, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLonglong)), - }, "9223372036854775808", "9223372036854775808", }, { &model.Column{ - Name: "bigint unsigned 3", Type: mysql.TypeLonglong, Value: "0", + Name: "biu3", Type: mysql.TypeLonglong, Value: "0", Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ - ID: 24, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLonglong)), - }, "0", "0", }, { &model.Column{ - Name: "bigint unsigned 4", Type: mysql.TypeLonglong, Value: nil, + Name: "biu4", Type: mysql.TypeLonglong, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag, }, - rowcodec.ColInfo{ - ID: 25, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeLonglong)), - }, "", nil, }, { - &model.Column{Name: "float", Type: mysql.TypeFloat, Value: 3.14}, - rowcodec.ColInfo{ID: 26, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeFloat)}, + &model.Column{Name: "floatT", Type: mysql.TypeFloat, Value: 3.14}, "3.14", "3.14", }, { - &model.Column{Name: "double", Type: mysql.TypeDouble, Value: 2.71}, - rowcodec.ColInfo{ID: 27, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeDouble)}, + &model.Column{Name: "doubleT", Type: mysql.TypeDouble, Value: 2.71}, "2.71", "2.71", }, { - &model.Column{Name: "decimal", Type: mysql.TypeNewDecimal, Value: "2333"}, - rowcodec.ColInfo{ID: 28, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeNewDecimal)}, + &model.Column{Name: "decimalT", Type: mysql.TypeNewDecimal, Value: "2333"}, "2333", "2333", }, @@ -270,7 +221,6 @@ var ( Name: "float unsigned", Type: mysql.TypeFloat, Value: 3.14, Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 29, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeFloat))}, "3.14", "3.14", }, { @@ -278,7 +228,6 @@ var ( Name: "double unsigned", Type: mysql.TypeDouble, Value: 2.71, Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ID: 30, IsPKHandle: false, VirtualGenCol: false, Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeDouble))}, "2.71", "2.71", }, { @@ -286,313 +235,250 @@ var ( Name: "decimal unsigned", Type: mysql.TypeNewDecimal, Value: "2333", Flag: model.UnsignedFlag, }, - rowcodec.ColInfo{ - ID: 31, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetUnsigned(types.NewFieldType(mysql.TypeNewDecimal)), - }, "2333", "2333", }, - // + // for column value type in `[]uint8` and have `BinaryFlag`, expectedEncodedValue is dummy. { - &model.Column{Name: "varchar", Type: mysql.TypeVarchar, Value: []uint8("测试Varchar")}, - rowcodec.ColInfo{ID: 32, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarchar)}, + &model.Column{Name: "varcharT", Type: mysql.TypeVarchar, Value: []uint8("测试Varchar")}, "测试Varchar", "测试Varchar", }, { - &model.Column{Name: "char", Type: mysql.TypeString, Value: []uint8("测试String")}, - rowcodec.ColInfo{ID: 33, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeString)}, + &model.Column{Name: "charT", Type: mysql.TypeString, Value: []uint8("测试String")}, "测试String", "测试String", }, { &model.Column{ - Name: "binary", Type: mysql.TypeString, Value: []uint8("测试Binary"), + Name: "binaryT", Type: mysql.TypeString, Value: []uint8("测试Binary"), Flag: model.BinaryFlag, }, - rowcodec.ColInfo{ - ID: 34, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeString)), - }, "测试Binary", "测试Binary", }, { &model.Column{ - Name: "varbinary", Type: mysql.TypeVarchar, Value: []uint8("测试varbinary"), + Name: "varbinaryT", Type: mysql.TypeVarchar, Value: []uint8("测试varbinary"), Flag: model.BinaryFlag, }, - rowcodec.ColInfo{ - ID: 35, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetBinChsClnFlag(types.NewFieldType(mysql.TypeVarchar)), - }, "测试varbinary", "测试varbinary", }, { - &model.Column{Name: "tinytext", Type: mysql.TypeTinyBlob, Value: []uint8("测试Tinytext")}, - rowcodec.ColInfo{ID: 36, IsPKHandle: false, VirtualGenCol: false, Ft: utils.NewTextFieldType(mysql.TypeTinyBlob)}, + &model.Column{Name: "tinytextT", Type: mysql.TypeTinyBlob, Value: []uint8("测试Tinytext")}, "测试Tinytext", "测试Tinytext", }, { - &model.Column{Name: "text", Type: mysql.TypeBlob, Value: []uint8("测试text")}, - rowcodec.ColInfo{ID: 37, IsPKHandle: false, VirtualGenCol: false, Ft: utils.NewTextFieldType(mysql.TypeBlob)}, + &model.Column{Name: "textT", Type: mysql.TypeBlob, Value: []uint8("测试text")}, "测试text", "测试text", }, { - &model.Column{Name: "mediumtext", Type: mysql.TypeMediumBlob, Value: []uint8("测试mediumtext")}, - rowcodec.ColInfo{ID: 38, IsPKHandle: false, VirtualGenCol: false, Ft: utils.NewTextFieldType(mysql.TypeMediumBlob)}, + &model.Column{Name: "mediumtextT", Type: mysql.TypeMediumBlob, Value: []uint8("测试mediumtext")}, "测试mediumtext", "测试mediumtext", }, { - &model.Column{Name: "longtext", Type: mysql.TypeLongBlob, Value: []uint8("测试longtext")}, - rowcodec.ColInfo{ID: 39, IsPKHandle: false, VirtualGenCol: false, Ft: utils.NewTextFieldType(mysql.TypeLongBlob)}, + &model.Column{Name: "longtextT", Type: mysql.TypeLongBlob, Value: []uint8("测试longtext")}, "测试longtext", "测试longtext", }, { &model.Column{ - Name: "tinyblob", Type: mysql.TypeTinyBlob, Value: []uint8("测试tinyblob"), + Name: "tinyblobT", Type: mysql.TypeTinyBlob, Value: []uint8("测试tinyblob"), Flag: model.BinaryFlag, }, - rowcodec.ColInfo{ID: 40, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTinyBlob)}, "测试tinyblob", "测试tinyblob", }, { &model.Column{ - Name: "blob", Type: mysql.TypeBlob, Value: []uint8("测试blob"), + Name: "blobT", Type: mysql.TypeBlob, Value: []uint8("测试blob"), Flag: model.BinaryFlag, }, - rowcodec.ColInfo{ID: 41, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)}, "测试blob", "测试blob", }, { &model.Column{ - Name: "mediumblob", Type: mysql.TypeMediumBlob, Value: []uint8("测试mediumblob"), + Name: "mediumblobT", Type: mysql.TypeMediumBlob, Value: []uint8("测试mediumblob"), Flag: model.BinaryFlag, }, - rowcodec.ColInfo{ID: 42, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeMediumBlob)}, "测试mediumblob", "测试mediumblob", }, { &model.Column{ - Name: "longblob", Type: mysql.TypeLongBlob, Value: []uint8("测试longblob"), + Name: "longblobT", Type: mysql.TypeLongBlob, Value: []uint8("测试longblob"), Flag: model.BinaryFlag, }, - rowcodec.ColInfo{ID: 43, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLongBlob)}, "测试longblob", "测试longblob", }, { - &model.Column{Name: "date", Type: mysql.TypeDate, Value: "2020-02-20"}, - rowcodec.ColInfo{ID: 44, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeDate)}, + &model.Column{Name: "dateT", Type: mysql.TypeDate, Value: "2020-02-20"}, "2020-02-20", "2020-02-20", }, { - &model.Column{Name: "datetime", Type: mysql.TypeDatetime, Value: "2020-02-20 02:20:20"}, - rowcodec.ColInfo{ID: 45, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeDatetime)}, + &model.Column{Name: "datetimeT", Type: mysql.TypeDatetime, Value: "2020-02-20 02:20:20"}, "2020-02-20 02:20:20", "2020-02-20 02:20:20", }, { - &model.Column{Name: "timestamp", Type: mysql.TypeTimestamp, Value: "2020-02-20 10:20:20"}, - rowcodec.ColInfo{ID: 46, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTimestamp)}, + &model.Column{Name: "timestampT", Type: mysql.TypeTimestamp, Value: "2020-02-20 10:20:20"}, "2020-02-20 10:20:20", "2020-02-20 10:20:20", }, { - &model.Column{Name: "time", Type: mysql.TypeDuration, Value: "02:20:20"}, - rowcodec.ColInfo{ID: 47, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeDuration)}, + &model.Column{Name: "timeT", Type: mysql.TypeDuration, Value: "02:20:20"}, "02:20:20", "02:20:20", }, { - &model.Column{Name: "year", Type: mysql.TypeYear, Value: "2020", Flag: model.UnsignedFlag}, - rowcodec.ColInfo{ID: 48, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeYear)}, + &model.Column{Name: "yearT", Type: mysql.TypeYear, Value: "2020", Flag: model.UnsignedFlag}, "2020", "2020", }, { - &model.Column{Name: "enum", Type: mysql.TypeEnum, Value: uint64(1)}, - rowcodec.ColInfo{ID: 49, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeEnum)}, + &model.Column{Name: "enumT", Type: mysql.TypeEnum, Value: uint64(1)}, "1", "1", }, { - &model.Column{Name: "set", Type: mysql.TypeSet, Value: uint64(2)}, - rowcodec.ColInfo{ - ID: 50, - IsPKHandle: false, - VirtualGenCol: false, - Ft: utils.SetElems(types.NewFieldType(mysql.TypeSet), []string{"a", "b", "c"}), - }, + &model.Column{Name: "setT", Type: mysql.TypeSet, Value: uint64(2)}, "2", uint64(2), }, { &model.Column{ - Name: "bit", Type: mysql.TypeBit, Value: uint64(65), + Name: "bitT", Type: mysql.TypeBit, Value: uint64(65), Flag: model.UnsignedFlag | model.BinaryFlag, }, - rowcodec.ColInfo{ID: 51, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBit)}, "65", uint64(65), }, { &model.Column{ - Name: "json", Type: mysql.TypeJSON, Value: "{\"key1\": \"value1\"}", + Name: "jsonT", Type: mysql.TypeJSON, Value: "{\"key1\": \"value1\"}", Flag: model.BinaryFlag, }, - rowcodec.ColInfo{ID: 52, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeJSON)}, "{\"key1\": \"value1\"}", "{\"key1\": \"value1\"}", }, } - defaultCanalBatchTester = &struct { - rowCases [][]*model.RowChangedEvent - ddlCases [][]*model.DDLEvent - }{ - rowCases: [][]*model.RowChangedEvent{ - {{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{ - Name: "col1", - Type: mysql.TypeVarchar, - Value: []byte("aa"), - }}, - ColInfos: []rowcodec.ColInfo{{ - ID: 1, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeVarchar), - }}, - }}, - { - { - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{ - Name: "col1", - Type: mysql.TypeVarchar, - Value: []byte("aa"), - }}, - ColInfos: []rowcodec.ColInfo{{ - ID: 1, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeVarchar), - }}, - }, - { - CommitTs: 2, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, - ColInfos: []rowcodec.ColInfo{{ - ID: 1, - IsPKHandle: false, - VirtualGenCol: false, - Ft: types.NewFieldType(mysql.TypeVarchar), - }}, - }, - }, - }, - ddlCases: [][]*model.DDLEvent{ - {{ - CommitTs: 1, - TableInfo: &model.TableInfo{ - TableName: model.TableName{ - Schema: "a", Table: "b", - }, - }, - Query: "create table a", - Type: 1, - }}, - { - { - CommitTs: 2, - TableInfo: &model.TableInfo{ - TableName: model.TableName{ - Schema: "a", Table: "b", - }, - }, - Query: "create table b", - Type: 3, - }, - { - CommitTs: 3, - TableInfo: &model.TableInfo{ - TableName: model.TableName{ - Schema: "a", Table: "b", - }, - }, - Query: "create table c", - Type: 3, - }, + testCaseDDL = &model.DDLEvent{ + CommitTs: 417318403368288260, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "cdc", Table: "person", }, }, + Query: "create table person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))", + Type: mm.ActionCreateTable, + } +) + +func collectAllColumns(groups []*testColumnTuple) []*model.Column { + columns := make([]*model.Column, 0, len(groups)) + for _, item := range groups { + columns = append(columns, item.column) } + return columns +} - testColumns, testColInfos = collectAllColumns(testColumnsTable) +func collectExpectedDecodedValue(columns []*testColumnTuple) map[string]interface{} { + result := make(map[string]interface{}, len(columns)) + for _, item := range columns { + result[item.column.Name] = item.expectedDecodedValue + } + return result +} - testCaseInsert = &model.RowChangedEvent{ +func newLargeEvent4Test(t *testing.T) (*model.RowChangedEvent, *model.RowChangedEvent, *model.RowChangedEvent) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t( + t tinyint primary key, + tu1 tinyint unsigned, + tu2 tinyint unsigned, + tu3 tinyint unsigned, + tu4 tinyint unsigned, + s smallint, + su1 smallint unsigned, + su2 smallint unsigned, + su3 smallint unsigned, + su4 smallint unsigned, + m mediumint, + mu1 mediumint unsigned, + mu2 mediumint unsigned, + mu3 mediumint unsigned, + mu4 mediumint unsigned, + i int, + iu1 int unsigned, + iu2 int unsigned, + iu3 int unsigned, + iu4 int unsigned, + bi bigint, + biu1 bigint unsigned, + biu2 bigint unsigned, + biu3 bigint unsigned, + biu4 bigint unsigned, + floatT float, + doubleT double, + decimalT decimal, + floatTu float unsigned, + doubleTu double unsigned, + decimalTu decimal unsigned, + varcharT varchar(255), + charT char, + binaryT binary, + varbinaryT varbinary(255), + tinytextT tinytext, + textT text, + mediumtextT mediumtext, + longtextT longtext, + tinyblobT tinyblob, + blobT blob, + mediumblobT mediumblob, + longblobT longblob, + dateT date, + datetimeT datetime, + timestampT timestamp, + timeT time, + yearT year, + enumT enum('a', 'b', 'c'), + setT set('a', 'b', 'c'), + bitT bit(4), + jsonT json)` + job := helper.DDL2Job(sql) + tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) + _, _, colInfo := tableInfo.GetRowColInfos() + + testColumns := collectAllColumns(testColumnsTable) + + insert := &model.RowChangedEvent{ CommitTs: 417318403368288260, Table: &model.TableName{ - Schema: "cdc", - Table: "person", + Schema: "test", + Table: "t", }, + TableInfo: tableInfo, Columns: testColumns, PreColumns: nil, - ColInfos: testColInfos, + ColInfos: colInfo, } - testCaseUpdate = &model.RowChangedEvent{ + update := &model.RowChangedEvent{ CommitTs: 417318403368288260, Table: &model.TableName{ Schema: "cdc", Table: "person", }, + TableInfo: tableInfo, Columns: testColumns, PreColumns: testColumns, - ColInfos: testColInfos, + ColInfos: colInfo, } - testCaseDelete = &model.RowChangedEvent{ + deleteE := &model.RowChangedEvent{ CommitTs: 417318403368288260, Table: &model.TableName{ Schema: "cdc", Table: "person", }, + TableInfo: tableInfo, Columns: nil, PreColumns: testColumns, - ColInfos: testColInfos, - } - - testCaseDDL = &model.DDLEvent{ - CommitTs: 417318403368288260, - TableInfo: &model.TableInfo{ - TableName: model.TableName{ - Schema: "cdc", Table: "person", - }, - }, - Query: "create table person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))", - Type: mm.ActionCreateTable, - } -) - -func collectAllColumns(groups []*testColumnTuple) ([]*model.Column, []rowcodec.ColInfo) { - columns := make([]*model.Column, 0, len(groups)) - colInfos := make([]rowcodec.ColInfo, 0, len(groups)) - for _, item := range groups { - columns = append(columns, item.column) - colInfos = append(colInfos, item.colInfo) + ColInfos: colInfo, } - return columns, colInfos -} - -func collectExpectedDecodedValue(columns []*testColumnTuple) map[string]interface{} { - result := make(map[string]interface{}, len(columns)) - for _, item := range columns { - result[item.column.Name] = item.expectedDecodedValue - } - return result + return insert, update, deleteE } diff --git a/pkg/sink/codec/canal/type_test.go b/pkg/sink/codec/canal/type_test.go index 864ed954fa0..864a6faa84a 100644 --- a/pkg/sink/codec/canal/type_test.go +++ b/pkg/sink/codec/canal/type_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/sink/codec/internal" + "github.com/pingcap/tiflow/pkg/sink/codec/utils" "github.com/stretchr/testify/require" ) @@ -37,54 +38,63 @@ func TestGetMySQLType4IntTypes(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[0].Ft - flag := tableInfo.ColumnsFlag[colInfos[0].ID] - mysqlType := getMySQLType(fieldType, flag, false) + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) + + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "int", mysqlType) // mysql type with the default type length - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "int(11)", mysqlType) - javaType, err := getJavaSQLType(int64(2147483647), fieldType.GetType(), flag) + + flag := tableInfo.ColumnsFlag[colInfos[0].ID] + javaType, err := getJavaSQLType(int64(2147483647), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "tinyint", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "tinyint(4)", mysqlType) - javaType, err = getJavaSQLType(int64(127), fieldType.GetType(), flag) + + flag = tableInfo.ColumnsFlag[colInfos[1].ID] + javaType, err = getJavaSQLType(int64(127), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeTINYINT, javaType) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "smallint", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "smallint(6)", mysqlType) - javaType, err = getJavaSQLType(int64(32767), fieldType.GetType(), flag) + + flag = tableInfo.ColumnsFlag[colInfos[2].ID] + javaType, err = getJavaSQLType(int64(32767), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) - fieldType = colInfos[3].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[3].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "mediumint", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "mediumint(9)", mysqlType) - javaType, err = getJavaSQLType(int64(8388607), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(int64(8388607), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - fieldType = colInfos[4].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[4].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "bigint", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "bigint(20)", mysqlType) - javaType, err = getJavaSQLType(int64(9223372036854775807), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(int64(9223372036854775807), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) @@ -99,101 +109,106 @@ func TestGetMySQLType4IntTypes(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[0].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[0].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "int unsigned", mysqlType) // mysql type with the default type length - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "int(10) unsigned", mysqlType) - javaType, err = getJavaSQLType(uint64(2147483647), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(2147483647), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - javaType, err = getJavaSQLType(uint64(2147483648), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(2147483648), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) - javaType, err = getJavaSQLType("0", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - javaType, err = getJavaSQLType(nil, fieldType.GetType(), flag) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - fieldType = colInfos[1].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "tinyint unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "tinyint(3) unsigned", mysqlType) - javaType, err = getJavaSQLType(uint64(127), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(127), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeTINYINT, javaType) - javaType, err = getJavaSQLType(uint64(128), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(128), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) - javaType, err = getJavaSQLType("0", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeTINYINT, javaType) - javaType, err = getJavaSQLType(nil, fieldType.GetType(), flag) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeTINYINT, javaType) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "smallint unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "smallint(5) unsigned", mysqlType) - javaType, err = getJavaSQLType(uint64(32767), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(32767), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) - javaType, err = getJavaSQLType(uint64(32768), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(32768), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - javaType, err = getJavaSQLType("0", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) - javaType, err = getJavaSQLType(nil, fieldType.GetType(), flag) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeSMALLINT, javaType) - fieldType = colInfos[3].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[3].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "mediumint unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "mediumint(8) unsigned", mysqlType) - javaType, err = getJavaSQLType(uint64(8388607), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(8388607), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - javaType, err = getJavaSQLType(uint64(8388608), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(8388608), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - javaType, err = getJavaSQLType("0", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - javaType, err = getJavaSQLType(nil, fieldType.GetType(), flag) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - fieldType = colInfos[4].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[4].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "bigint unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "bigint(20) unsigned", mysqlType) - javaType, err = getJavaSQLType(uint64(9223372036854775807), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(9223372036854775807), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) - javaType, err = getJavaSQLType(uint64(9223372036854775808), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(9223372036854775808), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeDECIMAL, javaType) - javaType, err = getJavaSQLType("0", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("0", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) - javaType, err = getJavaSQLType(nil, fieldType.GetType(), flag) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeBIGINT, javaType) @@ -208,39 +223,39 @@ func TestGetMySQLType4IntTypes(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[0].Ft - flag = tableInfo.ColumnsFlag[colInfos[0].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "int", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "int(10)", mysqlType) - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "tinyint", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "tinyint(3)", mysqlType) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "smallint", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "smallint(5)", mysqlType) - fieldType = colInfos[3].Ft - flag = tableInfo.ColumnsFlag[colInfos[3].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "mediumint", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "mediumint(8)", mysqlType) - fieldType = colInfos[4].Ft - flag = tableInfo.ColumnsFlag[colInfos[4].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "bigint", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "bigint(19)", mysqlType) sql = `create table test.t4 ( @@ -254,39 +269,39 @@ func TestGetMySQLType4IntTypes(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[0].Ft - flag = tableInfo.ColumnsFlag[colInfos[0].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "int unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "int(10) unsigned", mysqlType) - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "tinyint unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "tinyint(3) unsigned", mysqlType) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "smallint unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "smallint(5) unsigned", mysqlType) - fieldType = colInfos[3].Ft - flag = tableInfo.ColumnsFlag[colInfos[3].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "mediumint unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "mediumint(8) unsigned", mysqlType) - fieldType = colInfos[4].Ft - flag = tableInfo.ColumnsFlag[colInfos[4].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "bigint unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "bigint(19) unsigned", mysqlType) sql = `create table test.t5 ( @@ -300,39 +315,39 @@ func TestGetMySQLType4IntTypes(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[0].Ft - flag = tableInfo.ColumnsFlag[colInfos[0].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[0].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "int unsigned zerofill", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "int(10) unsigned zerofill", mysqlType) - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "tinyint unsigned zerofill", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "tinyint(3) unsigned zerofill", mysqlType) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "smallint unsigned zerofill", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "smallint(5) unsigned zerofill", mysqlType) - fieldType = colInfos[3].Ft - flag = tableInfo.ColumnsFlag[colInfos[3].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "mediumint unsigned zerofill", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "mediumint(8) unsigned zerofill", mysqlType) - fieldType = colInfos[4].Ft - flag = tableInfo.ColumnsFlag[colInfos[4].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "bigint unsigned zerofill", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "bigint(20) unsigned zerofill", mysqlType) sql = `create table test.t6( @@ -345,29 +360,30 @@ func TestGetMySQLType4IntTypes(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "bit", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "bit(1)", mysqlType) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "bit", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "bit(3)", mysqlType) - javaType, err = getJavaSQLType(uint64(65), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(65), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeBIT, javaType) // bool is identical to tinyint in the TiDB. - fieldType = colInfos[3].Ft - flag = tableInfo.ColumnsFlag[colInfos[3].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "tinyint", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "tinyint(1)", mysqlType) } @@ -384,23 +400,25 @@ func TestGetMySQLType4FloatType(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[1].Ft + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag := tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType := getMySQLType(fieldType, flag, false) + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "float", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "float", mysqlType) - javaType, err := getJavaSQLType(3.14, fieldType.GetType(), flag) + javaType, err := getJavaSQLType(3.14, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeREAL, javaType) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "double", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "double", mysqlType) - javaType, err = getJavaSQLType(2.71, fieldType.GetType(), flag) + javaType, err = getJavaSQLType(2.71, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeDOUBLE, javaType) @@ -410,18 +428,18 @@ func TestGetMySQLType4FloatType(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "float", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "float(10,3)", mysqlType) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "float", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "float", mysqlType) sql = `create table test.t3(a int primary key, b double(20, 3))` @@ -430,12 +448,11 @@ func TestGetMySQLType4FloatType(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "double", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "double(20,3)", mysqlType) sql = `create table test.t4( @@ -449,38 +466,40 @@ func TestGetMySQLType4FloatType(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "float unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "float unsigned", mysqlType) - javaType, err = getJavaSQLType(3.14, fieldType.GetType(), flag) + javaType, err = getJavaSQLType(3.14, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeREAL, javaType) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "double unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "double unsigned", mysqlType) - javaType, err = getJavaSQLType(2.71, fieldType.GetType(), flag) + javaType, err = getJavaSQLType(2.71, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeDOUBLE, javaType) - fieldType = colInfos[3].Ft - flag = tableInfo.ColumnsFlag[colInfos[3].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "float unsigned zerofill", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "float unsigned zerofill", mysqlType) - fieldType = colInfos[4].Ft - flag = tableInfo.ColumnsFlag[colInfos[4].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "double unsigned zerofill", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "double unsigned zerofill", mysqlType) } @@ -494,18 +513,18 @@ func TestGetMySQLType4Decimal(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[1].Ft - flag := tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType := getMySQLType(fieldType, flag, false) + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "decimal", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "decimal(10,0)", mysqlType) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "decimal", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "decimal(10,0)", mysqlType) sql = `create table test.t2(a int primary key, b decimal(5), c decimal(5, 2))` @@ -514,20 +533,21 @@ func TestGetMySQLType4Decimal(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "decimal", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "decimal(5,0)", mysqlType) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "decimal", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "decimal(5,2)", mysqlType) - javaType, err := getJavaSQLType("2333", fieldType.GetType(), flag) + javaType, err := getJavaSQLType("2333", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeDECIMAL, javaType) @@ -537,20 +557,21 @@ func TestGetMySQLType4Decimal(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "decimal unsigned", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "decimal(10,0) unsigned", mysqlType) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "decimal unsigned zerofill", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "decimal(10,0) unsigned zerofill", mysqlType) - javaType, err = getJavaSQLType("2333", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("2333", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeDECIMAL, javaType) } @@ -565,20 +586,21 @@ func TestGetMySQLType4TimeTypes(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[1].Ft - flag := tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType := getMySQLType(fieldType, flag, false) + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "time", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "time", mysqlType) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "time", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "time(3)", mysqlType) - javaType, err := getJavaSQLType("02:20:20", fieldType.GetType(), flag) + javaType, err := getJavaSQLType("02:20:20", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeTIME) @@ -588,20 +610,21 @@ func TestGetMySQLType4TimeTypes(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "datetime", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "datetime", mysqlType) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "datetime", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "datetime(3)", mysqlType) - javaType, err = getJavaSQLType("2020-02-20 02:20:20", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("2020-02-20 02:20:20", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeTIMESTAMP) @@ -611,20 +634,21 @@ func TestGetMySQLType4TimeTypes(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "timestamp", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "timestamp", mysqlType) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "timestamp", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "timestamp(3)", mysqlType) - javaType, err = getJavaSQLType("2020-02-20 02:20:20", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("2020-02-20 02:20:20", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeTIMESTAMP) @@ -633,13 +657,14 @@ func TestGetMySQLType4TimeTypes(t *testing.T) { tableInfo = model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo) _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "date", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "date", mysqlType) - javaType, err = getJavaSQLType("2020-02-20", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("2020-02-20", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeDATE) @@ -649,20 +674,21 @@ func TestGetMySQLType4TimeTypes(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft - flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "year", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "year(4)", mysqlType) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "year", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "year(4)", mysqlType) - javaType, err = getJavaSQLType("2020", fieldType.GetType(), flag) + javaType, err = getJavaSQLType("2020", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeVARCHAR) } @@ -677,20 +703,21 @@ func TestGetMySQLType4Char(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[1].Ft - flag := tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType := getMySQLType(fieldType, flag, false) + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "char", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "char(1)", mysqlType) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + flag := tableInfo.ColumnsFlag[colInfos[2].ID] + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "char", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "char(123)", mysqlType) - javaType, err := getJavaSQLType([]uint8("测试char"), fieldType.GetType(), flag) + javaType, err := getJavaSQLType([]uint8("测试char"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeCHAR) @@ -700,13 +727,14 @@ func TestGetMySQLType4Char(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "varchar", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "varchar(123)", mysqlType) - javaType, err = getJavaSQLType([]uint8("测试varchar"), fieldType.GetType(), flag) + javaType, err = getJavaSQLType([]uint8("测试varchar"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeVARCHAR) } @@ -721,43 +749,47 @@ func TestGetMySQLType4TextTypes(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[1].Ft + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag := tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType := getMySQLType(fieldType, flag, false) + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "text", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "text", mysqlType) - javaType, err := getJavaSQLType([]uint8("测试text"), fieldType.GetType(), flag) + javaType, err := getJavaSQLType([]uint8("测试text"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeCLOB) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "tinytext", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "tinytext", mysqlType) - javaType, err = getJavaSQLType([]uint8("测试tinytext"), fieldType.GetType(), flag) + javaType, err = getJavaSQLType([]uint8("测试tinytext"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeCLOB) - fieldType = colInfos[3].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[3].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "mediumtext", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "mediumtext", mysqlType) - javaType, err = getJavaSQLType([]uint8("测试mediumtext"), fieldType.GetType(), flag) + javaType, err = getJavaSQLType([]uint8("测试mediumtext"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeCLOB) - fieldType = colInfos[4].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[4].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "longtext", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "longtext", mysqlType) - javaType, err = getJavaSQLType([]uint8("测试longtext"), fieldType.GetType(), flag) + javaType, err = getJavaSQLType([]uint8("测试longtext"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeCLOB) } @@ -772,21 +804,22 @@ func TestGetMySQLType4BinaryType(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[1].Ft + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag := tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType := getMySQLType(fieldType, flag, false) + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "binary", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "binary(1)", mysqlType) - javaType, err := getJavaSQLType([]uint8("测试binary"), fieldType.GetType(), flag) + javaType, err := getJavaSQLType([]uint8("测试binary"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeBLOB) - fieldType = colInfos[2].Ft - flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "binary", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "binary(10)", mysqlType) sql = `create table test.t2(a int primary key, b varbinary(23))` @@ -795,13 +828,14 @@ func TestGetMySQLType4BinaryType(t *testing.T) { _, _, colInfos = tableInfo.GetRowColInfos() - fieldType = colInfos[1].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "varbinary", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "varbinary(23)", mysqlType) - javaType, err = getJavaSQLType([]uint8("测试varbinary"), fieldType.GetType(), flag) + javaType, err = getJavaSQLType([]uint8("测试varbinary"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeBLOB, javaType) } @@ -816,43 +850,47 @@ func TestGetMySQLType4BlobType(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[1].Ft + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag := tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType := getMySQLType(fieldType, flag, false) + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "blob", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "blob", mysqlType) - javaType, err := getJavaSQLType([]uint8("测试blob"), fieldType.GetType(), flag) + javaType, err := getJavaSQLType([]uint8("测试blob"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeBLOB) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "tinyblob", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "tinyblob", mysqlType) - javaType, err = getJavaSQLType([]uint8("测试tinyblob"), fieldType.GetType(), flag) + javaType, err = getJavaSQLType([]uint8("测试tinyblob"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeBLOB) - fieldType = colInfos[3].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[3].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[3].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "mediumblob", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "mediumblob", mysqlType) - javaType, err = getJavaSQLType([]uint8("测试mediumblob"), fieldType.GetType(), flag) + javaType, err = getJavaSQLType([]uint8("测试mediumblob"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeBLOB) - fieldType = colInfos[4].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[4].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[4].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "longblob", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "longblob", mysqlType) - javaType, err = getJavaSQLType([]uint8("测试longblob"), fieldType.GetType(), flag) + javaType, err = getJavaSQLType([]uint8("测试longblob"), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, javaType, internal.JavaSQLTypeBLOB) } @@ -867,27 +905,29 @@ func TestGetMySQLType4EnumAndSet(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[1].Ft + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag := tableInfo.ColumnsFlag[colInfos[1].ID] - mysqlType := getMySQLType(fieldType, flag, false) + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "enum", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "enum('a','b','c')", mysqlType) - javaType, err := getJavaSQLType(uint64(1), fieldType.GetType(), flag) + javaType, err := getJavaSQLType(uint64(1), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeINTEGER, javaType) - fieldType = colInfos[2].Ft + columnInfo, ok = tableInfo.GetColumnInfo(colInfos[2].ID) + require.True(t, ok) flag = tableInfo.ColumnsFlag[colInfos[2].ID] - mysqlType = getMySQLType(fieldType, flag, false) + mysqlType = utils.GetMySQLType(columnInfo, false) require.Equal(t, "set", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "set('a','b','c')", mysqlType) - javaType, err = getJavaSQLType(uint64(2), fieldType.GetType(), flag) + javaType, err = getJavaSQLType(uint64(2), columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeBIT, javaType) } @@ -902,19 +942,19 @@ func TestGetMySQLType4JSON(t *testing.T) { _, _, colInfos := tableInfo.GetRowColInfos() - fieldType := colInfos[1].Ft + columnInfo, ok := tableInfo.GetColumnInfo(colInfos[1].ID) + require.True(t, ok) flag := tableInfo.ColumnsFlag[colInfos[1].ID] - - mysqlType := getMySQLType(fieldType, flag, false) + mysqlType := utils.GetMySQLType(columnInfo, false) require.Equal(t, "json", mysqlType) - mysqlType = getMySQLType(fieldType, flag, true) + mysqlType = utils.GetMySQLType(columnInfo, true) require.Equal(t, "json", mysqlType) - javaType, err := getJavaSQLType("{\"key1\": \"value1\"}", fieldType.GetType(), flag) + javaType, err := getJavaSQLType("{\"key1\": \"value1\"}", columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeVARCHAR, javaType) - javaType, err = getJavaSQLType(nil, fieldType.GetType(), flag) + javaType, err = getJavaSQLType(nil, columnInfo.FieldType.GetType(), flag) require.NoError(t, err) require.Equal(t, internal.JavaSQLTypeVARCHAR, javaType) } diff --git a/pkg/sink/codec/utils/field_types.go b/pkg/sink/codec/utils/field_types.go index 04e43e35897..d327b4b5fcf 100644 --- a/pkg/sink/codec/utils/field_types.go +++ b/pkg/sink/codec/utils/field_types.go @@ -14,14 +14,20 @@ package utils import ( + "strings" + + "github.com/pingcap/tidb/parser/charset" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" ) // SetBinChsClnFlag set the binary charset flag. func SetBinChsClnFlag(ft *types.FieldType) *types.FieldType { - types.SetBinChsClnFlag(ft) + ft.SetCharset(charset.CharsetBin) + ft.SetCollate(charset.CollationBin) + ft.AddFlag(mysql.BinaryFlag) return ft } @@ -31,22 +37,46 @@ func SetUnsigned(ft *types.FieldType) *types.FieldType { return ft } -// SetFlag set the specific flag to the ft -func SetFlag(ft *types.FieldType, flag uint) *types.FieldType { - ft.SetFlag(flag) - return ft -} - // SetElems set the elems to the ft func SetElems(ft *types.FieldType, elems []string) *types.FieldType { ft.SetElems(elems) return ft } -// NewTextFieldType create a new text field type. -func NewTextFieldType(tp byte) *types.FieldType { - ft := types.NewFieldType(tp) - ft.SetCollate(mysql.DefaultCollationName) - ft.SetCharset(mysql.DefaultCharset) - return ft +// when encoding the canal format, for unsigned mysql type, add `unsigned` keyword. +// it should have the form `t unsigned`, such as `int unsigned` +func withUnsigned4MySQLType(mysqlType string, unsigned bool) string { + if unsigned && mysqlType != "bit" && mysqlType != "year" { + return mysqlType + " unsigned" + } + return mysqlType +} + +func withZerofill4MySQLType(mysqlType string, zerofill bool) string { + if zerofill && !strings.HasPrefix(mysqlType, "year") { + return mysqlType + " zerofill" + } + return mysqlType +} + +// GetMySQLType get the mysql type from column info +func GetMySQLType(columnInfo *timodel.ColumnInfo, fullType bool) string { + if !fullType { + result := types.TypeToStr(columnInfo.GetType(), columnInfo.GetCharset()) + result = withUnsigned4MySQLType(result, mysql.HasUnsignedFlag(columnInfo.GetFlag())) + result = withZerofill4MySQLType(result, mysql.HasZerofillFlag(columnInfo.GetFlag())) + return result + } + return columnInfo.GetTypeDesc() +} + +// ExtractBasicMySQLType return the mysql type +func ExtractBasicMySQLType(mysqlType string) byte { + for i := 0; i < len(mysqlType); i++ { + if mysqlType[i] == '(' || mysqlType[i] == ' ' { + return types.StrToType(mysqlType[:i]) + } + } + + return types.StrToType(mysqlType) }