diff --git a/pkg/sink/codec/canal/canal_json_decoder_test.go b/pkg/sink/codec/canal/canal_json_decoder_test.go index df88117dc94..7cd86873b1b 100644 --- a/pkg/sink/codec/canal/canal_json_decoder_test.go +++ b/pkg/sink/codec/canal/canal_json_decoder_test.go @@ -25,9 +25,9 @@ import ( ) func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { - insertEvent, _, _ := utils.newLargeEvent4Test(t) + insertEvent, _, _ := utils.NewLargeEvent4Test(t) ctx := context.Background() - expectedDecodedValue := utils.collectExpectedDecodedValue(utils.testColumnsTable) + expectedDecodedValue := utils.CollectExpectedDecodedValue(utils.TestColumnsTable) for _, encodeEnable := range []bool{false, true} { codecConfig := common.NewConfig(config.ProtocolCanalJSON) codecConfig.EnableTiDBExtension = encodeEnable @@ -101,7 +101,7 @@ func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) { require.NoError(t, err) encoder := builder.Build() - result, err := encoder.EncodeDDLEvent(utils.testCaseDDL) + result, err := encoder.EncodeDDLEvent(utils.TestCaseDDL) require.NoError(t, err) require.NotNil(t, result) diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index 23332ecbaa8..c065aef3d57 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -228,7 +228,13 @@ func TestCanalJSONCompressionE2E(t *testing.T) { require.Equal(t, decodedEvent.Table.Table, insertEvent.Table.Table) // encode DDL event - message, err = encoder.EncodeDDLEvent(utils.TestCaseDDL) + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))` + ddlEvent := helper.DDL2Event(sql) + + message, err = encoder.EncodeDDLEvent(ddlEvent) require.NoError(t, err) err = decoder.AddKeyValue(message.Key, message.Value) @@ -242,10 +248,10 @@ func TestCanalJSONCompressionE2E(t *testing.T) { decodedDDL, err := decoder.NextDDLEvent() require.NoError(t, err) - require.Equal(t, decodedDDL.Query, utils.TestCaseDDL.Query) - require.Equal(t, decodedDDL.CommitTs, utils.TestCaseDDL.CommitTs) - require.Equal(t, decodedDDL.TableInfo.TableName.Schema, utils.TestCaseDDL.TableInfo.TableName.Schema) - require.Equal(t, decodedDDL.TableInfo.TableName.Table, utils.TestCaseDDL.TableInfo.TableName.Table) + require.Equal(t, decodedDDL.Query, ddlEvent.Query) + require.Equal(t, decodedDDL.CommitTs, ddlEvent.CommitTs) + require.Equal(t, decodedDDL.TableInfo.TableName.Schema, ddlEvent.TableInfo.TableName.Schema) + require.Equal(t, decodedDDL.TableInfo.TableName.Table, ddlEvent.TableInfo.TableName.Table) // encode checkpoint event waterMark := uint64(2333) @@ -364,7 +370,8 @@ func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) { } func TestNewCanalJSONMessageFromDDL(t *testing.T) { - t.Parallel() + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() codecConfig := common.NewConfig(config.ProtocolCanalJSON) ctx := context.Background() @@ -373,16 +380,19 @@ func TestNewCanalJSONMessageFromDDL(t *testing.T) { require.NoError(t, err) encoder := builder.Build().(*JSONRowEventEncoder) - message := encoder.newJSONMessageForDDL(utils.TestCaseDDL) + sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))` + ddlEvent := helper.DDL2Event(sql) + + message := encoder.newJSONMessageForDDL(ddlEvent) require.NotNil(t, message) msg, ok := message.(*JSONMessage) require.True(t, ok) - require.Equal(t, convertToCanalTs(utils.TestCaseDDL.CommitTs), msg.ExecutionTime) + require.Equal(t, convertToCanalTs(ddlEvent.CommitTs), msg.ExecutionTime) require.True(t, msg.IsDDL) require.Equal(t, "cdc", msg.Schema) require.Equal(t, "person", msg.Table) - require.Equal(t, utils.TestCaseDDL.Query, msg.Query) + require.Equal(t, ddlEvent.Query, msg.Query) require.Equal(t, "CREATE", msg.EventType) codecConfig.EnableTiDBExtension = true @@ -390,14 +400,14 @@ func TestNewCanalJSONMessageFromDDL(t *testing.T) { require.NoError(t, err) encoder = builder.Build().(*JSONRowEventEncoder) - message = encoder.newJSONMessageForDDL(utils.TestCaseDDL) + message = encoder.newJSONMessageForDDL(ddlEvent) require.NotNil(t, message) withExtension, ok := message.(*canalJSONMessageWithTiDBExtension) require.True(t, ok) require.NotNil(t, withExtension.Extensions) - require.Equal(t, utils.TestCaseDDL.CommitTs, withExtension.Extensions.CommitTs) + require.Equal(t, ddlEvent.CommitTs, withExtension.Extensions.CommitTs) } func TestBatching(t *testing.T) { @@ -539,46 +549,41 @@ func TestCheckpointEventValueMarshal(t *testing.T) { require.Equal(t, expectedJSON, string(rawBytes)) } -func TestDDLEventWithExtensionValueMarshal(t *testing.T) { - t.Parallel() +func TestDDLEventWithExtension(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + ctx := context.Background() codecConfig := common.NewConfig(config.ProtocolCanalJSON) - encoder := &JSONRowEventEncoder{ - builder: newCanalEntryBuilder(codecConfig), - config: &common.Config{EnableTiDBExtension: true}, - } + codecConfig.EnableTiDBExtension = true + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() require.NotNil(t, encoder) - message := encoder.newJSONMessageForDDL(utils.TestCaseDDL) - require.NotNil(t, message) + sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))` + ddlEvent := helper.DDL2Event(sql) - msg, ok := message.(*canalJSONMessageWithTiDBExtension) - require.True(t, ok) - // Hack the build time. - // Otherwise, the timing will be inconsistent. - msg.BuildTime = 1469579899 - rawBytes, err := json.MarshalIndent(msg, "", " ") + message, err := encoder.EncodeDDLEvent(ddlEvent) require.NoError(t, err) - // No watermark ts will be output. - expectedJSON := `{ - "id": 0, - "database": "cdc", - "table": "person", - "pkNames": null, - "isDdl": true, - "type": "CREATE", - "es": 1591943372224, - "ts": 1469579899, - "sql": "create table person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))", - "sqlType": null, - "mysqlType": null, - "data": null, - "old": null, - "_tidb": { - "commitTs": 417318403368288260 - } -}` - require.Equal(t, expectedJSON, string(rawBytes)) + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, messageType, model.MessageTypeDDL) + + decodedDDL, err := decoder.NextDDLEvent() + require.NoError(t, err) + require.Equal(t, ddlEvent.Query, decodedDDL.Query) + require.Equal(t, ddlEvent.CommitTs, decodedDDL.CommitTs) + require.Equal(t, ddlEvent.TableInfo.TableName.Schema, decodedDDL.TableInfo.TableName.Schema) + require.Equal(t, ddlEvent.TableInfo.TableName.Table, decodedDDL.TableInfo.TableName.Table) } func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) { diff --git a/pkg/sink/codec/utils/test_utils.go b/pkg/sink/codec/utils/test_utils.go index a322f0111b2..fa14c484b7b 100644 --- a/pkg/sink/codec/utils/test_utils.go +++ b/pkg/sink/codec/utils/test_utils.go @@ -16,7 +16,6 @@ package utils import ( "testing" - mm "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" @@ -353,17 +352,6 @@ var ( "{\"key1\": \"value1\"}", "{\"key1\": \"value1\"}", }, } - - TestCaseDDL = &model.DDLEvent{ - CommitTs: 417318403368288260, - TableInfo: &model.TableInfo{ - TableName: model.TableName{ - Schema: "cdc", Table: "person", - }, - }, - Query: "create table person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))", - Type: mm.ActionCreateTable, - } ) func collectAllColumns(groups []*testColumnTuple) []*model.Column {