diff --git a/pkg/sink/codec/canal/canal_json_decoder_test.go b/pkg/sink/codec/canal/canal_json_decoder_test.go deleted file mode 100644 index 7cd86873b1b..00000000000 --- a/pkg/sink/codec/canal/canal_json_decoder_test.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package canal - -import ( - "context" - "testing" - - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/sink/codec/common" - "github.com/pingcap/tiflow/pkg/sink/codec/utils" - "github.com/stretchr/testify/require" -) - -func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { - insertEvent, _, _ := utils.NewLargeEvent4Test(t) - ctx := context.Background() - expectedDecodedValue := utils.CollectExpectedDecodedValue(utils.TestColumnsTable) - for _, encodeEnable := range []bool{false, true} { - codecConfig := common.NewConfig(config.ProtocolCanalJSON) - codecConfig.EnableTiDBExtension = encodeEnable - codecConfig.Terminator = config.CRLF - - builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) - require.NoError(t, err) - encoder := builder.Build() - - err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil) - require.NoError(t, err) - - messages := encoder.Build() - require.Equal(t, 1, len(messages)) - msg := messages[0] - - for _, decodeEnable := range []bool{false, true} { - codecConfig := common.NewConfig(config.ProtocolCanalJSON) - codecConfig.EnableTiDBExtension = decodeEnable - decoder, err := NewBatchDecoder(ctx, codecConfig, nil) - require.NoError(t, err) - err = decoder.AddKeyValue(msg.Key, msg.Value) - require.NoError(t, err) - - ty, hasNext, err := decoder.HasNext() - require.NoError(t, err) - require.True(t, hasNext) - require.Equal(t, model.MessageTypeRow, ty) - - consumed, err := decoder.NextRowChangedEvent() - require.NoError(t, err) - - require.Equal(t, insertEvent.Table, consumed.Table) - if encodeEnable && decodeEnable { - require.Equal(t, insertEvent.CommitTs, consumed.CommitTs) - } else { - require.Equal(t, uint64(0), consumed.CommitTs) - } - - for _, col := range consumed.Columns { - expected, ok := expectedDecodedValue[col.Name] - require.True(t, ok) - require.Equal(t, expected, col.Value) - - for _, item := range insertEvent.Columns { - if item.Name == col.Name { - require.Equal(t, item.Type, col.Type) - } - } - } - - _, hasNext, _ = decoder.HasNext() - require.False(t, hasNext) - - consumed, err = decoder.NextRowChangedEvent() - require.Error(t, err) - require.Nil(t, consumed) - } - } -} - -func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) { - t.Parallel() - - ctx := context.Background() - for _, encodeEnable := range []bool{false, true} { - codecConfig := common.NewConfig(config.ProtocolCanalJSON) - codecConfig.EnableTiDBExtension = encodeEnable - - builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) - require.NoError(t, err) - encoder := builder.Build() - - result, err := encoder.EncodeDDLEvent(utils.TestCaseDDL) - require.NoError(t, err) - require.NotNil(t, result) - - for _, decodeEnable := range []bool{false, true} { - codecConfig := common.NewConfig(config.ProtocolCanalJSON) - codecConfig.EnableTiDBExtension = decodeEnable - decoder, err := NewBatchDecoder(ctx, codecConfig, nil) - require.NoError(t, err) - err = decoder.AddKeyValue(nil, result.Value) - require.NoError(t, err) - - ty, hasNext, err := decoder.HasNext() - require.Nil(t, err) - require.True(t, hasNext) - require.Equal(t, model.MessageTypeDDL, ty) - - consumed, err := decoder.NextDDLEvent() - require.Nil(t, err) - - if encodeEnable && decodeEnable { - require.Equal(t, utils.testCaseDDL.CommitTs, consumed.CommitTs) - } else { - require.Equal(t, uint64(0), consumed.CommitTs) - } - - require.Equal(t, utils.testCaseDDL.TableInfo, consumed.TableInfo) - require.Equal(t, utils.testCaseDDL.Query, consumed.Query) - - ty, hasNext, err = decoder.HasNext() - require.Nil(t, err) - require.False(t, hasNext) - require.Equal(t, model.MessageTypeUnknown, ty) - - consumed, err = decoder.NextDDLEvent() - require.NotNil(t, err) - require.Nil(t, consumed) - } - } -} - -func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) { - encodedValue := `{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1668067205238,"ts":1668067206650,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}],"old":null} -{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1668067229137,"ts":1668067230720,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}]} -{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1668067230388,"ts":1668067231725,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":null}` - ctx := context.Background() - codecConfig := common.NewConfig(config.ProtocolCanalJSON) - codecConfig.Terminator = "\n" - decoder, err := NewBatchDecoder(ctx, codecConfig, nil) - require.NoError(t, err) - - err = decoder.AddKeyValue(nil, []byte(encodedValue)) - require.NoError(t, err) - - cnt := 0 - for { - tp, hasNext, err := decoder.HasNext() - if !hasNext { - break - } - require.NoError(t, err) - require.Equal(t, model.MessageTypeRow, tp) - cnt++ - event, err := decoder.NextRowChangedEvent() - require.NoError(t, err) - require.NotNil(t, event) - } - require.Equal(t, 3, cnt) -} diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index 162395172bb..96f9996970a 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -757,3 +757,154 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { require.Equal(t, expectedValue[actual.Name], obtained.Value) } } + +func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { + insertEvent, _, _ := utils.NewLargeEvent4Test(t) + ctx := context.Background() + expectedDecodedValue := utils.CollectExpectedDecodedValue(utils.TestColumnsTable) + for _, encodeEnable := range []bool{false, true} { + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = encodeEnable + codecConfig.Terminator = config.CRLF + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil) + require.NoError(t, err) + + messages := encoder.Build() + require.Equal(t, 1, len(messages)) + msg := messages[0] + + for _, decodeEnable := range []bool{false, true} { + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = decodeEnable + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + err = decoder.AddKeyValue(msg.Key, msg.Value) + require.NoError(t, err) + + ty, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, ty) + + consumed, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + + require.Equal(t, insertEvent.Table, consumed.Table) + if encodeEnable && decodeEnable { + require.Equal(t, insertEvent.CommitTs, consumed.CommitTs) + } else { + require.Equal(t, uint64(0), consumed.CommitTs) + } + + for _, col := range consumed.Columns { + expected, ok := expectedDecodedValue[col.Name] + require.True(t, ok) + require.Equal(t, expected, col.Value) + + for _, item := range insertEvent.Columns { + if item.Name == col.Name { + require.Equal(t, item.Type, col.Type) + } + } + } + + _, hasNext, _ = decoder.HasNext() + require.False(t, hasNext) + + consumed, err = decoder.NextRowChangedEvent() + require.Error(t, err) + require.Nil(t, consumed) + } + } +} + +func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))` + ddlEvent := helper.DDL2Event(sql) + + ctx := context.Background() + for _, encodeEnable := range []bool{false, true} { + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = encodeEnable + + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + result, err := encoder.EncodeDDLEvent(ddlEvent) + require.NoError(t, err) + require.NotNil(t, result) + + for _, decodeEnable := range []bool{false, true} { + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = decodeEnable + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + err = decoder.AddKeyValue(nil, result.Value) + require.NoError(t, err) + + ty, hasNext, err := decoder.HasNext() + require.Nil(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, ty) + + consumed, err := decoder.NextDDLEvent() + require.Nil(t, err) + + if encodeEnable && decodeEnable { + require.Equal(t, ddlEvent.CommitTs, consumed.CommitTs) + } else { + require.Equal(t, uint64(0), consumed.CommitTs) + } + + require.Equal(t, ddlEvent.TableInfo, consumed.TableInfo) + require.Equal(t, ddlEvent.Query, consumed.Query) + + ty, hasNext, err = decoder.HasNext() + require.Nil(t, err) + require.False(t, hasNext) + require.Equal(t, model.MessageTypeUnknown, ty) + + consumed, err = decoder.NextDDLEvent() + require.NotNil(t, err) + require.Nil(t, consumed) + } + } +} + +func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) { + encodedValue := `{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1668067205238,"ts":1668067206650,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}],"old":null} +{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1668067229137,"ts":1668067230720,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}]} +{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1668067230388,"ts":1668067231725,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":null}` + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.Terminator = "\n" + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(nil, []byte(encodedValue)) + require.NoError(t, err) + + cnt := 0 + for { + tp, hasNext, err := decoder.HasNext() + if !hasNext { + break + } + require.NoError(t, err) + require.Equal(t, model.MessageTypeRow, tp) + cnt++ + event, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.NotNil(t, event) + } + require.Equal(t, 3, cnt) +}