Skip to content

Commit

Permalink
fix some test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 23, 2023
1 parent 7648e4b commit 4c998c4
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 38 deletions.
6 changes: 2 additions & 4 deletions cdc/sink/codec/canal/canal_json_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,11 @@ func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) {
codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.EnableTiDBExtension = encodeEnable
codecConfig.LargeMessageHandle = config.NewDefaultLargeMessageHandleConfig()
encoder := &JSONBatchEncoder{
builder: newCanalEntryBuilder(codecConfig),
}
encoder := newJSONBatchEncoder(codecConfig)
require.NotNil(t, encoder)

result, err := encoder.EncodeDDLEvent(testCaseDDL)
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, result)

for _, decodeEnable := range []bool{false, true} {
Expand Down
65 changes: 31 additions & 34 deletions cdc/sink/codec/canal/canal_json_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import (
"testing"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/pkg/config"
Expand All @@ -39,16 +38,9 @@ func TestBuildJSONBatchEncoder(t *testing.T) {
}

func TestNewCanalJSONMessage4DML(t *testing.T) {
t.Parallel()
e := newJSONBatchEncoder(&common.Config{
EnableTiDBExtension: false,
Terminator: "",
})
require.NotNil(t, e)

codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.EnableTiDBExtension = true
builder := NewJSONBatchEncoderBuilder(codecConfig)

encoder, ok := builder.Build().(*JSONBatchEncoder)
require.True(t, ok)

Expand All @@ -64,8 +56,8 @@ func TestNewCanalJSONMessage4DML(t *testing.T) {
require.Nil(t, jsonMsg.Old)
require.Equal(t, "INSERT", jsonMsg.EventType)
require.Equal(t, convertToCanalTs(insertEvent.CommitTs), jsonMsg.ExecutionTime)
require.Equal(t, "cdc", jsonMsg.Schema)
require.Equal(t, "person", jsonMsg.Table)
require.Equal(t, "test", jsonMsg.Schema)
require.Equal(t, "t", jsonMsg.Table)
require.False(t, jsonMsg.IsDDL)

for _, col := range insertEvent.Columns {
Expand Down Expand Up @@ -169,12 +161,6 @@ func TestNewCanalJSONMessage4DML(t *testing.T) {
}
}

e = newJSONBatchEncoder(&common.Config{
EnableTiDBExtension: true,
Terminator: "",
})
require.NotNil(t, e)

encoder, ok = NewJSONBatchEncoderBuilder(codecConfig).Build().(*JSONBatchEncoder)
require.True(t, ok)
data, err = newJSONMessageForDML(updateEvent, encoder.config, encoder.builder, false)
Expand Down Expand Up @@ -384,6 +370,7 @@ func TestCheckpointEventValueMarshal(t *testing.T) {
func TestDDLEventWithExtensionValueMarshal(t *testing.T) {
t.Parallel()
codecConfig := common.NewConfig(config.ProtocolCanalJSON)
codecConfig.EnableTiDBExtension = true

builder := NewJSONBatchEncoderBuilder(codecConfig)
encoder := builder.Build().(*JSONBatchEncoder)
Expand Down Expand Up @@ -423,6 +410,15 @@ func TestDDLEventWithExtensionValueMarshal(t *testing.T) {
}

func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

_, _, colInfos := tableInfo.GetRowColInfos()

encoder := newJSONBatchEncoder(&common.Config{
EnableTiDBExtension: true,
Terminator: "",
Expand All @@ -433,19 +429,15 @@ func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) {
count := 0

row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{
Name: "col1",
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
ColInfos: []rowcodec.ColInfo{
{
ID: 0,
Ft: types.NewFieldType(mysql.TypeVarchar),
},
},
ColInfos: colInfos,
}

tests := []struct {
Expand Down Expand Up @@ -510,21 +502,26 @@ func TestCanalJSONAppendRowChangedEventWithCallback(t *testing.T) {
}

func TestMaxMessageBytes(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

_, _, colInfos := tableInfo.GetRowColInfos()

// the size of `testEvent` after being encoded by canal-json is 200
testEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{
Name: "col1",
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
ColInfos: []rowcodec.ColInfo{
{
ID: 0,
Ft: types.NewFieldType(mysql.TypeVarchar),
},
},
ColInfos: colInfos,
}

ctx := context.Background()
Expand Down

0 comments on commit 4c998c4

Please sign in to comment.