Skip to content

Commit

Permalink
refactor some test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 6, 2023
1 parent 9ce8aee commit 96b25b8
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 60 deletions.
6 changes: 3 additions & 3 deletions pkg/sink/codec/canal/canal_json_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
)

func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
insertEvent, _, _ := utils.newLargeEvent4Test(t)
insertEvent, _, _ := utils.NewLargeEvent4Test(t)
ctx := context.Background()
expectedDecodedValue := utils.collectExpectedDecodedValue(utils.testColumnsTable)
expectedDecodedValue := utils.CollectExpectedDecodedValue(utils.TestColumnsTable)
for _, encodeEnable := range []bool{false, true} {
codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.EnableTiDBExtension = encodeEnable
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) {
require.NoError(t, err)
encoder := builder.Build()

result, err := encoder.EncodeDDLEvent(utils.testCaseDDL)
result, err := encoder.EncodeDDLEvent(utils.TestCaseDDL)
require.NoError(t, err)
require.NotNil(t, result)

Expand Down
95 changes: 50 additions & 45 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,13 @@ func TestCanalJSONCompressionE2E(t *testing.T) {
require.Equal(t, decodedEvent.Table.Table, insertEvent.Table.Table)

// encode DDL event
message, err = encoder.EncodeDDLEvent(utils.TestCaseDDL)
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))`
ddlEvent := helper.DDL2Event(sql)

message, err = encoder.EncodeDDLEvent(ddlEvent)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
Expand All @@ -242,10 +248,10 @@ func TestCanalJSONCompressionE2E(t *testing.T) {
decodedDDL, err := decoder.NextDDLEvent()
require.NoError(t, err)

require.Equal(t, decodedDDL.Query, utils.TestCaseDDL.Query)
require.Equal(t, decodedDDL.CommitTs, utils.TestCaseDDL.CommitTs)
require.Equal(t, decodedDDL.TableInfo.TableName.Schema, utils.TestCaseDDL.TableInfo.TableName.Schema)
require.Equal(t, decodedDDL.TableInfo.TableName.Table, utils.TestCaseDDL.TableInfo.TableName.Table)
require.Equal(t, decodedDDL.Query, ddlEvent.Query)
require.Equal(t, decodedDDL.CommitTs, ddlEvent.CommitTs)
require.Equal(t, decodedDDL.TableInfo.TableName.Schema, ddlEvent.TableInfo.TableName.Schema)
require.Equal(t, decodedDDL.TableInfo.TableName.Table, ddlEvent.TableInfo.TableName.Table)

// encode checkpoint event
waterMark := uint64(2333)
Expand Down Expand Up @@ -364,7 +370,8 @@ func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) {
}

func TestNewCanalJSONMessageFromDDL(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

codecConfig := common.NewConfig(config.ProtocolCanalJSON)
ctx := context.Background()
Expand All @@ -373,31 +380,34 @@ func TestNewCanalJSONMessageFromDDL(t *testing.T) {
require.NoError(t, err)
encoder := builder.Build().(*JSONRowEventEncoder)

message := encoder.newJSONMessageForDDL(utils.TestCaseDDL)
sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))`
ddlEvent := helper.DDL2Event(sql)

message := encoder.newJSONMessageForDDL(ddlEvent)
require.NotNil(t, message)

msg, ok := message.(*JSONMessage)
require.True(t, ok)
require.Equal(t, convertToCanalTs(utils.TestCaseDDL.CommitTs), msg.ExecutionTime)
require.Equal(t, convertToCanalTs(ddlEvent.CommitTs), msg.ExecutionTime)
require.True(t, msg.IsDDL)
require.Equal(t, "cdc", msg.Schema)
require.Equal(t, "person", msg.Table)
require.Equal(t, utils.TestCaseDDL.Query, msg.Query)
require.Equal(t, ddlEvent.Query, msg.Query)
require.Equal(t, "CREATE", msg.EventType)

codecConfig.EnableTiDBExtension = true
builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)

encoder = builder.Build().(*JSONRowEventEncoder)
message = encoder.newJSONMessageForDDL(utils.TestCaseDDL)
message = encoder.newJSONMessageForDDL(ddlEvent)
require.NotNil(t, message)

withExtension, ok := message.(*canalJSONMessageWithTiDBExtension)
require.True(t, ok)

require.NotNil(t, withExtension.Extensions)
require.Equal(t, utils.TestCaseDDL.CommitTs, withExtension.Extensions.CommitTs)
require.Equal(t, ddlEvent.CommitTs, withExtension.Extensions.CommitTs)
}

func TestBatching(t *testing.T) {
Expand Down Expand Up @@ -539,46 +549,41 @@ func TestCheckpointEventValueMarshal(t *testing.T) {
require.Equal(t, expectedJSON, string(rawBytes))
}

func TestDDLEventWithExtensionValueMarshal(t *testing.T) {
t.Parallel()
func TestDDLEventWithExtension(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolCanalJSON)
encoder := &JSONRowEventEncoder{
builder: newCanalEntryBuilder(codecConfig),
config: &common.Config{EnableTiDBExtension: true},
}
codecConfig.EnableTiDBExtension = true
builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()
require.NotNil(t, encoder)

message := encoder.newJSONMessageForDDL(utils.TestCaseDDL)
require.NotNil(t, message)
sql := `create table test.person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))`
ddlEvent := helper.DDL2Event(sql)

msg, ok := message.(*canalJSONMessageWithTiDBExtension)
require.True(t, ok)
// Hack the build time.
// Otherwise, the timing will be inconsistent.
msg.BuildTime = 1469579899
rawBytes, err := json.MarshalIndent(msg, "", " ")
message, err := encoder.EncodeDDLEvent(ddlEvent)
require.NoError(t, err)

// No watermark ts will be output.
expectedJSON := `{
"id": 0,
"database": "cdc",
"table": "person",
"pkNames": null,
"isDdl": true,
"type": "CREATE",
"es": 1591943372224,
"ts": 1469579899,
"sql": "create table person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))",
"sqlType": null,
"mysqlType": null,
"data": null,
"old": null,
"_tidb": {
"commitTs": 417318403368288260
}
}`
require.Equal(t, expectedJSON, string(rawBytes))
decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

messageType, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, messageType, model.MessageTypeDDL)

decodedDDL, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.Equal(t, ddlEvent.Query, decodedDDL.Query)
require.Equal(t, ddlEvent.CommitTs, decodedDDL.CommitTs)
require.Equal(t, ddlEvent.TableInfo.TableName.Schema, decodedDDL.TableInfo.TableName.Schema)
require.Equal(t, ddlEvent.TableInfo.TableName.Table, decodedDDL.TableInfo.TableName.Table)
}

func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) {
Expand Down
12 changes: 0 additions & 12 deletions pkg/sink/codec/utils/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package utils
import (
"testing"

mm "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -353,17 +352,6 @@ var (
"{\"key1\": \"value1\"}", "{\"key1\": \"value1\"}",
},
}

TestCaseDDL = &model.DDLEvent{
CommitTs: 417318403368288260,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "cdc", Table: "person",
},
},
Query: "create table person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))",
Type: mm.ActionCreateTable,
}
)

func collectAllColumns(groups []*testColumnTuple) []*model.Column {
Expand Down

0 comments on commit 96b25b8

Please sign in to comment.