Skip to content

Commit

Permalink
codec(ticdc): canal-json support compatible content by output detaile…
Browse files Browse the repository at this point in the history
…d mysql type information (#10014) (#10126)

close #10106
  • Loading branch information
ti-chi-bot authored Nov 24, 2023
1 parent 794de44 commit 587b155
Show file tree
Hide file tree
Showing 31 changed files with 2,495 additions and 598 deletions.
3 changes: 3 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ func (ti *TableInfo) initColumnsFlag() {
if mysql.HasUnsignedFlag(colInfo.GetFlag()) {
flag.SetIsUnsigned()
}
if mysql.HasZerofillFlag(colInfo.GetFlag()) {
flag.SetZeroFill()
}
ti.ColumnsFlag[colInfo.ID] = flag
}

Expand Down
12 changes: 12 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,20 @@ const (
NullableFlag
// UnsignedFlag means the column stores an unsigned integer
UnsignedFlag
// ZerofillFlag means the column is zerofill
ZerofillFlag
)

// SetZeroFill sets ZerofillFlag
func (b *ColumnFlagType) SetZeroFill() {
(*util.Flag)(b).Add(util.Flag(ZerofillFlag))
}

// IsZerofill shows whether ZerofillFlag is set
func (b *ColumnFlagType) IsZerofill() bool {
return (*util.Flag)(b).HasAll(util.Flag(ZerofillFlag))
}

// SetIsBinary sets BinaryFlag
func (b *ColumnFlagType) SetIsBinary() {
(*util.Flag)(b).Add(util.Flag(BinaryFlag))
Expand Down
47 changes: 14 additions & 33 deletions cdc/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,6 @@ func teardownEncoderAndSchemaRegistry() {
stopHTTPInterceptForTestingRegistry()
}

func setBinChsClnFlag(ft *types.FieldType) *types.FieldType {
types.SetBinChsClnFlag(ft)
return ft
}

//nolint:unparam
func setFlag(ft *types.FieldType, flag uint) *types.FieldType {
ft.SetFlag(flag)
return ft
}

func setElems(ft *types.FieldType, elems []string) *types.FieldType {
ft.SetElems(elems)
return ft
}

type avroTestColumnTuple struct {
col model.Column
colInfo rowcodec.ColInfo
Expand Down Expand Up @@ -162,7 +146,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 6,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setFlag(types.NewFieldType(mysql.TypeTiny), uint(model.UnsignedFlag)),
Ft: common.SetUnsigned(types.NewFieldType(mysql.TypeTiny)),
},
avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT UNSIGNED"}},
int32(1), "int",
Expand All @@ -178,7 +162,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 7,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setFlag(types.NewFieldType(mysql.TypeShort), uint(model.UnsignedFlag)),
Ft: common.SetUnsigned(types.NewFieldType(mysql.TypeShort)),
},
avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT UNSIGNED"}},
int32(1), "int",
Expand All @@ -194,7 +178,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 8,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setFlag(types.NewFieldType(mysql.TypeInt24), uint(model.UnsignedFlag)),
Ft: common.SetUnsigned(types.NewFieldType(mysql.TypeInt24)),
},
avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "INT UNSIGNED"}},
int32(1), "int",
Expand All @@ -210,7 +194,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 9,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setFlag(types.NewFieldType(mysql.TypeLong), uint(model.UnsignedFlag)),
Ft: common.SetUnsigned(types.NewFieldType(mysql.TypeLong)),
},
avroSchema{Type: "long", Parameters: map[string]string{"tidb_type": "INT UNSIGNED"}},
int64(1), "long",
Expand All @@ -226,10 +210,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 10,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setFlag(
types.NewFieldType(mysql.TypeLonglong),
uint(model.UnsignedFlag),
),
Ft: common.SetUnsigned(types.NewFieldType(mysql.TypeLonglong)),
},
avroSchema{Type: "long", Parameters: map[string]string{"tidb_type": "BIGINT UNSIGNED"}},
int64(1), "long",
Expand Down Expand Up @@ -374,7 +355,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 22,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeTinyBlob)),
Ft: common.SetBinChsClnFlag(types.NewFieldType(mysql.TypeTinyBlob)),
},
avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}},
[]byte("hello world"), "bytes",
Expand All @@ -390,7 +371,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 23,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeMediumBlob)),
Ft: common.SetBinChsClnFlag(types.NewFieldType(mysql.TypeMediumBlob)),
},
avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}},
[]byte("hello world"), "bytes",
Expand All @@ -406,7 +387,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 24,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeBlob)),
Ft: common.SetBinChsClnFlag(types.NewFieldType(mysql.TypeBlob)),
},
avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}},
[]byte("hello world"), "bytes",
Expand All @@ -422,7 +403,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 25,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeLongBlob)),
Ft: common.SetBinChsClnFlag(types.NewFieldType(mysql.TypeLongBlob)),
},
avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}},
[]byte("hello world"), "bytes",
Expand All @@ -438,7 +419,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 26,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarchar)),
Ft: common.SetBinChsClnFlag(types.NewFieldType(mysql.TypeVarchar)),
},
avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}},
[]byte("hello world"), "bytes",
Expand All @@ -454,7 +435,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 27,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarString)),
Ft: common.SetBinChsClnFlag(types.NewFieldType(mysql.TypeVarString)),
},
avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}},
[]byte("hello world"), "bytes",
Expand All @@ -470,7 +451,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 28,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeString)),
Ft: common.SetBinChsClnFlag(types.NewFieldType(mysql.TypeString)),
},
avroSchema{Type: "bytes", Parameters: map[string]string{"tidb_type": "BLOB"}},
[]byte("hello world"), "bytes",
Expand All @@ -481,7 +462,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 29,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setElems(types.NewFieldType(mysql.TypeEnum), []string{"a,", "b"}),
Ft: common.SetElems(types.NewFieldType(mysql.TypeEnum), []string{"a,", "b"}),
},
avroSchema{
Type: "string",
Expand All @@ -495,7 +476,7 @@ var avroTestColumns = []*avroTestColumnTuple{
ID: 30,
IsPKHandle: false,
VirtualGenCol: false,
Ft: setElems(types.NewFieldType(mysql.TypeSet), []string{"a,", "b"}),
Ft: common.SetElems(types.NewFieldType(mysql.TypeSet), []string{"a,", "b"}),
},
avroSchema{
Type: "string",
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/canal/canal_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func newBatchEncoder(config *common.Config) codec.EventBatchEncoder {
encoder := &BatchEncoder{
messages: &canal.Messages{},
callbackBuf: make([]func(), 0),
entryBuilder: newCanalEntryBuilder(),
entryBuilder: newCanalEntryBuilder(config),

config: config,
}
Expand Down
109 changes: 96 additions & 13 deletions cdc/sink/codec/canal/canal_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,102 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/pkg/config"
canal "github.com/pingcap/tiflow/proto/canal"
"github.com/stretchr/testify/require"
)

var (
rowCases = [][]*model.RowChangedEvent{
{{
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
Columns: []*model.Column{{
Name: "col1",
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
}},
{
{
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
Columns: []*model.Column{{
Name: "col1",
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
},
{
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
},
},
}

ddlCases = [][]*model.DDLEvent{
{{
CommitTs: 1,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
},
Query: "create table a",
Type: 1,
}},
{
{
CommitTs: 2,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
},
Query: "create table b",
Type: 3,
},
{
CommitTs: 3,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
},
Query: "create table c",
Type: 3,
},
},
}
)

func TestCanalBatchEncoder(t *testing.T) {
t.Parallel()
s := defaultCanalBatchTester
for _, cs := range s.rowCases {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(10) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

for _, cs := range rowCases {
encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
for _, row := range cs {
_, _, colInfo := tableInfo.GetRowColInfos()
row.TableInfo = tableInfo
row.ColInfos = colInfo
err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil)
require.Nil(t, err)
require.NoError(t, err)
}
res := encoder.Build()

if len(cs) == 0 {
require.Nil(t, res)
continue
}

require.Len(t, res, 1)
require.Nil(t, res[0].Key)
require.Equal(t, len(cs), res[0].GetRowsCount())
Expand All @@ -56,33 +129,36 @@ func TestCanalBatchEncoder(t *testing.T) {
require.Equal(t, len(cs), len(messages.GetMessages()))
}

for _, cs := range s.ddlCases {
for _, cs := range ddlCases {
encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
for _, ddl := range cs {
msg, err := encoder.EncodeDDLEvent(ddl)
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, msg)
require.Nil(t, msg.Key)

packet := &canal.Packet{}
err = proto.Unmarshal(msg.Value, packet)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, canal.PacketType_MESSAGES, packet.GetType())
messages := &canal.Messages{}
err = proto.Unmarshal(packet.GetBody(), messages)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, 1, len(messages.GetMessages()))
require.Nil(t, err)
require.NoError(t, err)
}
}
}

func TestCanalAppendRowChangedEventWithCallback(t *testing.T) {
encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
require.NotNil(t, encoder)
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

count := 0
sql := `create table test.t(a varchar(10) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

_, _, colInfo := tableInfo.GetRowColInfos()
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Expand All @@ -91,8 +167,15 @@ func TestCanalAppendRowChangedEventWithCallback(t *testing.T) {
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
TableInfo: tableInfo,
ColInfos: colInfo,
}

encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
require.NotNil(t, encoder)

count := 0

tests := []struct {
row *model.RowChangedEvent
callback func()
Expand Down
Loading

0 comments on commit 587b155

Please sign in to comment.