Skip to content

Commit

Permalink
codec(ticdc): canal-json decouple java type from the mysql type (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 4, 2023
1 parent 1f6a44b commit f6f03f7
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 55 deletions.
5 changes: 5 additions & 0 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
var (
regionWorkerPool workerpool.WorkerPool
workerPoolOnce sync.Once
workerPoolLock sync.Mutex
// The magic number here is keep the same with some magic numbers in some
// other components in TiCDC, including worker pool task chan size, mounter
// chan size etc.
Expand Down Expand Up @@ -409,6 +410,8 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv

func (w *regionWorker) initPoolHandles() {
handles := make([]workerpool.EventHandle, 0, w.concurrency)
workerPoolLock.Lock()
defer workerPoolLock.Unlock()
for i := 0; i < w.concurrency; i++ {
poolHandle := regionWorkerPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error {
event := eventI.(*regionStatefulEvent)
Expand Down Expand Up @@ -864,6 +867,8 @@ func getWorkerPoolSize() (size int) {
func InitWorkerPool() {
workerPoolOnce.Do(func() {
size := getWorkerPoolSize()
workerPoolLock.Lock()
defer workerPoolLock.Unlock()
regionWorkerPool = workerpool.NewDefaultWorkerPool(size)
})
}
Expand Down
8 changes: 4 additions & 4 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,6 @@ func (s *server) run(ctx context.Context) (err error) {

eg, egCtx := errgroup.WithContext(ctx)

eg.Go(func() error {
return s.capture.Run(egCtx)
})

eg.Go(func() error {
return s.upstreamPDHealthChecker(egCtx)
})
Expand All @@ -371,6 +367,10 @@ func (s *server) run(ctx context.Context) (err error) {
return nil
})

eg.Go(func() error {
return s.capture.Run(egCtx)
})

return eg.Wait()
}

Expand Down
67 changes: 30 additions & 37 deletions pkg/sink/codec/canal/canal_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (b *canalEntryBuilder) buildHeader(commitTs uint64, schema string, table st
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java#L276-L1147
// all value will be represented in string type
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L760-L855
func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.JavaSQLType) (result string, err error) {
func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (result string, err error) {
// value would be nil, if no value insert for the column.
if value == nil {
return "", nil
Expand All @@ -96,20 +96,14 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.Jav
case string:
result = v
case []byte:
// JavaSQLTypeVARCHAR / JavaSQLTypeCHAR / JavaSQLTypeBLOB / JavaSQLTypeCLOB /
// special handle for text and blob
// see https://github.com/alibaba/canal/blob/9f6021cf36f78cc8ac853dcf37a1769f359b868b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L801
switch javaType {
// for normal text
case internal.JavaSQLTypeVARCHAR, internal.JavaSQLTypeCHAR, internal.JavaSQLTypeCLOB:
result = string(v)
default:
// JavaSQLTypeBLOB
if isBinary {
decoded, err := b.bytesDecoder.Bytes(v)
if err != nil {
return "", err
}
result = string(decoded)
} else {
result = string(v)
}
default:
result = fmt.Sprintf("%v", v)
Expand All @@ -119,21 +113,21 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType internal.Jav

// build the Column in the canal RowData
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872
func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated bool) (*canal.Column, error) {
mysqlType := getMySQLType(c)
javaType, err := getJavaSQLType(c, mysqlType)
func (b *canalEntryBuilder) buildColumn(c *model.Column, updated bool) (*canal.Column, error) {
mysqlType := getMySQLType(c.Type, c.Flag)
javaType, err := getJavaSQLType(c.Value, c.Type, c.Flag)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}

value, err := b.formatValue(c.Value, javaType)
value, err := b.formatValue(c.Value, c.Flag.IsBinary())
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}

canalColumn := &canal.Column{
SqlType: int32(javaType),
Name: colName,
Name: c.Name,
IsKey: c.Flag.IsPrimaryKey(),
Updated: updated,
IsNullPresent: &canal.Column_IsNull{IsNull: c.Value == nil},
Expand All @@ -150,7 +144,7 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey
if column == nil {
continue
}
c, err := b.buildColumn(column, column.Name, !e.IsDelete())
c, err := b.buildColumn(column, !e.IsDelete())
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -166,7 +160,7 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey
if onlyHandleKeyColumns && !column.Flag.IsHandleKey() {
continue
}
c, err := b.buildColumn(column, column.Name, !e.IsDelete())
c, err := b.buildColumn(column, !e.IsDelete())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -299,40 +293,39 @@ func isCanalDDL(t canal.EventType) bool {
return false
}

func getJavaSQLType(c *model.Column, mysqlType string) (result internal.JavaSQLType, err error) {
javaType := internal.MySQLType2JavaType(c.Type, c.Flag.IsBinary())
func getJavaSQLType(value interface{}, tp byte, flag model.ColumnFlagType) (result internal.JavaSQLType, err error) {
javaType := internal.MySQLType2JavaType(tp, flag.IsBinary())

switch javaType {
case internal.JavaSQLTypeBINARY, internal.JavaSQLTypeVARBINARY, internal.JavaSQLTypeLONGVARBINARY:
if strings.Contains(mysqlType, "text") {
return internal.JavaSQLTypeCLOB, nil
if flag.IsBinary() {
return internal.JavaSQLTypeBLOB, nil
}
return internal.JavaSQLTypeBLOB, nil
return internal.JavaSQLTypeCLOB, nil
}

// flag `isUnsigned` only for `numerical` and `bit`, `year` data type.
if !c.Flag.IsUnsigned() {
if !flag.IsUnsigned() {
return javaType, nil
}

switch tp {
// for year, to `int64`, others to `uint64`.
// no need to promote type for `year` and `bit`
if c.Type == mysql.TypeYear || c.Type == mysql.TypeBit {
case mysql.TypeYear, mysql.TypeBit:
return javaType, nil
}

if c.Type == mysql.TypeFloat || c.Type == mysql.TypeDouble || c.Type == mysql.TypeNewDecimal {
case mysql.TypeFloat, mysql.TypeDouble, mysql.TypeNewDecimal:
return javaType, nil
}

// for **unsigned** integral types, type would be `uint64` or `string`. see reference:
// https://github.com/pingcap/tiflow/blob/1e3dd155049417e3fd7bf9b0a0c7b08723b33791/cdc/entry/mounter.go#L501
// https://github.com/pingcap/tidb/blob/6495a5a116a016a3e077d181b8c8ad81f76ac31b/types/datum.go#L423-L455
if c.Value == nil {
if value == nil {
return javaType, nil
}
var number uint64
switch v := c.Value.(type) {
switch v := value.(type) {
case uint64:
number = v
case string:
Expand All @@ -342,7 +335,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result internal.JavaSQLT
}
number = a
default:
return javaType, errors.Errorf("unexpected type for unsigned value: %+v, column: %+v", reflect.TypeOf(v), c)
return javaType, errors.Errorf("unexpected type for unsigned value: %+v, tp: %+v", reflect.TypeOf(v), tp)
}

// Some special cases handled in canal
Expand All @@ -352,7 +345,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result internal.JavaSQLT
// SmallInt, 2byte, [-32768, 32767], [0, 65535], if a > 32767
// Int, 4byte, [-2147483648, 2147483647], [0, 4294967295], if a > 2147483647
// BigInt, 8byte, [-2<<63, 2 << 63 - 1], [0, 2 << 64 - 1], if a > 2 << 63 - 1
switch c.Type {
switch tp {
case mysql.TypeTiny:
if number > math.MaxInt8 {
javaType = internal.JavaSQLTypeSMALLINT
Expand Down Expand Up @@ -388,20 +381,20 @@ func trimUnsignedFromMySQLType(mysqlType string) string {
return strings.TrimSuffix(mysqlType, " unsigned")
}

func getMySQLType(c *model.Column) string {
mysqlType := types.TypeStr(c.Type)
func getMySQLType(tp byte, flag model.ColumnFlagType) string {
mysqlType := types.TypeStr(tp)
// make `mysqlType` representation keep the same as the canal official implementation
mysqlType = withUnsigned4MySQLType(mysqlType, c.Flag.IsUnsigned())
mysqlType = withUnsigned4MySQLType(mysqlType, flag.IsUnsigned())

if !c.Flag.IsBinary() {
if !flag.IsBinary() {
return mysqlType
}

if types.IsTypeBlob(c.Type) {
if types.IsTypeBlob(tp) {
return strings.Replace(mysqlType, "text", "blob", 1)
}

if types.IsTypeChar(c.Type) {
if types.IsTypeChar(tp) {
return strings.Replace(mysqlType, "char", "binary", 1)
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/sink/codec/canal/canal_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ func TestGetMySQLTypeAndJavaSQLType(t *testing.T) {
t.Parallel()
canalEntryBuilder := newCanalEntryBuilder()
for _, item := range testColumnsTable {
obtainedMySQLType := getMySQLType(item.column)
obtainedMySQLType := getMySQLType(item.column.Type, item.column.Flag)
require.Equal(t, item.expectedMySQLType, obtainedMySQLType)

obtainedJavaSQLType, err := getJavaSQLType(item.column, obtainedMySQLType)
require.Nil(t, err)
obtainedJavaSQLType, err := getJavaSQLType(item.column.Value, item.column.Type, item.column.Flag)
require.NoError(t, err)
require.Equal(t, item.expectedJavaSQLType, obtainedJavaSQLType)

obtainedFinalValue, err := canalEntryBuilder.formatValue(item.column.Value, item.column.Flag.IsBinary())
require.NoError(t, err)
if !item.column.Flag.IsBinary() {
obtainedFinalValue, err := canalEntryBuilder.formatValue(item.column.Value, obtainedJavaSQLType)
require.Nil(t, err)
require.Equal(t, item.expectedEncodedValue, obtainedFinalValue)
}
}
Expand Down
12 changes: 3 additions & 9 deletions pkg/sink/codec/canal/canal_json_row_event_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,7 @@ func fillColumns(columns []*model.Column, out *jwriter.Writer,
} else {
out.RawByte(',')
}
mysqlType := getMySQLType(col)
javaType, err := getJavaSQLType(col, mysqlType)
if err != nil {
return cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
value, err := builder.formatValue(col.Value, javaType)
value, err := builder.formatValue(col.Value, col.Flag.IsBinary())
if err != nil {
return cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
Expand Down Expand Up @@ -171,15 +166,14 @@ func newJSONMessageForDML(
} else {
out.RawByte(',')
}
mysqlType := getMySQLType(col)
javaType, err := getJavaSQLType(col, mysqlType)
javaType, err := getJavaSQLType(col.Value, col.Type, col.Flag)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
out.String(col.Name)
out.RawByte(':')
out.Int32(int32(javaType))
mysqlTypeMap[col.Name] = mysqlType
mysqlTypeMap[col.Name] = getMySQLType(col.Type, col.Flag)
}
}
if emptyColumn {
Expand Down

0 comments on commit f6f03f7

Please sign in to comment.