Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123) #10130

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions cdc/sink/dmlsink/mq/mq_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
"testing"
"time"

<<<<<<< HEAD

Check failure on line 23 in cdc/sink/dmlsink/mq/mq_dml_sink_test.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

missing import path

Check failure on line 23 in cdc/sink/dmlsink/mq/mq_dml_sink_test.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

missing import path
=======
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -56,8 +61,6 @@
}

func TestWriteEvents(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -79,11 +82,27 @@
require.NotNil(t, s)
defer s.Close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

tableStatus := state.TableSinkSinking
row := &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
=======
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
}

events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000)
Expand Down
131 changes: 122 additions & 9 deletions cdc/sink/dmlsink/mq/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (
"testing"
"time"

<<<<<<< HEAD
=======
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -57,7 +62,13 @@ func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlpro
}

func TestNonBatchEncode_SendMessages(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -70,9 +81,17 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
Partition: 1,
}
row := &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
=======
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
}
tableStatus := state.TableSinkSinking

Expand Down Expand Up @@ -250,7 +269,13 @@ func TestBatchEncode_Group(t *testing.T) {
}

func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

key1 := TopicPartitionKey{
Topic: "test",
Expand All @@ -270,9 +295,11 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand Down Expand Up @@ -317,8 +344,6 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
}

func TestBatchEncode_SendMessages(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -337,13 +362,30 @@ func TestBatchEncode_SendMessages(t *testing.T) {
defer cancel()
worker, p := newBatchEncodeWorker(ctx, t)
defer worker.close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
=======
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -353,9 +395,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -365,9 +415,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
=======
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -377,9 +435,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 2,
Table: &model.TableName{Schema: "aa", Table: "bb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -389,9 +455,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 2,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -401,9 +475,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 3,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand Down Expand Up @@ -461,8 +543,6 @@ func TestBatchEncodeWorker_Abort(t *testing.T) {
}

func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -477,13 +557,30 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
defer worker.close()
replicatingStatus := state.TableSinkSinking
stoppedStatus := state.TableSinkStopping

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
=======
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -493,9 +590,17 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -505,9 +610,17 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
=======
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &stoppedStatus,
Expand Down
Loading
Loading