Skip to content

Commit

Permalink
codec(ticdc): move canal test util to the utils, it should be shared …
Browse files Browse the repository at this point in the history
…among all prot… (#10265)

close #10264
  • Loading branch information
3AceShowHand authored Dec 7, 2023
1 parent cf3d7bf commit 9be2ddd
Show file tree
Hide file tree
Showing 6 changed files with 486 additions and 918 deletions.
171 changes: 0 additions & 171 deletions pkg/sink/codec/canal/canal_json_decoder_test.go

This file was deleted.

47 changes: 38 additions & 9 deletions pkg/sink/codec/canal/canal_json_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,6 @@ func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string)
log.Panic("canal-json encoded message should have type in `string`")
}

if mysqlType == mysql.TypeBit || mysqlType == mysql.TypeSet {
val, err := strconv.ParseUint(data, 10, 64)
if err != nil {
log.Panic("invalid column value for bit", zap.Any("col", result), zap.Error(err))
}
result.Value = val
return result
}

var err error
if utils.IsBinaryMySQLType(mysqlTypeStr) {
// when encoding the `JavaSQLTypeBLOB`, use `ISO8859_1` decoder, now reverse it back.
Expand All @@ -246,6 +237,44 @@ func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string)
if err != nil {
log.Panic("invalid column value, please report a bug", zap.Any("col", result), zap.Error(err))
}
result.Value = value
return result
}

switch mysqlType {
case mysql.TypeBit, mysql.TypeSet:
value, err = strconv.ParseUint(data, 10, 64)
if err != nil {
log.Panic("invalid column value for bit", zap.Any("col", result), zap.Error(err))
}
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeInt24, mysql.TypeYear:
value, err = strconv.ParseInt(data, 10, 64)
if err != nil {
log.Panic("invalid column value for int", zap.Any("col", result), zap.Error(err))
}
case mysql.TypeEnum:
value, err = strconv.ParseInt(data, 10, 64)
if err != nil {
log.Panic("invalid column value for enum", zap.Any("col", result), zap.Error(err))
}
case mysql.TypeLonglong:
value, err = strconv.ParseInt(data, 10, 64)
if err != nil {
value, err = strconv.ParseUint(data, 10, 64)
if err != nil {
log.Panic("invalid column value for bigint", zap.Any("col", result), zap.Error(err))
}
}
case mysql.TypeFloat:
value, err = strconv.ParseFloat(data, 32)
if err != nil {
log.Panic("invalid column value for float", zap.Any("col", result), zap.Error(err))
}
case mysql.TypeDouble:
value, err = strconv.ParseFloat(data, 64)
if err != nil {
log.Panic("invalid column value for double", zap.Any("col", result), zap.Error(err))
}
}

result.Value = value
Expand Down
Loading

0 comments on commit 9be2ddd

Please sign in to comment.