Skip to content

Commit

Permalink
add more code to the test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 21, 2023
1 parent cc34f65 commit d1485bc
Show file tree
Hide file tree
Showing 7 changed files with 878 additions and 828 deletions.
115 changes: 96 additions & 19 deletions pkg/sink/codec/canal/canal_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,102 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
canal "github.com/pingcap/tiflow/proto/canal"
"github.com/stretchr/testify/require"
)

var (
rowCases = [][]*model.RowChangedEvent{
{{
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
Columns: []*model.Column{{
Name: "col1",
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
}},
{
{
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
Columns: []*model.Column{{
Name: "col1",
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
},
{
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
},
},
}

ddlCases = [][]*model.DDLEvent{
{{
CommitTs: 1,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
},
Query: "create table a",
Type: 1,
}},
{
{
CommitTs: 2,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
},
Query: "create table b",
Type: 3,
},
{
CommitTs: 3,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
},
Query: "create table c",
Type: 3,
},
},
}
)

func TestCanalBatchEncoder(t *testing.T) {
t.Parallel()
s := defaultCanalBatchTester
for _, cs := range s.rowCases {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(10) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

for _, cs := range rowCases {
encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
for _, row := range cs {
_, _, colInfo := tableInfo.GetRowColInfos()
row.TableInfo = tableInfo
row.ColInfos = colInfo
err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil)
require.Nil(t, err)
require.NoError(t, err)
}
res := encoder.Build()

if len(cs) == 0 {
require.Nil(t, res)
continue
}

require.Len(t, res, 1)
require.Nil(t, res[0].Key)
require.Equal(t, len(cs), res[0].GetRowsCount())
Expand All @@ -58,33 +129,36 @@ func TestCanalBatchEncoder(t *testing.T) {
require.Equal(t, len(cs), len(messages.GetMessages()))
}

for _, cs := range s.ddlCases {
for _, cs := range ddlCases {
encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
for _, ddl := range cs {
msg, err := encoder.EncodeDDLEvent(ddl)
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, msg)
require.Nil(t, msg.Key)

packet := &canal.Packet{}
err = proto.Unmarshal(msg.Value, packet)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, canal.PacketType_MESSAGES, packet.GetType())
messages := &canal.Messages{}
err = proto.Unmarshal(packet.GetBody(), messages)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, 1, len(messages.GetMessages()))
require.Nil(t, err)
require.NoError(t, err)
}
}
}

func TestCanalAppendRowChangedEventWithCallback(t *testing.T) {
encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
require.NotNil(t, encoder)
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

count := 0
sql := `create table test.t(a varchar(10) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

_, _, colInfo := tableInfo.GetRowColInfos()
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Expand All @@ -93,12 +167,15 @@ func TestCanalAppendRowChangedEventWithCallback(t *testing.T) {
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
ColInfos: []rowcodec.ColInfo{{
ID: 1,
Ft: types.NewFieldType(mysql.TypeVarchar),
}},
TableInfo: tableInfo,
ColInfos: colInfo,
}

encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
require.NotNil(t, encoder)

count := 0

tests := []struct {
row *model.RowChangedEvent
callback func()
Expand Down
42 changes: 23 additions & 19 deletions pkg/sink/codec/canal/canal_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"github.com/golang/protobuf/proto" // nolint:staticcheck
"github.com/pingcap/errors"
mm "github.com/pingcap/tidb/parser/model"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
Expand Down Expand Up @@ -118,8 +118,8 @@ func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (resul

// build the Column in the canal RowData
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872
func (b *canalEntryBuilder) buildColumn(c *model.Column, colInfo rowcodec.ColInfo, colName string, updated bool) (*canal.Column, error) {
mysqlType := getMySQLType(colInfo.Ft, c.Flag, b.config.ContentCompatible)
func (b *canalEntryBuilder) buildColumn(c *model.Column, columnInfo *timodel.ColumnInfo, updated bool) (*canal.Column, error) {
mysqlType := getMySQLType(columnInfo, b.config.ContentCompatible)
javaType, err := getJavaSQLType(c.Value, c.Type, c.Flag)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
Expand All @@ -132,7 +132,7 @@ func (b *canalEntryBuilder) buildColumn(c *model.Column, colInfo rowcodec.ColInf

canalColumn := &canal.Column{
SqlType: int32(javaType),
Name: colName,
Name: c.Name,
IsKey: c.Flag.IsPrimaryKey(),
Updated: updated,
IsNullPresent: &canal.Column_IsNull{IsNull: c.Value == nil},
Expand All @@ -149,7 +149,12 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey
if column == nil {
continue
}
c, err := b.buildColumn(column, e.ColInfos[idx], column.Name, !e.IsDelete())
columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID)
if !ok {
return nil, cerror.ErrCanalEncodeFailed.GenWithStack(
"column info not found for column id: %d", e.ColInfos[idx].ID)
}
c, err := b.buildColumn(column, columnInfo, !e.IsDelete())
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -165,7 +170,12 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey
if onlyHandleKeyColumns && !column.Flag.IsHandleKey() {
continue
}
c, err := b.buildColumn(column, e.ColInfos[idx], column.Name, !e.IsDelete())
columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID)
if !ok {
return nil, cerror.ErrCanalEncodeFailed.GenWithStack(
"column info not found for column id: %d", e.ColInfos[idx].ID)
}
c, err := b.buildColumn(column, columnInfo, !e.IsDelete())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -374,24 +384,18 @@ func withUnsigned4MySQLType(mysqlType string, unsigned bool) string {
}

func withZerofill4MySQLType(mysqlType string, zerofill bool) string {
if zerofill &&
!strings.HasPrefix(mysqlType, "bit") &&
!strings.HasPrefix(mysqlType, "year") {
if zerofill && !strings.HasPrefix(mysqlType, "year") {
return mysqlType + " zerofill"
}
return mysqlType
}

func getMySQLType(fieldType *types.FieldType, flag model.ColumnFlagType, fullType bool) string {
func getMySQLType(columnInfo *timodel.ColumnInfo, fullType bool) string {
if !fullType {
result := types.TypeToStr(fieldType.GetType(), fieldType.GetCharset())
result = withUnsigned4MySQLType(result, flag.IsUnsigned())
result = withZerofill4MySQLType(result, flag.IsZerofill())

result := types.TypeToStr(columnInfo.GetType(), columnInfo.GetCharset())
result = withUnsigned4MySQLType(result, mysql.HasUnsignedFlag(columnInfo.GetFlag()))
result = withZerofill4MySQLType(result, mysql.HasZerofillFlag(columnInfo.GetFlag()))
return result
}

result := fieldType.InfoSchemaStr()
result = withZerofill4MySQLType(result, flag.IsZerofill())
return result
return columnInfo.GetTypeDesc()
}
Loading

0 comments on commit d1485bc

Please sign in to comment.