Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#10123
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Nov 22, 2023
1 parent d1bd4ff commit 93c8afa
Show file tree
Hide file tree
Showing 13 changed files with 2,820 additions and 70 deletions.
115 changes: 102 additions & 13 deletions cdc/sink/codec/canal/canal_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,105 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pingcap/tidb/parser/mysql"
<<<<<<< HEAD:cdc/sink/codec/canal/canal_encoder_test.go
=======
"github.com/pingcap/tiflow/cdc/entry"
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_encoder_test.go
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/pkg/config"
canal "github.com/pingcap/tiflow/proto/canal"
"github.com/stretchr/testify/require"
)

// Shared fixtures for the canal batch encoder tests in this file.
var (
// rowCases holds groups of row changed events. TestCanalBatchEncoder ranges
// over the groups, appends every event of one group to a fresh batch encoder
// and checks that the built message reports len(group) rows.
rowCases = [][]*model.RowChangedEvent{
// Group with a single event on test.t.
{{
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
Columns: []*model.Column{{
Name: "col1",
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
}},
// Group with two events on test.t, committed at ts 1 and ts 2.
{
{
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
Columns: []*model.Column{{
Name: "col1",
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
},
{
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
// NOTE(review): Type is the raw numeric code 1 rather than a mysql.Type*
// constant, and Value is a string instead of []byte — confirm intended.
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
},
},
}

// ddlCases holds groups of DDL events. TestCanalBatchEncoder encodes each
// event individually with EncodeDDLEvent and checks the decoded packet
// carries exactly one message.
// NOTE(review): the Type fields below are raw numeric codes; presumably
// DDL action type values — confirm which actions 1 and 3 denote.
ddlCases = [][]*model.DDLEvent{
// Group with a single DDL event.
{{
CommitTs: 1,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
},
Query: "create table a",
Type: 1,
}},
// Group with two DDL events, committed at ts 2 and ts 3.
{
{
CommitTs: 2,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
},
Query: "create table b",
Type: 3,
},
{
CommitTs: 3,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
},
Query: "create table c",
Type: 3,
},
},
}
)

func TestCanalBatchEncoder(t *testing.T) {
t.Parallel()
s := defaultCanalBatchTester
for _, cs := range s.rowCases {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(10) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

for _, cs := range rowCases {
encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
for _, row := range cs {
_, _, colInfo := tableInfo.GetRowColInfos()
row.TableInfo = tableInfo
row.ColInfos = colInfo
err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil)
require.Nil(t, err)
require.NoError(t, err)
}
res := encoder.Build()

if len(cs) == 0 {
require.Nil(t, res)
continue
}

require.Len(t, res, 1)
require.Nil(t, res[0].Key)
require.Equal(t, len(cs), res[0].GetRowsCount())
Expand All @@ -56,33 +132,36 @@ func TestCanalBatchEncoder(t *testing.T) {
require.Equal(t, len(cs), len(messages.GetMessages()))
}

for _, cs := range s.ddlCases {
for _, cs := range ddlCases {
encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
for _, ddl := range cs {
msg, err := encoder.EncodeDDLEvent(ddl)
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, msg)
require.Nil(t, msg.Key)

packet := &canal.Packet{}
err = proto.Unmarshal(msg.Value, packet)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, canal.PacketType_MESSAGES, packet.GetType())
messages := &canal.Messages{}
err = proto.Unmarshal(packet.GetBody(), messages)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, 1, len(messages.GetMessages()))
require.Nil(t, err)
require.NoError(t, err)
}
}
}

