Skip to content

Commit

Permalink
codec(ticdc): decoder support only handle key columns (#9187)
Browse files Browse the repository at this point in the history
close #9189
  • Loading branch information
3AceShowHand authored Jun 14, 2023
1 parent 2473041 commit ee9db02
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 61 deletions.
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
c.appendDDL(ddl)
}
case model.MessageTypeRow:
row, err := decoder.NextRowChangedEvent()
row, _, err := decoder.NextRowChangedEvent()
if err != nil {
log.Panic("decode message value failed",
zap.ByteString("value", message.Value),
Expand Down
2 changes: 1 addition & 1 deletion cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (c *consumer) emitDMLEvents(
cnt++

if tp == model.MessageTypeRow {
row, err := decoder.NextRowChangedEvent()
row, _, err := decoder.NextRowChangedEvent()
if err != nil {
log.Error("failed to get next row changed event", zap.Error(err))
return errors.Trace(err)
Expand Down
26 changes: 13 additions & 13 deletions pkg/sink/codec/avro/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (d *decoder) HasNext() (model.MessageType, bool, error) {
}

// NextRowChangedEvent returns the next row changed event if exists
func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) {
var (
valueMap map[string]interface{}
rawSchema string
Expand All @@ -107,7 +107,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
ctx := context.Background()
key, rawSchema, err := d.decodeKey(ctx)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}

isDelete := len(d.value) == 0
Expand All @@ -116,25 +116,25 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
} else {
valueMap, rawSchema, err = d.decodeValue(ctx)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
}

schema := make(map[string]interface{})
if err := json.Unmarshal([]byte(rawSchema), &schema); err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}

fields, ok := schema["fields"].([]interface{})
if !ok {
return nil, errors.New("schema fields should be a map")
return nil, false, errors.New("schema fields should be a map")
}

columns := make([]*model.Column, 0, len(valueMap))
for _, value := range fields {
field, ok := value.(map[string]interface{})
if !ok {
return nil, errors.New("schema field should be a map")
return nil, false, errors.New("schema field should be a map")
}

// `tidbOp` is the first extension field in the schema, so we can break here.
Expand Down Expand Up @@ -167,7 +167,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {

value, ok := valueMap[colName]
if !ok {
return nil, errors.New("value not found")
return nil, false, errors.New("value not found")
}

switch t := value.(type) {
Expand All @@ -188,7 +188,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
case string:
enum, err := types.ParseEnum(allowed, t, "")
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
value = enum.Value
case nil:
Expand All @@ -202,7 +202,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
case string:
s, err := types.ParseSet(elems, t, "")
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
value = s.Value
case nil:
Expand All @@ -223,7 +223,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
if !isDelete {
o, ok := valueMap[tidbCommitTs]
if !ok {
return nil, errors.New("commit ts not found")
return nil, false, errors.New("commit ts not found")
}
commitTs = o.(int64)

Expand All @@ -234,7 +234,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
if checksum != "" {
expected, err = strconv.ParseUint(checksum, 10, 64)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
if o, ok := valueMap[tidbCorrupted]; ok {
corrupted := o.(bool)
Expand All @@ -256,7 +256,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
}

if err := d.verifyChecksum(columns, expected); err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
}
}
Expand All @@ -275,7 +275,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
}
event.Columns = columns

return event, nil
return event, false, nil
}

// NextResolvedEvent returns the next resolved event if exists
Expand Down
3 changes: 2 additions & 1 deletion pkg/sink/codec/avro/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ func TestDecodeEvent(t *testing.T) {
require.True(t, exist)
require.Equal(t, model.MessageTypeRow, messageType)

decodedEvent, err := decoder.NextRowChangedEvent()
decodedEvent, onlyHandleKey, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.False(t, onlyHandleKey)
require.NotNil(t, decodedEvent)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/codec/builder/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func BenchmarkCraftDecoding(b *testing.B) {
if _, hasNext, err := decoder.HasNext(); err != nil {
panic(err)
} else if hasNext {
_, _ = decoder.NextRowChangedEvent()
_, _, _ = decoder.NextRowChangedEvent()
} else {
break
}
Expand All @@ -312,7 +312,7 @@ func BenchmarkJsonDecoding(b *testing.B) {
if _, hasNext, err := decoder.HasNext(); err != nil {
panic(err)
} else if hasNext {
_, _ = decoder.NextRowChangedEvent()
_, _, _ = decoder.NextRowChangedEvent()
} else {
break
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,17 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {

// NextRowChangedEvent implements the RowEventDecoder interface
// `HasNext` should be called before this.
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) {
if b.msg == nil || b.msg.messageType() != model.MessageTypeRow {
return nil, cerror.ErrCanalDecodeFailed.
return nil, false, cerror.ErrCanalDecodeFailed.
GenWithStack("not found row changed event message")
}
result, err := canalJSONMessage2RowChange(b.msg)
if err != nil {
return nil, err
return nil, false, err
}
b.msg = nil
return result, nil
return result, false, nil
}

// NextDDLEvent implements the RowEventDecoder interface
Expand Down
15 changes: 9 additions & 6 deletions pkg/sink/codec/canal/canal_json_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, ty)

consumed, err := decoder.NextRowChangedEvent()
require.Nil(t, err)
consumed, onlyHandleKey, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.False(t, onlyHandleKey)

require.Equal(t, testCaseInsert.Table, consumed.Table)
if encodeEnable && decodeEnable {
Expand All @@ -76,8 +77,9 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
_, hasNext, _ = decoder.HasNext()
require.False(t, hasNext)

consumed, err = decoder.NextRowChangedEvent()
require.NotNil(t, err)
consumed, onlyHandleKey, err = decoder.NextRowChangedEvent()
require.Error(t, err)
require.False(t, onlyHandleKey)
require.Nil(t, consumed)
}
}
Expand Down Expand Up @@ -147,8 +149,9 @@ func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) {
require.Nil(t, err)
require.Equal(t, model.MessageTypeRow, tp)
cnt++
event, err := decoder.NextRowChangedEvent()
require.Nil(t, err)
event, onlyHandleKey, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.False(t, onlyHandleKey)
require.NotNil(t, event)
}
require.Equal(t, 3, cnt)
Expand Down
15 changes: 7 additions & 8 deletions pkg/sink/codec/craft/craft_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,28 +52,27 @@ func (b *batchDecoder) NextResolvedEvent() (uint64, error) {
}

// NextRowChangedEvent implements the RowEventDecoder interface
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) {
ty, hasNext, err := b.HasNext()
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
if !hasNext || ty != model.MessageTypeRow {
return nil,
cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message")
return nil, false, cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message")
}
oldValue, newValue, err := b.decoder.RowChangedEvent(b.index)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
ev := &model.RowChangedEvent{}
if oldValue != nil {
if ev.PreColumns, err = oldValue.ToModel(); err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
}
if newValue != nil {
if ev.Columns, err = newValue.ToModel(); err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
}
ev.CommitTs = b.headers.GetTs(b.index)
Expand All @@ -87,7 +86,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
ev.Table.IsPartition = true
}
b.index++
return ev, nil
return ev, false, nil
}

// NextDDLEvent implements the RowEventDecoder interface
Expand Down
10 changes: 6 additions & 4 deletions pkg/sink/codec/craft/craft_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ func TestCraftMaxBatchSize(t *testing.T) {
}

require.Equal(t, model.MessageTypeRow, v)
_, err = decoder.NextRowChangedEvent()
require.Nil(t, err)
_, onlyHandleKey, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.False(t, onlyHandleKey)
count++
}
require.LessOrEqual(t, count, 64)
Expand Down Expand Up @@ -121,8 +122,9 @@ func testBatchCodec(
break
}
require.Equal(t, model.MessageTypeRow, tp)
row, err := decoder.NextRowChangedEvent()
require.Nil(t, err)
row, onlyHandleKey, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.False(t, onlyHandleKey)
require.Equal(t, cs[index], row)
index++
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ func (b *batchDecoder) NextResolvedEvent() (uint64, error) {
}

// NextRowChangedEvent implements the RowEventDecoder interface.
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) {
if b.closed {
return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found"))
return nil, false, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found"))
}

e, err := csvMsg2RowChangedEvent(b.msg, b.tableInfo.Columns)
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
return e, nil
return e, false, nil
}

// NextDDLEvent implements the RowEventDecoder interface.
Expand Down
5 changes: 3 additions & 2 deletions pkg/sink/codec/csv/csv_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ func TestCSVBatchDecoder(t *testing.T) {
require.Nil(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)
event, err := decoder.NextRowChangedEvent()
require.Nil(t, err)
event, onlyHandleKey, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.False(t, onlyHandleKey)
require.NotNil(t, event)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/sink/codec/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ type RowEventDecoder interface {
// NextResolvedEvent returns the next resolved event if exists
NextResolvedEvent() (uint64, error)
// NextRowChangedEvent returns the next row changed event if exists
NextRowChangedEvent() (*model.RowChangedEvent, error)
// if the event only has handle key sent, return true,
// consumer should query the row data by the handle key and commit-ts
NextRowChangedEvent() (*model.RowChangedEvent, bool, error)
// NextDDLEvent returns the next DDL event if exists
NextDDLEvent() (*model.DDLEvent, error)
}
5 changes: 3 additions & 2 deletions pkg/sink/codec/internal/batch_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,9 @@ func (s *BatchTester) TestBatchCodec(
break
}
require.Equal(t, model.MessageTypeRow, tp)
row, err := decoder.NextRowChangedEvent()
require.Nil(t, err)
row, onlyHandleKey, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.False(t, onlyHandleKey)
sortColumnArrays(row.Columns, row.PreColumns, cs[index].Columns, cs[index].PreColumns)
require.Equal(t, cs[index], row)
index++
Expand Down
Loading

0 comments on commit ee9db02

Please sign in to comment.