diff --git a/pkg/sink/codec/builder/encoder_builder.go b/pkg/sink/codec/builder/encoder_builder.go index 5e75c232790..2828e02fecd 100644 --- a/pkg/sink/codec/builder/encoder_builder.go +++ b/pkg/sink/codec/builder/encoder_builder.go @@ -49,7 +49,7 @@ func NewRowEventEncoderBuilder( case config.ProtocolCraft: return craft.NewBatchEncoderBuilder(cfg), nil case config.ProtocolDebezium: - return debezium.NewBatchEncoderBuilder(cfg), nil + return debezium.NewBatchEncoderBuilder(cfg, config.GetGlobalServerConfig().ClusterID), nil case config.ProtocolSimple: return simple.NewBuilder(cfg), nil default: diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index ddfb714cc78..34bc2157aa5 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -14,10 +14,12 @@ package debezium import ( + "bytes" "encoding/binary" "fmt" "io" "strconv" + "strings" "time" "github.com/pingcap/tidb/parser/mysql" @@ -38,7 +40,7 @@ type dbzCodec struct { nowFunc func() time.Time } -func (c *dbzCodec) writeColumnsAsField( +func (c *dbzCodec) writeDebeziumFieldValues( writer *util.JSONWriter, fieldName string, cols []*model.Column, @@ -47,7 +49,7 @@ func (c *dbzCodec) writeColumnsAsField( var err error writer.WriteObjectField(fieldName, func() { for i, col := range cols { - err = c.writeDebeziumField(writer, col, colInfos[i].Ft) + err = c.writeDebeziumFieldValue(writer, col, colInfos[i].Ft) if err != nil { break } @@ -56,8 +58,199 @@ func (c *dbzCodec) writeColumnsAsField( return err } +func (c *dbzCodec) writeDebeziumFieldSchema( + writer *util.JSONWriter, + col *model.Column, + ft *types.FieldType, +) error { + switch col.Type { + case mysql.TypeBit: + n := ft.GetFlen() + if n == 1 { + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "boolean") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + } else { + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "bytes") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.data.Bits") + writer.WriteIntField("version", 1) + writer.WriteObjectField("parameters", func() { + writer.WriteStringField("length", fmt.Sprintf("%d", n)) + }) + writer.WriteStringField("field", col.Name) + }) + } + + case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, + mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "string") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeEnum: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "string") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.data.Enum") + writer.WriteIntField("version", 1) + writer.WriteObjectField("parameters", func() { + writer.WriteStringField("allowed", strings.Join(ft.GetElems(), ",")) + }) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeSet: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "string") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.data.EnumSet") + writer.WriteIntField("version", 1) + writer.WriteObjectField("parameters", func() { + writer.WriteStringField("allowed", strings.Join(ft.GetElems(), ",")) + }) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeNewDecimal: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "double") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeDate, mysql.TypeNewDate: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "int32") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.time.Date") + writer.WriteIntField("version", 1) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeDatetime: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "int64") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + if ft.GetDecimal() <= 3 { + writer.WriteStringField("name", "io.debezium.time.Timestamp") + } else { + writer.WriteStringField("name", "io.debezium.time.MicroTimestamp") + } + writer.WriteIntField("version", 1) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeTimestamp: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "string") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.time.ZonedTimestamp") + writer.WriteIntField("version", 1) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeDuration: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "int64") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.time.MicroTime") + writer.WriteIntField("version", 1) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeJSON: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "string") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.data.Json") + writer.WriteIntField("version", 1) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeTiny: // TINYINT + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "int16") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeShort: // SMALLINT + writer.WriteObjectElement(func() { + if mysql.HasUnsignedFlag(ft.GetFlag()) { + writer.WriteStringField("type", "int32") + } else { + writer.WriteStringField("type", "int16") + } + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeInt24: // MEDIUMINT + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "int32") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeLong: // INT + writer.WriteObjectElement(func() { + if mysql.HasUnsignedFlag(ft.GetFlag()) { + writer.WriteStringField("type", "int64") + } else { + writer.WriteStringField("type", "int32") + } + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeLonglong: // BIGINT + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "int64") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeFloat: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "float") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeDouble: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "double") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("field", col.Name) + }) + + case mysql.TypeYear: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "int32") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.time.Year") + writer.WriteIntField("version", 1) + writer.WriteStringField("field", col.Name) + }) + + default: + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unsupported field type %d for column %s", + col.Type, + col.Name) + } + + return nil +} + // See https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types -func (c *dbzCodec) writeDebeziumField( +func (c *dbzCodec) writeDebeziumFieldValue( writer *util.JSONWriter, col *model.Column, ft *types.FieldType, @@ -68,17 +261,23 @@ func (c *dbzCodec) writeDebeziumField( } switch col.Type { case mysql.TypeBit: - if v, ok := col.Value.(uint64); ok { - // Debezium behavior: - // BIT(1) → BOOLEAN - // BIT(>1) → BYTES The byte[] contains the bits in little-endian form and is sized to - // contain the specified number of bits. - n := ft.GetFlen() - if n == 1 { - writer.WriteBoolField(col.Name, v != 0) - return nil - } + v, ok := col.Value.(uint64) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for bit column %s", + col.Value, + col.Name) + } + // Debezium behavior: + // BIT(1) → BOOLEAN + // BIT(>1) → BYTES The byte[] contains the bits in little-endian form and is sized to + // contain the specified number of bits. + n := ft.GetFlen() + if n == 1 { + writer.WriteBoolField(col.Name, v != 0) + return nil + } else { var buf [8]byte binary.LittleEndian.PutUint64(buf[:], v) numBytes := n / 8 @@ -88,123 +287,142 @@ func (c *dbzCodec) writeDebeziumField( c.writeBinaryField(writer, col.Name, buf[:numBytes]) return nil } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for bit column %s", - col.Value, - col.Name) + case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: - if col.Flag.IsBinary() { - if v, ok := col.Value.([]byte); ok { - c.writeBinaryField(writer, col.Name, v) - return nil - } + v, ok := col.Value.([]byte) + if !ok { return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for binary string column %s", + "unexpected column value type %T for string column %s", col.Value, col.Name) } - if v, ok := col.Value.([]byte); ok { + + if col.Flag.IsBinary() { + c.writeBinaryField(writer, col.Name, v) + return nil + } else { writer.WriteStringField(col.Name, string(hack.String(v))) return nil } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for non-binary string column %s", - col.Value, - col.Name) + case mysql.TypeEnum: - if v, ok := col.Value.(uint64); ok { - enumVar, err := types.ParseEnumValue(ft.GetElems(), v) - if err != nil { - // Invalid enum value inserted in non-strict mode. - writer.WriteStringField(col.Name, "") - return nil - } - writer.WriteStringField(col.Name, enumVar.Name) + v, ok := col.Value.(uint64) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for enum column %s", + col.Value, + col.Name) + } + + enumVar, err := types.ParseEnumValue(ft.GetElems(), v) + if err != nil { + // Invalid enum value inserted in non-strict mode. + writer.WriteStringField(col.Name, "") return nil } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for enum column %s", - col.Value, - col.Name) + + writer.WriteStringField(col.Name, enumVar.Name) + return nil + case mysql.TypeSet: - if v, ok := col.Value.(uint64); ok { - setVar, err := types.ParseSetValue(ft.GetElems(), v) - if err != nil { - // Invalid enum value inserted in non-strict mode. - writer.WriteStringField(col.Name, "") - return nil - } - writer.WriteStringField(col.Name, setVar.Name) + v, ok := col.Value.(uint64) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for set column %s", + col.Value, + col.Name) + + } + + setVar, err := types.ParseSetValue(ft.GetElems(), v) + if err != nil { + // Invalid enum value inserted in non-strict mode. + writer.WriteStringField(col.Name, "") return nil } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for set column %s", - col.Value, - col.Name) + + writer.WriteStringField(col.Name, setVar.Name) + return nil + case mysql.TypeNewDecimal: - if v, ok := col.Value.(string); ok { - floatV, err := strconv.ParseFloat(v, 64) - if err != nil { - return cerror.WrapError( - cerror.ErrDebeziumEncodeFailed, - err) - } - writer.WriteFloat64Field(col.Name, floatV) - return nil + v, ok := col.Value.(string) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for decimal column %s", + col.Value, + col.Name) } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for decimal column %s", - col.Value, - col.Name) + + floatV, err := strconv.ParseFloat(v, 64) + if err != nil { + return cerror.WrapError( + cerror.ErrDebeziumEncodeFailed, + err) + } + + writer.WriteFloat64Field(col.Name, floatV) + return nil + case mysql.TypeDate, mysql.TypeNewDate: - if v, ok := col.Value.(string); ok { - t, err := time.Parse("2006-01-02", v) - if err != nil { - // For example, time may be invalid like 1000-00-00 - // return nil, nil - if mysql.HasNotNullFlag(ft.GetFlag()) { - writer.WriteInt64Field(col.Name, 0) - return nil - } + v, ok := col.Value.(string) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for date column %s", + col.Value, + col.Name) + } + + t, err := time.Parse("2006-01-02", v) + if err != nil { + // For example, time may be invalid like 1000-00-00 + // return nil, nil + if mysql.HasNotNullFlag(ft.GetFlag()) { + writer.WriteInt64Field(col.Name, 0) + return nil + } else { writer.WriteNullField(col.Name) return nil } - writer.WriteInt64Field(col.Name, t.Unix()/60/60/24) - return nil } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for date column %s", - col.Value, - col.Name) + + writer.WriteInt64Field(col.Name, t.Unix()/60/60/24) + return nil + case mysql.TypeDatetime: // Debezium behavior from doc: // > Such columns are converted into epoch milliseconds or microseconds based on the // > column's precision by using UTC. // TODO: For Default Value = CURRENT_TIMESTAMP, the result is incorrect. - if v, ok := col.Value.(string); ok { - t, err := time.Parse("2006-01-02 15:04:05.999999", v) - if err != nil { - // For example, time may be 1000-00-00 - if mysql.HasNotNullFlag(ft.GetFlag()) { - writer.WriteInt64Field(col.Name, 0) - return nil - } - writer.WriteNullField(col.Name) + v, ok := col.Value.(string) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for datetime column %s", + col.Value, + col.Name) + } + + t, err := time.Parse("2006-01-02 15:04:05.999999", v) + if err != nil { + // For example, time may be 1000-00-00 + if mysql.HasNotNullFlag(ft.GetFlag()) { + writer.WriteInt64Field(col.Name, 0) return nil - } - if ft.GetDecimal() <= 3 { - writer.WriteInt64Field(col.Name, t.UnixMilli()) + } else { + writer.WriteNullField(col.Name) return nil } + } + + if ft.GetDecimal() <= 3 { + writer.WriteInt64Field(col.Name, t.UnixMilli()) + return nil + } else { writer.WriteInt64Field(col.Name, t.UnixMicro()) return nil } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for datetime column %s", - col.Value, - col.Name) + case mysql.TypeTimestamp: // Debezium behavior from doc: // > The TIMESTAMP type represents a timestamp without time zone information. @@ -215,65 +433,73 @@ func (c *dbzCodec) writeDebeziumField( // > based on the server (or session's) current time zone. The time zone will be queried from // > the server by default. If this fails, it must be specified explicitly by the database // > connectionTimeZone MySQL configuration option. - if v, ok := col.Value.(string); ok { - t, err := time.ParseInLocation("2006-01-02 15:04:05.999999", v, c.config.TimeZone) - if err != nil { - // For example, time may be invalid like 1000-00-00 - if mysql.HasNotNullFlag(ft.GetFlag()) { - t = time.Unix(0, 0) - } else { - writer.WriteNullField(col.Name) - return nil - } - } + v, ok := col.Value.(string) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for timestamp column %s", + col.Value, + col.Name) + } - str := t.UTC().Format("2006-01-02T15:04:05") - fsp := ft.GetDecimal() - if fsp > 0 { - tmp := fmt.Sprintf(".%06d", t.Nanosecond()/1000) - str = str + tmp[:1+fsp] + t, err := time.ParseInLocation("2006-01-02 15:04:05.999999", v, c.config.TimeZone) + if err != nil { + // For example, time may be invalid like 1000-00-00 + if mysql.HasNotNullFlag(ft.GetFlag()) { + t = time.Unix(0, 0) + } else { + writer.WriteNullField(col.Name) + return nil } - str += "Z" + } - writer.WriteStringField(col.Name, str) - return nil + str := t.UTC().Format("2006-01-02T15:04:05") + fsp := ft.GetDecimal() + if fsp > 0 { + tmp := fmt.Sprintf(".%06d", t.Nanosecond()/1000) + str = str + tmp[:1+fsp] } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for timestamp column %s", - col.Value, - col.Name) + str += "Z" + + writer.WriteStringField(col.Name, str) + return nil + case mysql.TypeDuration: // Debezium behavior from doc: // > Represents the time value in microseconds and does not include // > time zone information. MySQL allows M to be in the range of 0-6. - if v, ok := col.Value.(string); ok { - ctx := &stmtctx.StatementContext{} - d, _, _, err := types.StrToDuration(ctx, v, ft.GetDecimal()) - if err != nil { - return cerror.WrapError( - cerror.ErrDebeziumEncodeFailed, - err) - } + v, ok := col.Value.(string) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for time column %s", + col.Value, + col.Name) + } - writer.WriteInt64Field(col.Name, d.Microseconds()) - return nil + ctx := &stmtctx.StatementContext{} + d, _, _, err := types.StrToDuration(ctx, v, ft.GetDecimal()) + if err != nil { + return cerror.WrapError( + cerror.ErrDebeziumEncodeFailed, + err) } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for time column %s", - col.Value, - col.Name) + + writer.WriteInt64Field(col.Name, d.Microseconds()) + return nil + case mysql.TypeLonglong: if col.Flag.IsUnsigned() { // Handle with BIGINT UNSIGNED. // Debezium always produce INT64 instead of UINT64 for BIGINT. - if v, ok := col.Value.(uint64); ok { - writer.WriteInt64Field(col.Name, int64(v)) - return nil + v, ok := col.Value.(uint64) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for unsigned bigint column %s", + col.Value, + col.Name) } - return cerror.ErrDebeziumEncodeFailed.GenWithStack( - "unexpected column value type %T for unsigned bigint column %s", - col.Value, - col.Name) + + writer.WriteInt64Field(col.Name, int64(v)) + return nil } // Note: Although Debezium's doc claims to use INT32 for INT, but it @@ -351,19 +577,203 @@ func (c *dbzCodec) EncodeRowChangedEvent( // after: An optional field that specifies the state of the row after the event occurred. // Optional field that specifies the state of the row after the event occurred. // In a delete event value, the after field is null, signifying that the row no longer exists. - err = c.writeColumnsAsField(jWriter, "after", e.Columns, e.ColInfos) + err = c.writeDebeziumFieldValues(jWriter, "after", e.Columns, e.ColInfos) } else if e.IsDelete() { jWriter.WriteStringField("op", "d") jWriter.WriteNullField("after") - err = c.writeColumnsAsField(jWriter, "before", e.PreColumns, e.ColInfos) + err = c.writeDebeziumFieldValues(jWriter, "before", e.PreColumns, e.ColInfos) } else if e.IsUpdate() { jWriter.WriteStringField("op", "u") - err = c.writeColumnsAsField(jWriter, "before", e.PreColumns, e.ColInfos) + err = c.writeDebeziumFieldValues(jWriter, "before", e.PreColumns, e.ColInfos) if err == nil { - err = c.writeColumnsAsField(jWriter, "after", e.Columns, e.ColInfos) + err = c.writeDebeziumFieldValues(jWriter, "after", e.Columns, e.ColInfos) } } }) + + jWriter.WriteObjectField("schema", func() { + jWriter.WriteStringField("type", "struct") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("name", fmt.Sprintf("%s.%s.%s.Envelope", c.clusterID, e.Table.Schema, e.Table.Table)) + jWriter.WriteIntField("version", 1) + jWriter.WriteArrayField("fields", func() { + // schema is the same for `before` and `after`. So we build a new buffer to + // build the JSON, so that content can be reused. + var fieldsJSON string + { + fieldsBuf := &bytes.Buffer{} + fieldsWriter := util.BorrowJSONWriter(fieldsBuf) + var validCols []*model.Column + if e.IsInsert() { + validCols = e.Columns + } else if e.IsDelete() { + validCols = e.PreColumns + } else if e.IsUpdate() { + validCols = e.Columns + } + for i, col := range validCols { + err = c.writeDebeziumFieldSchema(fieldsWriter, col, e.ColInfos[i].Ft) + if err != nil { + return + } + } + util.ReturnJSONWriter(fieldsWriter) + fieldsJSON = fieldsBuf.String() + } + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "struct") + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("name", fmt.Sprintf("%s.%s.%s.Value", c.clusterID, e.Table.Schema, e.Table.Table)) + jWriter.WriteStringField("field", "before") + jWriter.WriteArrayField("fields", func() { + jWriter.WriteRaw(fieldsJSON) + }) + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "struct") + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("name", fmt.Sprintf("%s.%s.%s.Value", c.clusterID, e.Table.Schema, e.Table.Table)) + jWriter.WriteStringField("field", "after") + jWriter.WriteArrayField("fields", func() { + jWriter.WriteRaw(fieldsJSON) + }) + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "struct") + jWriter.WriteArrayField("fields", func() { + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "version") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "connector") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "name") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int64") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "ts_ms") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("name", "io.debezium.data.Enum") + jWriter.WriteIntField("version", 1) + jWriter.WriteObjectField("parameters", func() { + jWriter.WriteStringField("allowed", "true,last,false,incremental") + }) + jWriter.WriteStringField("default", "false") + jWriter.WriteStringField("field", "snapshot") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "db") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("field", "sequence") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("field", "table") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int64") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "server_id") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("field", "gtid") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "file") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int64") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "pos") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int32") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "row") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int64") + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("field", "thread") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("field", "query") + }) + // Below are extra TiDB fields + // jWriter.WriteObjectElement(func() { + // jWriter.WriteStringField("type", "int64") + // jWriter.WriteBoolField("optional", false) + // jWriter.WriteStringField("field", "commit_ts") + // }) + // jWriter.WriteObjectElement(func() { + // jWriter.WriteStringField("type", "string") + // jWriter.WriteBoolField("optional", false) + // jWriter.WriteStringField("field", "cluster_id") + // }) + }) + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("name", "io.debezium.connector.mysql.Source") + jWriter.WriteStringField("field", "source") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "op") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int64") + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("field", "ts_ms") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "struct") + jWriter.WriteArrayField("fields", func() { + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "string") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "id") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int64") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "total_order") + }) + jWriter.WriteObjectElement(func() { + jWriter.WriteStringField("type", "int64") + jWriter.WriteBoolField("optional", false) + jWriter.WriteStringField("field", "data_collection_order") + }) + }) + jWriter.WriteBoolField("optional", true) + jWriter.WriteStringField("name", "event.block") + jWriter.WriteIntField("version", 1) + jWriter.WriteStringField("field", "transaction") + }) + }) + }) }) return err diff --git a/pkg/sink/codec/debezium/debezium_test.go b/pkg/sink/codec/debezium/debezium_test.go index af0da225932..42efb6ce585 100644 --- a/pkg/sink/codec/debezium/debezium_test.go +++ b/pkg/sink/codec/debezium/debezium_test.go @@ -132,7 +132,7 @@ func (h *SQLTestHelper) ScanTable() []*model.RowChangedEvent { func requireDebeziumJSONEq(t *testing.T, dbzOutput []byte, tiCDCOutput []byte) { var ( ignoredRecordPaths = map[string]bool{ - `{map[string]any}["schema"]`: true, + // `{map[string]any}["schema"]`: true, `{map[string]any}["payload"].(map[string]any)["source"]`: true, `{map[string]any}["payload"].(map[string]any)["ts_ms"]`: true, } @@ -178,7 +178,7 @@ func TestDataTypes(t *testing.T) { rows := helper.ScanTable() cfg := common.NewConfig(config.ProtocolDebezium) cfg.TimeZone = time.UTC - encoder := NewBatchEncoderBuilder(cfg).Build() + encoder := NewBatchEncoderBuilder(cfg, "dbserver1").Build() for _, row := range rows { err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) require.Nil(t, err) diff --git a/pkg/sink/codec/debezium/encoder.go b/pkg/sink/codec/debezium/encoder.go index b5e36cb3567..a7ef32a0d3e 100644 --- a/pkg/sink/codec/debezium/encoder.go +++ b/pkg/sink/codec/debezium/encoder.go @@ -95,13 +95,13 @@ func (d *BatchEncoder) Build() []*common.Message { } // newBatchEncoder creates a new Debezium BatchEncoder. -func newBatchEncoder(c *common.Config) codec.RowEventEncoder { +func newBatchEncoder(c *common.Config, clusterID string) codec.RowEventEncoder { batch := &BatchEncoder{ messages: nil, config: c, codec: &dbzCodec{ config: c, - clusterID: config.GetGlobalServerConfig().ClusterID, + clusterID: clusterID, nowFunc: time.Now, }, } @@ -109,19 +109,21 @@ func newBatchEncoder(c *common.Config) codec.RowEventEncoder { } type batchEncoderBuilder struct { - config *common.Config + config *common.Config + clusterID string } // NewBatchEncoderBuilder creates a Debezium batchEncoderBuilder. -func NewBatchEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder { +func NewBatchEncoderBuilder(config *common.Config, clusterID string) codec.RowEventEncoderBuilder { return &batchEncoderBuilder{ - config: config, + config: config, + clusterID: clusterID, } } // Build a `BatchEncoder` func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - return newBatchEncoder(b.config) + return newBatchEncoder(b.config, b.clusterID) } // CleanMetrics do nothing diff --git a/pkg/util/json_writer.go b/pkg/util/json_writer.go index af0f56e5731..fd07138271f 100644 --- a/pkg/util/json_writer.go +++ b/pkg/util/json_writer.go @@ -85,6 +85,10 @@ func (w *JSONWriter) Buffer() []byte { return w.stream.Buffer() } +func (w *JSONWriter) WriteRaw(b string) { + w.stream.WriteRaw(b) +} + // WriteBase64String writes a base64 string like "". func (w *JSONWriter) WriteBase64String(b []byte) { if w.out == nil {