From 21e9dfc5ab581bea4d23dc0f4540821f0e5f5cf2 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 31 Oct 2023 05:38:08 -0500 Subject: [PATCH] codec(ticdc): kafka simple encoding protocol support encoding checkpoint event. (#9911) close pingcap/tiflow#9900 --- pkg/config/sink_protocol.go | 5 +++ pkg/sink/codec/simple/decoder.go | 55 +++++++++++++++++++++------ pkg/sink/codec/simple/encoder.go | 20 ++++++++-- pkg/sink/codec/simple/encoder_test.go | 44 +++++++++++++++++++++ pkg/sink/codec/simple/message.go | 46 ++++++++++++++++++++++ 5 files changed, 155 insertions(+), 15 deletions(-) create mode 100644 pkg/sink/codec/simple/encoder_test.go create mode 100644 pkg/sink/codec/simple/message.go diff --git a/pkg/config/sink_protocol.go b/pkg/config/sink_protocol.go index 0b4d4138059..87c99c55bfa 100644 --- a/pkg/config/sink_protocol.go +++ b/pkg/config/sink_protocol.go @@ -38,6 +38,7 @@ const ( ProtocolCraft ProtocolOpen ProtocolCsv + ProtocolSimple ) // IsBatchEncode returns whether the protocol is a batch encoder. @@ -66,6 +67,8 @@ func ParseSinkProtocolFromString(protocol string) (Protocol, error) { return ProtocolOpen, nil case "csv": return ProtocolCsv, nil + case "simple": + return ProtocolSimple, nil default: return ProtocolUnknown, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(protocol) } @@ -90,6 +93,8 @@ func (p Protocol) String() string { return "open-protocol" case ProtocolCsv: return "csv" + case ProtocolSimple: + return "simple" default: panic("unreachable") } diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go index ef8e4598bc0..2913a50839d 100644 --- a/pkg/sink/codec/simple/decoder.go +++ b/pkg/sink/codec/simple/decoder.go @@ -1,4 +1,4 @@ -// Copyright 2022 PingCAP, Inc. +// Copyright 2023 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -14,33 +14,66 @@ package simple import ( + "encoding/json" + "github.com/pingcap/tiflow/cdc/model" + cerror "github.com/pingcap/tiflow/pkg/errors" ) -type decoder struct{} +type decoder struct { + value []byte + + msg *message +} // NewDecoder returns a new decoder func NewDecoder() *decoder { - // TODO implement me - panic("implement me") + return &decoder{} } // AddKeyValue add the received key and values to the decoder, -func (d *decoder) AddKeyValue(key, value []byte) error { - // TODO implement me - panic("implement me") +func (d *decoder) AddKeyValue(_, value []byte) error { + if d.value != nil { + return cerror.ErrDecodeFailed.GenWithStack( + "decoder value already exists, not consumed yet") + } + d.value = value + return nil } // HasNext returns whether there is any event need to be consumed func (d *decoder) HasNext() (model.MessageType, bool, error) { - // TODO implement me - panic("implement me") + if d.value == nil { + return model.MessageTypeUnknown, false, nil + } + + var m message + if err := json.Unmarshal(d.value, &m); err != nil { + return model.MessageTypeUnknown, false, cerror.WrapError(cerror.ErrDecodeFailed, err) + } + d.msg = &m + d.value = nil + + if d.msg.Type == WatermarkType { + return model.MessageTypeResolved, true, nil + } + if d.msg.Data != nil || d.msg.Old != nil { + return model.MessageTypeRow, true, nil + } + return model.MessageTypeDDL, true, nil } // NextResolvedEvent returns the next resolved event if exists func (d *decoder) NextResolvedEvent() (uint64, error) { - // TODO implement me - panic("implement me") + if d.msg.Type != WatermarkType { + return 0, cerror.ErrCodecDecode.GenWithStack( + "not found resolved event message") + } + + ts := d.msg.CommitTs + d.msg = nil + + return ts, nil } // NextRowChangedEvent returns the next row changed event if exists diff --git a/pkg/sink/codec/simple/encoder.go b/pkg/sink/codec/simple/encoder.go index babf9d8cf20..be0b34aa89a 100644 --- a/pkg/sink/codec/simple/encoder.go +++ 
b/pkg/sink/codec/simple/encoder.go @@ -15,8 +15,12 @@ package simple import ( "context" + "encoding/json" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec" "github.com/pingcap/tiflow/pkg/sink/codec/common" ) @@ -27,8 +31,12 @@ type builder struct{} // NewBuilder returns a new builder func NewBuilder() *builder { - // TODO implement me - panic("implement me") + return &builder{} +} + +// Build implement the RowEventEncoderBuilder interface +func (b *builder) Build() codec.RowEventEncoder { + return &encoder{} } // AppendRowChangedEvent implement the RowEventEncoder interface @@ -53,8 +61,12 @@ func (e *encoder) Build() []*common.Message { // //nolint:unused func (e *encoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) { - // TODO implement me - panic("implement me") + message := newResolvedMessage(ts) + value, err := json.Marshal(message) + if err != nil { + return nil, cerror.WrapError(cerror.ErrEncodeFailed, err) + } + return common.NewResolvedMsg(config.ProtocolSimple, nil, value, ts), nil } // EncodeDDLEvent implement the DDLEventBatchEncoder interface diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go new file mode 100644 index 00000000000..ea165e8eaae --- /dev/null +++ b/pkg/sink/codec/simple/encoder_test.go @@ -0,0 +1,44 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package simple + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/stretchr/testify/require" +) + +func TestEncodeCheckpoint(t *testing.T) { + t.Parallel() + + enc := NewBuilder().Build() + + checkpoint := 23 + m, err := enc.EncodeCheckpointEvent(uint64(checkpoint)) + require.NoError(t, err) + + dec := NewDecoder() + err = dec.AddKeyValue(m.Key, m.Value) + require.NoError(t, err) + + messageType, hasNext, err := dec.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeResolved, messageType) + + ts, err := dec.NextResolvedEvent() + require.NoError(t, err) + require.Equal(t, uint64(checkpoint), ts) +} diff --git a/pkg/sink/codec/simple/message.go b/pkg/sink/codec/simple/message.go new file mode 100644 index 00000000000..6e5cb8fc3fd --- /dev/null +++ b/pkg/sink/codec/simple/message.go @@ -0,0 +1,46 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package simple + +const ( + defaultVersion = 0 +) + +// EventType describes the type of the event. +type EventType string + +const ( + // WatermarkType is the type of the watermark event. + WatermarkType EventType = "WATERMARK" +) + +type message struct { + Version int `json:"version"` + // Schema and Table are empty for the resolved ts event. + Schema string `json:"schema,omitempty"` + Table string `json:"table,omitempty"` + Type EventType `json:"type"` + CommitTs uint64 `json:"commitTs"` + // Data is available for the row changed event.
+ Data map[string]interface{} `json:"data,omitempty"` + Old map[string]interface{} `json:"old,omitempty"` +} + +func newResolvedMessage(ts uint64) *message { + return &message{ + Version: defaultVersion, + Type: WatermarkType, + CommitTs: ts, + } +}