func TestCanalAppendRowChangedEventWithCallback(t *testing.T) {
encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
require.NotNil(t, encoder)
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

count := 0
sql := `create table test.t(a varchar(10) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)

_, _, colInfo := tableInfo.GetRowColInfos()
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Expand All @@ -91,8 +170,18 @@ func TestCanalAppendRowChangedEventWithCallback(t *testing.T) {
Type: mysql.TypeVarchar,
Value: []byte("aa"),
}},
<<<<<<< HEAD:cdc/sink/codec/canal/canal_encoder_test.go
=======
TableInfo: tableInfo,
ColInfos: colInfo,
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_encoder_test.go
}

encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal))
require.NotNil(t, encoder)

count := 0

tests := []struct {
row *model.RowChangedEvent
callback func()
Expand Down
55 changes: 43 additions & 12 deletions cdc/sink/codec/canal/canal_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@ import (
"math"
"reflect"
"strconv"
"strings"

"github.com/golang/protobuf/proto" // nolint:staticcheck
"github.com/pingcap/errors"
mm "github.com/pingcap/tidb/parser/model"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go
"github.com/pingcap/tidb/types"
=======
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/internal"
cerror "github.com/pingcap/tiflow/pkg/errors"
<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go
=======
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/internal"
"github.com/pingcap/tiflow/pkg/sink/codec/utils"
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go
canal "github.com/pingcap/tiflow/proto/canal"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/charmap"
Expand Down Expand Up @@ -78,7 +87,7 @@ func (b *canalEntryBuilder) buildHeader(commitTs uint64, schema string, table st
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java#L276-L1147
// all values will be represented as strings
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L760-L855
func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.JavaSQLType) (result string, err error) {
func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (result string, err error) {
// value is nil if no value was inserted for the column.
if value == nil {
return "", nil
Expand All @@ -96,20 +105,15 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.Jav
case string:
result = v
case []byte:
// JavaSQLTypeVARCHAR / JavaSQLTypeCHAR / JavaSQLTypeBLOB / JavaSQLTypeCLOB /
// special handle for text and blob
// see https://github.com/alibaba/canal/blob/9f6021cf36f78cc8ac853dcf37a1769f359b868b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L801
switch javaType {
// for normal text
case internal.JavaSQLTypeVARCHAR, internal.JavaSQLTypeCHAR, internal.JavaSQLTypeCLOB:
result = string(v)
default:
// JavaSQLTypeBLOB
if isBinary {
decoded, err := b.bytesDecoder.Bytes(v)
if err != nil {
return "", err
}
result = string(decoded)
} else {
result = string(v)
}
default:
result = fmt.Sprintf("%v", v)
Expand All @@ -119,21 +123,27 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.Jav

// build the Column in the canal RowData
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872
<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go
func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated bool) (*canal.Column, error) {
mysqlType := getMySQLType(c)
javaType, err := getJavaSQLType(c, mysqlType)
=======
func (b *canalEntryBuilder) buildColumn(c *model.Column, columnInfo *timodel.ColumnInfo, updated bool) (*canal.Column, error) {
mysqlType := utils.GetMySQLType(columnInfo, b.config.ContentCompatible)
javaType, err := getJavaSQLType(c.Value, c.Type, c.Flag)
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}

value, err := b.formatValue(c.Value, javaType)
value, err := b.formatValue(c.Value, c.Flag.IsBinary())
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}

canalColumn := &canal.Column{
SqlType: int32(javaType),
Name: colName,
Name: c.Name,
IsKey: c.Flag.IsPrimaryKey(),
Updated: updated,
IsNullPresent: &canal.Column_IsNull{IsNull: c.Value == nil},
Expand All @@ -150,7 +160,16 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey
if column == nil {
continue
}
<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go
c, err := b.buildColumn(column, column.Name, !e.IsDelete())
=======
columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID)
if !ok {
return nil, cerror.ErrCanalEncodeFailed.GenWithStack(
"column info not found for column id: %d", e.ColInfos[idx].ID)
}
c, err := b.buildColumn(column, columnInfo, !e.IsDelete())
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -166,7 +185,16 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey
if onlyHandleKeyColumns && !column.Flag.IsHandleKey() {
continue
}
<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go
c, err := b.buildColumn(column, column.Name, !e.IsDelete())
=======
columnInfo, ok := e.TableInfo.GetColumnInfo(e.ColInfos[idx].ID)
if !ok {
return nil, cerror.ErrCanalEncodeFailed.GenWithStack(
"column info not found for column id: %d", e.ColInfos[idx].ID)
}
c, err := b.buildColumn(column, columnInfo, !e.IsDelete())
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -373,6 +401,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result internal.JavaSQLT

return javaType, nil
}
<<<<<<< HEAD:cdc/sink/codec/canal/canal_entry.go

// when encoding the canal format, for unsigned mysql type, add `unsigned` keyword.
// it should have the form `t unsigned`, such as `int unsigned`
Expand Down Expand Up @@ -407,3 +436,5 @@ func getMySQLType(c *model.Column) string {

return mysqlType
}
=======
>>>>>>> 5921050d90 (codec(ticdc): canal-json decouple get value from java type and refactor unit test (#10123)):pkg/sink/codec/canal/canal_entry.go
Loading

0 comments on commit 93c8afa

Please sign in to comment.