diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 4910e1dff34..fb646ca5c55 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -624,7 +624,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram c.appendDDL(ddl) } case model.MessageTypeRow: - row, err := decoder.NextRowChangedEvent() + row, _, err := decoder.NextRowChangedEvent() if err != nil { log.Panic("decode message value failed", zap.ByteString("value", message.Value), diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 5360628538a..45c7c500998 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -334,7 +334,7 @@ func (c *consumer) emitDMLEvents( cnt++ if tp == model.MessageTypeRow { - row, err := decoder.NextRowChangedEvent() + row, _, err := decoder.NextRowChangedEvent() if err != nil { log.Error("failed to get next row changed event", zap.Error(err)) return errors.Trace(err) diff --git a/pkg/sink/codec/avro/decoder.go b/pkg/sink/codec/avro/decoder.go index e450126ed24..da6018211b4 100644 --- a/pkg/sink/codec/avro/decoder.go +++ b/pkg/sink/codec/avro/decoder.go @@ -97,7 +97,7 @@ func (d *decoder) HasNext() (model.MessageType, bool, error) { } // NextRowChangedEvent returns the next row changed event if exists -func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { +func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) { var ( valueMap map[string]interface{} rawSchema string @@ -107,7 +107,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { ctx := context.Background() key, rawSchema, err := d.decodeKey(ctx) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } isDelete := len(d.value) == 0 @@ -116,25 +116,25 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } else { valueMap, rawSchema, err = d.decodeValue(ctx) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } } schema := make(map[string]interface{}) if err := json.Unmarshal([]byte(rawSchema), &schema); err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } fields, ok := schema["fields"].([]interface{}) if !ok { - return nil, errors.New("schema fields should be a map") + return nil, false, errors.New("schema fields should be a map") } columns := make([]*model.Column, 0, len(valueMap)) for _, value := range fields { field, ok := value.(map[string]interface{}) if !ok { - return nil, errors.New("schema field should be a map") + return nil, false, errors.New("schema field should be a map") } // `tidbOp` is the first extension field in the schema, so we can break here. @@ -167,7 +167,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { value, ok := valueMap[colName] if !ok { - return nil, errors.New("value not found") + return nil, false, errors.New("value not found") } switch t := value.(type) { @@ -188,7 +188,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { case string: enum, err := types.ParseEnum(allowed, t, "") if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } value = enum.Value case nil: @@ -202,7 +202,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { case string: s, err := types.ParseSet(elems, t, "") if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } value = s.Value case nil: @@ -223,7 +223,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { if !isDelete { o, ok := valueMap[tidbCommitTs] if !ok { - return nil, errors.New("commit ts not found") + return nil, false, errors.New("commit ts not found") } commitTs = o.(int64) @@ -234,7 +234,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { if checksum != "" { expected, err = strconv.ParseUint(checksum, 10, 64) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } if o, ok := valueMap[tidbCorrupted]; ok { corrupted := o.(bool) @@ -256,7 +256,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } if err := d.verifyChecksum(columns, expected); err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } } } @@ -275,7 +275,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { } event.Columns = columns - return event, nil + return event, false, nil } // NextResolvedEvent returns the next resolved event if exists diff --git a/pkg/sink/codec/avro/decoder_test.go b/pkg/sink/codec/avro/decoder_test.go index 50f956046ee..ea1b7a4be31 100644 --- a/pkg/sink/codec/avro/decoder_test.go +++ b/pkg/sink/codec/avro/decoder_test.go @@ -132,8 +132,9 @@ func TestDecodeEvent(t *testing.T) { require.True(t, exist) require.Equal(t, model.MessageTypeRow, messageType) - decodedEvent, err := decoder.NextRowChangedEvent() + decodedEvent, onlyHandleKey, err := decoder.NextRowChangedEvent() require.NoError(t, err) + require.False(t, onlyHandleKey) require.NotNil(t, decodedEvent) } diff --git a/pkg/sink/codec/builder/codec_test.go b/pkg/sink/codec/builder/codec_test.go index 4cb82895894..3c8c417308d 100644 --- a/pkg/sink/codec/builder/codec_test.go +++ b/pkg/sink/codec/builder/codec_test.go @@ -291,7 +291,7 @@ func BenchmarkCraftDecoding(b *testing.B) { if _, hasNext, err := decoder.HasNext(); err != nil { panic(err) } else if hasNext { - _, _ = decoder.NextRowChangedEvent() + _, _, _ = decoder.NextRowChangedEvent() } else { break } @@ -312,7 +312,7 @@ func BenchmarkJsonDecoding(b *testing.B) { if _, hasNext, err := decoder.HasNext(); err != nil { panic(err) } else if hasNext { - _, _ = decoder.NextRowChangedEvent() + _, _, _ = decoder.NextRowChangedEvent() } else { break } diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 31b2ca17b46..54fab9cdb1e 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -92,17 +92,17 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) { // NextRowChangedEvent implements the RowEventDecoder interface // `HasNext` should be called before this. -func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { +func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) { if b.msg == nil || b.msg.messageType() != model.MessageTypeRow { - return nil, cerror.ErrCanalDecodeFailed. + return nil, false, cerror.ErrCanalDecodeFailed. GenWithStack("not found row changed event message") } result, err := canalJSONMessage2RowChange(b.msg) if err != nil { - return nil, err + return nil, false, err } b.msg = nil - return result, nil + return result, false, nil } // NextDDLEvent implements the RowEventDecoder interface diff --git a/pkg/sink/codec/canal/canal_json_decoder_test.go b/pkg/sink/codec/canal/canal_json_decoder_test.go index 287efe2eb10..064258dfb24 100644 --- a/pkg/sink/codec/canal/canal_json_decoder_test.go +++ b/pkg/sink/codec/canal/canal_json_decoder_test.go @@ -51,8 +51,9 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { require.True(t, hasNext) require.Equal(t, model.MessageTypeRow, ty) - consumed, err := decoder.NextRowChangedEvent() - require.Nil(t, err) + consumed, onlyHandleKey, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.False(t, onlyHandleKey) require.Equal(t, testCaseInsert.Table, consumed.Table) if encodeEnable && decodeEnable { @@ -76,8 +77,9 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { _, hasNext, _ = decoder.HasNext() require.False(t, hasNext) - consumed, err = decoder.NextRowChangedEvent() - require.NotNil(t, err) + consumed, onlyHandleKey, err = decoder.NextRowChangedEvent() + require.Error(t, err) + require.False(t, onlyHandleKey) require.Nil(t, consumed) } } @@ -147,8 +149,9 @@ func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) { require.Nil(t, err) require.Equal(t, model.MessageTypeRow, tp) cnt++ - event, err := decoder.NextRowChangedEvent() - require.Nil(t, err) + event, onlyHandleKey, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.False(t, onlyHandleKey) require.NotNil(t, event) } require.Equal(t, 3, cnt) diff --git a/pkg/sink/codec/craft/craft_decoder.go b/pkg/sink/codec/craft/craft_decoder.go index ab1cd1763b5..901882005f9 100644 --- a/pkg/sink/codec/craft/craft_decoder.go +++ b/pkg/sink/codec/craft/craft_decoder.go @@ -52,28 +52,27 @@ func (b *batchDecoder) NextResolvedEvent() (uint64, error) { } // NextRowChangedEvent implements the RowEventDecoder interface -func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { +func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) { ty, hasNext, err := b.HasNext() if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } if !hasNext || ty != model.MessageTypeRow { - return nil, - cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message") + return nil, false, cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message") } oldValue, newValue, err := b.decoder.RowChangedEvent(b.index) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } ev := &model.RowChangedEvent{} if oldValue != nil { if ev.PreColumns, err = oldValue.ToModel(); err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } } if newValue != nil { if ev.Columns, err = newValue.ToModel(); err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } } ev.CommitTs = b.headers.GetTs(b.index) @@ -87,7 +86,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { ev.Table.IsPartition = true } b.index++ - return ev, nil + return ev, false, nil } // NextDDLEvent implements the RowEventDecoder interface diff --git a/pkg/sink/codec/craft/craft_encoder_test.go b/pkg/sink/codec/craft/craft_encoder_test.go index 195822189f1..5cd8305c93f 100644 --- a/pkg/sink/codec/craft/craft_encoder_test.go +++ b/pkg/sink/codec/craft/craft_encoder_test.go @@ -87,8 +87,9 @@ func TestCraftMaxBatchSize(t *testing.T) { } require.Equal(t, model.MessageTypeRow, v) - _, err = decoder.NextRowChangedEvent() - require.Nil(t, err) + _, onlyHandleKey, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.False(t, onlyHandleKey) count++ } require.LessOrEqual(t, count, 64) @@ -121,8 +122,9 @@ func testBatchCodec( break } require.Equal(t, model.MessageTypeRow, tp) - row, err := decoder.NextRowChangedEvent() - require.Nil(t, err) + row, onlyHandleKey, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.False(t, onlyHandleKey) require.Equal(t, cs[index], row) index++ } diff --git a/pkg/sink/codec/csv/csv_decoder.go b/pkg/sink/codec/csv/csv_decoder.go index 84e1f29d0fa..726b6da6ed6 100644 --- a/pkg/sink/codec/csv/csv_decoder.go +++ b/pkg/sink/codec/csv/csv_decoder.go @@ -104,16 +104,16 @@ func (b *batchDecoder) NextResolvedEvent() (uint64, error) { } // NextRowChangedEvent implements the RowEventDecoder interface. -func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { +func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) { if b.closed { - return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found")) + return nil, false, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found")) } e, err := csvMsg2RowChangedEvent(b.msg, b.tableInfo.Columns) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } - return e, nil + return e, false, nil } // NextDDLEvent implements the RowEventDecoder interface. diff --git a/pkg/sink/codec/csv/csv_decoder_test.go b/pkg/sink/codec/csv/csv_decoder_test.go index 8b6f40a0f3e..165a91b0ddd 100644 --- a/pkg/sink/codec/csv/csv_decoder_test.go +++ b/pkg/sink/codec/csv/csv_decoder_test.go @@ -77,8 +77,9 @@ func TestCSVBatchDecoder(t *testing.T) { require.Nil(t, err) require.True(t, hasNext) require.Equal(t, model.MessageTypeRow, tp) - event, err := decoder.NextRowChangedEvent() - require.Nil(t, err) + event, onlyHandleKey, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.False(t, onlyHandleKey) require.NotNil(t, event) } diff --git a/pkg/sink/codec/decoder.go b/pkg/sink/codec/decoder.go index acfdb7ee216..8a3d2f86128 100644 --- a/pkg/sink/codec/decoder.go +++ b/pkg/sink/codec/decoder.go @@ -31,7 +31,9 @@ type RowEventDecoder interface { // NextResolvedEvent returns the next resolved event if exists NextResolvedEvent() (uint64, error) // NextRowChangedEvent returns the next row changed event if exists - NextRowChangedEvent() (*model.RowChangedEvent, error) + // if the event only has handle key sent, return true, + // consumer should query the row data by the handle key and commit-ts + NextRowChangedEvent() (*model.RowChangedEvent, bool, error) // NextDDLEvent returns the next DDL event if exists NextDDLEvent() (*model.DDLEvent, error) } diff --git a/pkg/sink/codec/internal/batch_tester.go b/pkg/sink/codec/internal/batch_tester.go index 1ea80cadaad..6fb5e4d89ad 100644 --- a/pkg/sink/codec/internal/batch_tester.go +++ b/pkg/sink/codec/internal/batch_tester.go @@ -239,8 +239,9 @@ func (s *BatchTester) TestBatchCodec( break } require.Equal(t, model.MessageTypeRow, tp) - row, err := decoder.NextRowChangedEvent() - require.Nil(t, err) + row, onlyHandleKey, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.False(t, onlyHandleKey) sortColumnArrays(row.Columns, row.PreColumns, cs[index].Columns, cs[index].PreColumns) require.Equal(t, cs[index], row) index++ diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go index 0fc65a5ccae..9cc38c0a05e 100644 --- a/pkg/sink/codec/open/open_protocol_decoder.go +++ b/pkg/sink/codec/open/open_protocol_decoder.go @@ -60,26 +60,27 @@ func (b *BatchMixedDecoder) NextResolvedEvent() (uint64, error) { } // NextRowChangedEvent implements the RowEventDecoder interface -func (b *BatchMixedDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { +func (b *BatchMixedDecoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) { if b.nextKey == nil { if err := b.decodeNextKey(); err != nil { - return nil, err + return nil, false, err } } b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] if b.nextKey.Type != model.MessageTypeRow { - return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") + return nil, false, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") } valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) value := b.mixedBytes[8 : valueLen+8] b.mixedBytes = b.mixedBytes[valueLen+8:] rowMsg := new(messageRow) if err := rowMsg.decode(value); err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } rowEvent := msgToRowChange(b.nextKey, rowMsg) + onlyHandleKey := b.nextKey.OnlyHandleKey b.nextKey = nil - return rowEvent, nil + return rowEvent, onlyHandleKey, nil } // NextDDLEvent implements the RowEventDecoder interface @@ -161,26 +162,27 @@ func (b *BatchDecoder) NextResolvedEvent() (uint64, error) { } // NextRowChangedEvent implements the RowEventDecoder interface -func (b *BatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { +func (b *BatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, bool, error) { if b.nextKey == nil { if err := b.decodeNextKey(); err != nil { - return nil, err + return nil, false, err } } b.keyBytes = b.keyBytes[b.nextKeyLen+8:] if b.nextKey.Type != model.MessageTypeRow { - return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") + return nil, false, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") } valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) value := b.valueBytes[8 : valueLen+8] b.valueBytes = b.valueBytes[valueLen+8:] rowMsg := new(messageRow) if err := rowMsg.decode(value); err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } rowEvent := msgToRowChange(b.nextKey, rowMsg) + onlyHandleKey := b.nextKey.OnlyHandleKey b.nextKey = nil - return rowEvent, nil + return rowEvent, onlyHandleKey, nil } // NextDDLEvent implements the RowEventDecoder interface diff --git a/pkg/sink/codec/open/open_protocol_decoder_test.go b/pkg/sink/codec/open/open_protocol_decoder_test.go new file mode 100644 index 00000000000..a871c9592c5 --- /dev/null +++ b/pkg/sink/codec/open/open_protocol_decoder_test.go @@ -0,0 +1,123 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package open + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/stretchr/testify/require" +) + +var ( + insertEvent = &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "test", Table: "t"}, + Columns: []*model.Column{ + { + Name: "a", + Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, + Type: mysql.TypeLong, + Value: int64(1), + }, + { + Name: "b", + Type: mysql.TypeLong, + Value: int64(2), + }, + { + Name: "c", + Type: mysql.TypeLong, + Value: int64(3), + }, + }, + } +) + +func TestDecodeEvent(t *testing.T) { + config := common.NewConfig(config.ProtocolOpen) + encoder := NewBatchEncoderBuilder(config).Build() + + ctx := context.Background() + topic := "test" + err := encoder.AppendRowChangedEvent(ctx, topic, insertEvent, nil) + require.NoError(t, err) + + message := encoder.Build()[0] + + decoder := NewBatchDecoder() + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, tp) + + obtained, onlyHandleKey, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.False(t, onlyHandleKey) + + obtainedColumns := make(map[string]*model.Column) + for _, col := range obtained.Columns { + obtainedColumns[col.Name] = col + } + + for _, col := range insertEvent.Columns { + require.Contains(t, obtainedColumns, col.Name) + } +} + +func TestDecodeEventOnlyHandleKeyColumns(t *testing.T) { + config := common.NewConfig(config.ProtocolOpen) + config.LargeMessageOnlyHandleKeyColumns = true + config.MaxMessageBytes = 185 + + encoder := NewBatchEncoderBuilder(config).Build() + + ctx := context.Background() + topic := "test" + err := encoder.AppendRowChangedEvent(ctx, topic, insertEvent, nil) + require.NoError(t, err) + + message := encoder.Build()[0] + + decoder := NewBatchDecoder() + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + tp, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, tp) + + obtained, onlyHandleKey, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.True(t, onlyHandleKey) + require.Equal(t, insertEvent.CommitTs, obtained.CommitTs) + + obtainedColumns := make(map[string]*model.Column) + for _, col := range obtained.Columns { + obtainedColumns[col.Name] = col + require.True(t, col.Flag.IsHandleKey()) + } + + for _, col := range insertEvent.Columns { + if col.Flag.IsHandleKey() { + require.Contains(t, obtainedColumns, col.Name) + } + } +} diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index 2dfc7e544b5..b814f4ac76a 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -113,8 +113,9 @@ func TestMaxBatchSize(t *testing.T) { } require.Equal(t, model.MessageTypeRow, v) - _, err = decoder.NextRowChangedEvent() - require.Nil(t, err) + _, onlyHandleKey, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.False(t, onlyHandleKey) count++ } require.LessOrEqual(t, count, 64)