Skip to content

Commit

Permalink
codec(ticdc): canal-json decouple java type from the mysql type (#10087
Browse files Browse the repository at this point in the history
…) (#10125)

close #10086
  • Loading branch information
ti-chi-bot authored Nov 22, 2023
1 parent d1bd4ff commit 8730276
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 29 deletions.
43 changes: 21 additions & 22 deletions cdc/sink/codec/canal/canal_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.Jav
// build the Column in the canal RowData
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872
func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated bool) (*canal.Column, error) {
mysqlType := getMySQLType(c)
javaType, err := getJavaSQLType(c, mysqlType)
mysqlType := getMySQLType(c.Type, c.Flag)
javaType, err := getJavaSQLType(c.Value, c.Type, c.Flag)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
Expand Down Expand Up @@ -299,40 +299,39 @@ func isCanalDDL(t canal.EventType) bool {
return false
}

func getJavaSQLType(c *model.Column, mysqlType string) (result internal.JavaSQLType, err error) {
javaType := internal.MySQLType2JavaType(c.Type, c.Flag.IsBinary())
func getJavaSQLType(value interface{}, tp byte, flag model.ColumnFlagType) (result internal.JavaSQLType, err error) {
javaType := internal.MySQLType2JavaType(tp, flag.IsBinary())

switch javaType {
case internal.JavaSQLTypeBINARY, internal.JavaSQLTypeVARBINARY, internal.JavaSQLTypeLONGVARBINARY:
if strings.Contains(mysqlType, "text") {
return internal.JavaSQLTypeCLOB, nil
if flag.IsBinary() {
return internal.JavaSQLTypeBLOB, nil
}
return internal.JavaSQLTypeBLOB, nil
return internal.JavaSQLTypeCLOB, nil
}

// flag `isUnsigned` only for `numerical` and `bit`, `year` data type.
if !c.Flag.IsUnsigned() {
if !flag.IsUnsigned() {
return javaType, nil
}

switch tp {
// for year, to `int64`, others to `uint64`.
// no need to promote type for `year` and `bit`
if c.Type == mysql.TypeYear || c.Type == mysql.TypeBit {
case mysql.TypeYear, mysql.TypeBit:
return javaType, nil
}

if c.Type == mysql.TypeFloat || c.Type == mysql.TypeDouble || c.Type == mysql.TypeNewDecimal {
case mysql.TypeFloat, mysql.TypeDouble, mysql.TypeNewDecimal:
return javaType, nil
}

// for **unsigned** integral types, type would be `uint64` or `string`. see reference:
// https://github.com/pingcap/tiflow/blob/1e3dd155049417e3fd7bf9b0a0c7b08723b33791/cdc/entry/mounter.go#L501
// https://github.com/pingcap/tidb/blob/6495a5a116a016a3e077d181b8c8ad81f76ac31b/types/datum.go#L423-L455
if c.Value == nil {
if value == nil {
return javaType, nil
}
var number uint64
switch v := c.Value.(type) {
switch v := value.(type) {
case uint64:
number = v
case string:
Expand All @@ -342,7 +341,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result internal.JavaSQLT
}
number = a
default:
return javaType, errors.Errorf("unexpected type for unsigned value: %+v, column: %+v", reflect.TypeOf(v), c)
return javaType, errors.Errorf("unexpected type for unsigned value: %+v, tp: %+v", reflect.TypeOf(v), tp)
}

// Some special cases handled in canal
Expand All @@ -352,7 +351,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result internal.JavaSQLT
// SmallInt, 2byte, [-32768, 32767], [0, 65535], if a > 32767
// Int, 4byte, [-2147483648, 2147483647], [0, 4294967295], if a > 2147483647
// BigInt, 8byte, [-2<<63, 2 << 63 - 1], [0, 2 << 64 - 1], if a > 2 << 63 - 1
switch c.Type {
switch tp {
case mysql.TypeTiny:
if number > math.MaxInt8 {
javaType = internal.JavaSQLTypeSMALLINT
Expand Down Expand Up @@ -388,20 +387,20 @@ func trimUnsignedFromMySQLType(mysqlType string) string {
return strings.TrimSuffix(mysqlType, " unsigned")
}

func getMySQLType(c *model.Column) string {
mysqlType := types.TypeStr(c.Type)
func getMySQLType(tp byte, flag model.ColumnFlagType) string {
mysqlType := types.TypeStr(tp)
// make `mysqlType` representation keep the same as the canal official implementation
mysqlType = withUnsigned4MySQLType(mysqlType, c.Flag.IsUnsigned())
mysqlType = withUnsigned4MySQLType(mysqlType, flag.IsUnsigned())

if !c.Flag.IsBinary() {
if !flag.IsBinary() {
return mysqlType
}

if types.IsTypeBlob(c.Type) {
if types.IsTypeBlob(tp) {
return strings.Replace(mysqlType, "text", "blob", 1)
}

if types.IsTypeChar(c.Type) {
if types.IsTypeChar(tp) {
return strings.Replace(mysqlType, "char", "binary", 1)
}

Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/codec/canal/canal_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func TestGetMySQLTypeAndJavaSQLType(t *testing.T) {
t.Parallel()
canalEntryBuilder := newCanalEntryBuilder()
for _, item := range testColumnsTable {
obtainedMySQLType := getMySQLType(item.column)
obtainedMySQLType := getMySQLType(item.column.Type, item.column.Flag)
require.Equal(t, item.expectedMySQLType, obtainedMySQLType)

obtainedJavaSQLType, err := getJavaSQLType(item.column, obtainedMySQLType)
obtainedJavaSQLType, err := getJavaSQLType(item.column.Value, item.column.Type, item.column.Flag)
require.Nil(t, err)
require.Equal(t, item.expectedJavaSQLType, obtainedJavaSQLType)

Expand Down
8 changes: 3 additions & 5 deletions cdc/sink/codec/canal/canal_json_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ func fillColumns(
} else {
out.RawByte(',')
}
mysqlType := getMySQLType(col)
javaType, err := getJavaSQLType(col, mysqlType)
javaType, err := getJavaSQLType(col.Value, col.Type, col.Flag)
if err != nil {
return cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
Expand Down Expand Up @@ -190,15 +189,14 @@ func newJSONMessageForDML(
} else {
out.RawByte(',')
}
mysqlType := getMySQLType(col)
javaType, err := getJavaSQLType(col, mysqlType)
javaType, err := getJavaSQLType(col.Value, col.Type, col.Flag)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
out.String(col.Name)
out.RawByte(':')
out.Int32(int32(javaType))
mysqlTypeMap[col.Name] = mysqlType
mysqlTypeMap[col.Name] = getMySQLType(col.Type, col.Flag)
}
}
if emptyColumn {
Expand Down

0 comments on commit 8730276

Please sign in to comment.