Skip to content

Commit

Permalink
This is an automated cherry-pick of #10123
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Nov 22, 2023
1 parent 04685c5 commit 503cc09
Show file tree
Hide file tree
Showing 13 changed files with 2,227 additions and 106 deletions.
23 changes: 21 additions & 2 deletions cdc/sink/dmlsink/mq/mq_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
"testing"
"time"

<<<<<<< HEAD

Check failure on line 23 in cdc/sink/dmlsink/mq/mq_dml_sink_test.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

missing import path

Check failure on line 23 in cdc/sink/dmlsink/mq/mq_dml_sink_test.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

missing import path
=======
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -56,8 +61,6 @@ func TestNewKafkaDMLSinkFailed(t *testing.T) {
}

func TestWriteEvents(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -79,11 +82,27 @@ func TestWriteEvents(t *testing.T) {
require.NotNil(t, s)
defer s.Close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

tableStatus := state.TableSinkSinking
row := &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
=======
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
}

events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000)
Expand Down
131 changes: 122 additions & 9 deletions cdc/sink/dmlsink/mq/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (
"testing"
"time"

<<<<<<< HEAD
=======
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -57,7 +62,13 @@ func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlpro
}

func TestNonBatchEncode_SendMessages(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -70,9 +81,17 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
Partition: 1,
}
row := &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
=======
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
}
tableStatus := state.TableSinkSinking

Expand Down Expand Up @@ -250,7 +269,13 @@ func TestBatchEncode_Group(t *testing.T) {
}

func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

key1 := TopicPartitionKey{
Topic: "test",
Expand All @@ -270,9 +295,11 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand Down Expand Up @@ -317,8 +344,6 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
}

func TestBatchEncode_SendMessages(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -337,13 +362,30 @@ func TestBatchEncode_SendMessages(t *testing.T) {
defer cancel()
worker, p := newBatchEncodeWorker(ctx, t)
defer worker.close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
=======
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -353,9 +395,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -365,9 +415,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
=======
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -377,9 +435,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 2,
Table: &model.TableName{Schema: "aa", Table: "bb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -389,9 +455,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 2,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -401,9 +475,17 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 3,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &tableStatus,
Expand Down Expand Up @@ -461,8 +543,6 @@ func TestBatchEncodeWorker_Abort(t *testing.T) {
}

func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -477,13 +557,30 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
defer worker.close()
replicatingStatus := state.TableSinkSinking
stoppedStatus := state.TableSinkStopping

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
=======
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -493,9 +590,17 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
=======
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -505,9 +610,17 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
<<<<<<< HEAD
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
=======
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123))
},
Callback: func() {},
SinkState: &stoppedStatus,
Expand Down
Loading

0 comments on commit 503cc09

Please sign in to comment.