Skip to content

Commit

Permalink
fix test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 23, 2023
1 parent 5c12d98 commit 0ed9abe
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 164 deletions.
9 changes: 5 additions & 4 deletions cdc/sink/codec/canal/canal_json_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
})
require.NotNil(t, encoder)

err := encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert, nil)
insertEvent, _, _ := newLargeEvent4Test(t)
err := encoder.AppendRowChangedEvent(context.Background(), "", insertEvent, nil)
require.Nil(t, err)

messages := encoder.Build()
Expand All @@ -59,9 +60,9 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
consumed, err := decoder.NextRowChangedEvent()
require.Nil(t, err)

require.Equal(t, testCaseInsert.Table, consumed.Table)
require.Equal(t, insertEvent.Table, consumed.Table)
if encodeEnable && decodeEnable {
require.Equal(t, testCaseInsert.CommitTs, consumed.CommitTs)
require.Equal(t, insertEvent.CommitTs, consumed.CommitTs)
} else {
require.Equal(t, uint64(0), consumed.CommitTs)
}
Expand All @@ -71,7 +72,7 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
require.True(t, ok)
require.Equal(t, expected, col.Value)

for _, item := range testCaseInsert.Columns {
for _, item := range insertEvent.Columns {
if item.Name == col.Name {
require.Equal(t, item.Type, col.Type)
}
Expand Down
48 changes: 26 additions & 22 deletions cdc/sink/codec/canal/canal_json_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func TestNewCanalJSONMessage4DML(t *testing.T) {
encoder, ok := builder.Build().(*JSONBatchEncoder)
require.True(t, ok)

data, err := newJSONMessageForDML(testCaseInsert, encoder.config, encoder.builder, false)
insertEvent, updateEvent, deleteEvent := newLargeEvent4Test(t)
data, err := newJSONMessageForDML(insertEvent, encoder.config, encoder.builder, false)
require.Nil(t, err)
var msg canalJSONMessageInterface = &JSONMessage{}
err = json.Unmarshal(data, msg)
Expand All @@ -62,12 +63,12 @@ func TestNewCanalJSONMessage4DML(t *testing.T) {
require.NotNil(t, jsonMsg.Data)
require.Nil(t, jsonMsg.Old)
require.Equal(t, "INSERT", jsonMsg.EventType)
require.Equal(t, convertToCanalTs(testCaseInsert.CommitTs), jsonMsg.ExecutionTime)
require.Equal(t, convertToCanalTs(insertEvent.CommitTs), jsonMsg.ExecutionTime)
require.Equal(t, "cdc", jsonMsg.Schema)
require.Equal(t, "person", jsonMsg.Table)
require.False(t, jsonMsg.IsDDL)

for _, col := range testCaseInsert.Columns {
for _, col := range insertEvent.Columns {
require.Contains(t, jsonMsg.Data[0], col.Name)
require.Contains(t, jsonMsg.SQLType, col.Name)
require.Contains(t, jsonMsg.MySQLType, col.Name)
Expand Down Expand Up @@ -101,7 +102,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) {
require.Equal(t, item.expectedEncodedValue, obtainedValue)
}

data, err = newJSONMessageForDML(testCaseUpdate, encoder.config, encoder.builder, false)
data, err = newJSONMessageForDML(updateEvent, encoder.config, encoder.builder, false)
require.NoError(t, err)
jsonMsg = &JSONMessage{}
err = json.Unmarshal(data, jsonMsg)
Expand All @@ -110,16 +111,16 @@ func TestNewCanalJSONMessage4DML(t *testing.T) {
require.NotNil(t, jsonMsg.Old)
require.Equal(t, "UPDATE", jsonMsg.EventType)

for _, col := range testCaseUpdate.Columns {
for _, col := range updateEvent.Columns {
require.Contains(t, jsonMsg.Data[0], col.Name)
require.Contains(t, jsonMsg.SQLType, col.Name)
require.Contains(t, jsonMsg.MySQLType, col.Name)
}
for _, col := range testCaseUpdate.PreColumns {
for _, col := range updateEvent.PreColumns {
require.Contains(t, jsonMsg.Old[0], col.Name)
}

data, err = newJSONMessageForDML(testCaseDelete, encoder.config, encoder.builder, false)
data, err = newJSONMessageForDML(deleteEvent, encoder.config, encoder.builder, false)
require.Nil(t, err)
jsonMsg = &JSONMessage{}
err = json.Unmarshal(data, jsonMsg)
Expand All @@ -128,35 +129,35 @@ func TestNewCanalJSONMessage4DML(t *testing.T) {
require.Nil(t, jsonMsg.Old)
require.Equal(t, "DELETE", jsonMsg.EventType)

for _, col := range testCaseDelete.PreColumns {
for _, col := range deleteEvent.PreColumns {
require.Contains(t, jsonMsg.Data[0], col.Name)
}

data, err = newJSONMessageForDML(testCaseDelete, encoder.config, encoder.builder, false)
data, err = newJSONMessageForDML(deleteEvent, encoder.config, encoder.builder, false)
require.NoError(t, err)
jsonMsg = &JSONMessage{}
err = json.Unmarshal(data, jsonMsg)
require.NoError(t, err)
require.NotNil(t, jsonMsg.Data)
require.Nil(t, jsonMsg.Old)

for _, col := range testCaseDelete.PreColumns {
for _, col := range deleteEvent.PreColumns {
require.Contains(t, jsonMsg.Data[0], col.Name)
require.Contains(t, jsonMsg.SQLType, col.Name)
require.Contains(t, jsonMsg.MySQLType, col.Name)
}

encoder, ok = newJSONBatchEncoder(&common.Config{DeleteOnlyHandleKeyColumns: true}).(*JSONBatchEncoder)
require.True(t, ok)
data, err = newJSONMessageForDML(testCaseDelete, encoder.config, encoder.builder, false)
data, err = newJSONMessageForDML(deleteEvent, encoder.config, encoder.builder, false)
require.NoError(t, err)
jsonMsg = &JSONMessage{}
err = json.Unmarshal(data, jsonMsg)
require.NoError(t, err)
require.NotNil(t, jsonMsg.Data)
require.Nil(t, jsonMsg.Old)

for _, col := range testCaseDelete.PreColumns {
for _, col := range deleteEvent.PreColumns {
if col.Flag.IsHandleKey() {
require.Contains(t, jsonMsg.Data[0], col.Name)
require.Contains(t, jsonMsg.SQLType, col.Name)
Expand All @@ -176,15 +177,15 @@ func TestNewCanalJSONMessage4DML(t *testing.T) {

encoder, ok = NewJSONBatchEncoderBuilder(codecConfig).Build().(*JSONBatchEncoder)
require.True(t, ok)
data, err = newJSONMessageForDML(testCaseUpdate, encoder.config, encoder.builder, false)
data, err = newJSONMessageForDML(updateEvent, encoder.config, encoder.builder, false)
require.Nil(t, err)

withExtension := &canalJSONMessageWithTiDBExtension{}
err = json.Unmarshal(data, withExtension)
require.Nil(t, err)

require.NotNil(t, withExtension.Extensions)
require.Equal(t, testCaseUpdate.CommitTs, withExtension.Extensions.CommitTs)
require.Equal(t, updateEvent.CommitTs, withExtension.Extensions.CommitTs)
}

func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) {
Expand All @@ -196,7 +197,8 @@ func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) {
codecConfig.MaxMessageBytes = 500
encoder := newJSONBatchEncoder(codecConfig)

err := encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert, func() {})
insertEvent, _, _ := newLargeEvent4Test(t)
err := encoder.AppendRowChangedEvent(context.Background(), "", insertEvent, func() {})
require.NoError(t, err)

message := encoder.Build()[0]
Expand All @@ -206,7 +208,7 @@ func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) {
require.NoError(t, err)
require.True(t, decoded.Extensions.OnlyHandleKey)

for _, col := range testCaseInsert.Columns {
for _, col := range insertEvent.Columns {
if col.Flag.IsHandleKey() {
require.Contains(t, decoded.Data[0], col.Name)
require.Contains(t, decoded.SQLType, col.Name)
Expand Down Expand Up @@ -261,7 +263,8 @@ func TestBatching(t *testing.T) {
})
require.NotNil(t, encoder)

updateCase := *testCaseUpdate
_, updateEvent, _ := newLargeEvent4Test(t)
updateCase := *updateEvent
for i := 1; i <= 1000; i++ {
ts := uint64(i)
updateCase.CommitTs = ts
Expand Down Expand Up @@ -553,7 +556,8 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) {
builder := NewJSONBatchEncoderBuilder(codecConfig)
encoder := builder.Build()

err := encoder.AppendRowChangedEvent(ctx, "", testCaseInsert, func() {})
insertEvent, _, _ := newLargeEvent4Test(t)
err := encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {})
require.NoError(t, err)

message := encoder.Build()[0]
Expand All @@ -571,17 +575,17 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) {

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.Equal(t, decodedEvent.CommitTs, testCaseInsert.CommitTs)
require.Equal(t, decodedEvent.Table.Schema, testCaseInsert.Table.Schema)
require.Equal(t, decodedEvent.Table.Table, testCaseInsert.Table.Table)
require.Equal(t, decodedEvent.CommitTs, insertEvent.CommitTs)
require.Equal(t, decodedEvent.Table.Schema, insertEvent.Table.Schema)
require.Equal(t, decodedEvent.Table.Table, insertEvent.Table.Table)

obtainedColumns := make(map[string]*model.Column, len(decodedEvent.Columns))
for _, column := range decodedEvent.Columns {
obtainedColumns[column.Name] = column
}

expectedValue := collectExpectedDecodedValue(testColumnsTable)
for _, actual := range testCaseInsert.Columns {
for _, actual := range insertEvent.Columns {
obtained, ok := obtainedColumns[actual.Name]
require.True(t, ok)
require.Equal(t, actual.Type, obtained.Type)
Expand Down
Loading

0 comments on commit 0ed9abe

Please sign in to comment.