Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec(ticdc): canal-json decouple get value from java type and refactor unit test #10123

Merged
merged 14 commits into from
Nov 22, 2023
3 changes: 0 additions & 3 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,6 @@ func (ti *TableInfo) initColumnsFlag() {
if mysql.HasUnsignedFlag(colInfo.GetFlag()) {
flag.SetIsUnsigned()
}
if mysql.HasZerofillFlag(colInfo.GetFlag()) {
flag.SetZeroFill()
}
ti.ColumnsFlag[colInfo.ID] = flag
}

Expand Down
12 changes: 0 additions & 12 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,8 @@ const (
NullableFlag
// UnsignedFlag means the column stores an unsigned integer
UnsignedFlag
// ZerofillFlag means the column is zerofill
ZerofillFlag
)

// SetZeroFill sets ZerofillFlag
func (b *ColumnFlagType) SetZeroFill() {
(*util.Flag)(b).Add(util.Flag(ZerofillFlag))
}

// IsZerofill shows whether ZerofillFlag is set
func (b *ColumnFlagType) IsZerofill() bool {
return (*util.Flag)(b).HasAll(util.Flag(ZerofillFlag))
}

// SetIsBinary sets BinaryFlag
func (b *ColumnFlagType) SetIsBinary() {
(*util.Flag)(b).Add(util.Flag(BinaryFlag))
Expand Down
22 changes: 14 additions & 8 deletions cdc/sink/dmlsink/mq/mq_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -60,8 +59,6 @@ func TestNewKafkaDMLSinkFailed(t *testing.T) {
}

func TestWriteEvents(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -84,12 +81,21 @@ func TestWriteEvents(t *testing.T) {
require.NotNil(t, s)
defer s.Close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

tableStatus := state.TableSinkSinking
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
}

events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000)
Expand Down
139 changes: 88 additions & 51 deletions cdc/sink/dmlsink/mq/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -64,7 +63,13 @@ func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlpro
}

func TestNonBatchEncode_SendMessages(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -77,10 +82,11 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
Partition: 1,
}
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
}
tableStatus := state.TableSinkSinking

Expand Down Expand Up @@ -258,7 +264,13 @@ func TestBatchEncode_Group(t *testing.T) {
}

func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

key1 := TopicPartitionKey{
Topic: "test",
Expand All @@ -278,9 +290,11 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand Down Expand Up @@ -325,8 +339,6 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
}

func TestBatchEncode_SendMessages(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -345,14 +357,24 @@ func TestBatchEncode_SendMessages(t *testing.T) {
defer cancel()
worker, p := newBatchEncodeWorker(ctx, t)
defer worker.close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -362,10 +384,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -375,10 +398,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -388,10 +412,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aa", Table: "bb"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -401,10 +426,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -414,10 +440,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand Down Expand Up @@ -475,8 +502,6 @@ func TestBatchEncodeWorker_Abort(t *testing.T) {
}

func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -491,14 +516,24 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
defer worker.close()
replicatingStatus := state.TableSinkSinking
stoppedStatus := state.TableSinkStopping

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -508,10 +543,11 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -521,10 +557,11 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &stoppedStatus,
Expand Down
Loading
Loading