Skip to content

Commit

Permalink
adjust canal json unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 7, 2023
1 parent 94cb64d commit 55f38e3
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 172 deletions.
172 changes: 0 additions & 172 deletions pkg/sink/codec/canal/canal_json_decoder_test.go

This file was deleted.

151 changes: 151 additions & 0 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,3 +757,154 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) {
require.Equal(t, expectedValue[actual.Name], obtained.Value)
}
}

func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
insertEvent, _, _ := utils.NewLargeEvent4Test(t)
ctx := context.Background()
expectedDecodedValue := utils.CollectExpectedDecodedValue(utils.TestColumnsTable)
for _, encodeEnable := range []bool{false, true} {
codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.EnableTiDBExtension = encodeEnable
codecConfig.Terminator = config.CRLF

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil)
require.NoError(t, err)

messages := encoder.Build()
require.Equal(t, 1, len(messages))
msg := messages[0]

for _, decodeEnable := range []bool{false, true} {
codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.EnableTiDBExtension = decodeEnable
decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)
err = decoder.AddKeyValue(msg.Key, msg.Value)
require.NoError(t, err)

ty, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, ty)

consumed, err := decoder.NextRowChangedEvent()
require.NoError(t, err)

require.Equal(t, insertEvent.Table, consumed.Table)
if encodeEnable && decodeEnable {
require.Equal(t, insertEvent.CommitTs, consumed.CommitTs)
} else {
require.Equal(t, uint64(0), consumed.CommitTs)
}

for _, col := range consumed.Columns {
expected, ok := expectedDecodedValue[col.Name]
require.True(t, ok)
require.Equal(t, expected, col.Value)

for _, item := range insertEvent.Columns {
if item.Name == col.Name {
require.Equal(t, item.Type, col.Type)
}
}
}

_, hasNext, _ = decoder.HasNext()
require.False(t, hasNext)

consumed, err = decoder.NextRowChangedEvent()
require.Error(t, err)
require.Nil(t, consumed)
}
}
}

func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))`
ddlEvent := helper.DDL2Event(sql)

ctx := context.Background()
for _, encodeEnable := range []bool{false, true} {
codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.EnableTiDBExtension = encodeEnable

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

result, err := encoder.EncodeDDLEvent(ddlEvent)
require.NoError(t, err)
require.NotNil(t, result)

for _, decodeEnable := range []bool{false, true} {
codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.EnableTiDBExtension = decodeEnable
decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)
err = decoder.AddKeyValue(nil, result.Value)
require.NoError(t, err)

ty, hasNext, err := decoder.HasNext()
require.Nil(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, ty)

consumed, err := decoder.NextDDLEvent()
require.Nil(t, err)

if encodeEnable && decodeEnable {
require.Equal(t, ddlEvent.CommitTs, consumed.CommitTs)
} else {
require.Equal(t, uint64(0), consumed.CommitTs)
}

require.Equal(t, ddlEvent.TableInfo, consumed.TableInfo)
require.Equal(t, ddlEvent.Query, consumed.Query)

ty, hasNext, err = decoder.HasNext()
require.Nil(t, err)
require.False(t, hasNext)
require.Equal(t, model.MessageTypeUnknown, ty)

consumed, err = decoder.NextDDLEvent()
require.NotNil(t, err)
require.Nil(t, consumed)
}
}
}

func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) {
encodedValue := `{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1668067205238,"ts":1668067206650,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}],"old":null}
{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1668067229137,"ts":1668067230720,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}]}
{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1668067230388,"ts":1668067231725,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":null}`
ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.Terminator = "\n"
decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

err = decoder.AddKeyValue(nil, []byte(encodedValue))
require.NoError(t, err)

cnt := 0
for {
tp, hasNext, err := decoder.HasNext()
if !hasNext {
break
}
require.NoError(t, err)
require.Equal(t, model.MessageTypeRow, tp)
cnt++
event, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.NotNil(t, event)
}
require.Equal(t, 3, cnt)
}

0 comments on commit 55f38e3

Please sign in to comment.