Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec(ticdc): kafka simple encoding protocol support encoding checkpoint event. #9911

Merged
merged 6 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/config/sink_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
ProtocolCraft
ProtocolOpen
ProtocolCsv
ProtocolSimple
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use "Simple" as the new protocol name? We will add claim-check large message feature for this protocol, we will add water mark, checkpoint, schema tracking & bootstrap and more other features, can we use a proper name that can represent its functionalities? How about SufficientJson since it is json, but not each message contains the full schema information and is more sufficient than CanalJson?

Copy link
Contributor

@zhangjinpeng87 zhangjinpeng87 Oct 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or ExtensibleJson which means we added many customized feature for this new protocol?

Copy link
Contributor Author

@3AceShowHand 3AceShowHand Oct 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is not decided yet, use simple temporarily.

Json should not be part of the protocol, since this new protocol does not only support JSON format, we may make it support other encoding formats such as avro, cbor.

Sufficient sounds good to me. Extensible is also a good name, but it's not easy to explain the customized, since this new protocol belongs to the TiCDC itself, all things are customized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the concrete name, further discussion is required.

)

// IsBatchEncode returns whether the protocol is a batch encoder.
Expand Down Expand Up @@ -66,6 +67,8 @@ func ParseSinkProtocolFromString(protocol string) (Protocol, error) {
return ProtocolOpen, nil
case "csv":
return ProtocolCsv, nil
case "simple":
return ProtocolSimple, nil
default:
return ProtocolUnknown, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(protocol)
}
Expand All @@ -90,6 +93,8 @@ func (p Protocol) String() string {
return "open-protocol"
case ProtocolCsv:
return "csv"
case ProtocolSimple:
return "simple"
default:
panic("unreachable")
}
Expand Down
55 changes: 44 additions & 11 deletions pkg/sink/codec/simple/decoder.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Inc.
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,33 +14,66 @@
package simple

import (
"encoding/json"

"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

type decoder struct{}
type decoder struct {
value []byte

msg *message
}

// NewDecoder returns a new decoder
func NewDecoder() *decoder {
// TODO implement me
panic("implement me")
return &decoder{}
}

// AddKeyValue add the received key and values to the decoder,
func (d *decoder) AddKeyValue(key, value []byte) error {
// TODO implement me
panic("implement me")
func (d *decoder) AddKeyValue(_, value []byte) error {
if d.value != nil {
return cerror.ErrDecodeFailed.GenWithStack(
"decoder value already exists, not consumed yet")
}
d.value = value
return nil
}

// HasNext returns whether there is any event need to be consumed
func (d *decoder) HasNext() (model.MessageType, bool, error) {
// TODO implement me
panic("implement me")
if d.value == nil {
return model.MessageTypeUnknown, false, nil
}

var m message
if err := json.Unmarshal(d.value, &m); err != nil {
return model.MessageTypeUnknown, false, cerror.WrapError(cerror.ErrDecodeFailed, err)
}
d.msg = &m
d.value = nil

if d.msg.Type == WatermarkType {
return model.MessageTypeResolved, true, nil
}
if d.msg.Data != nil || d.msg.Old != nil {
return model.MessageTypeRow, true, nil
}
return model.MessageTypeDDL, true, nil
}

// NextResolvedEvent returns the next resolved event if exists
func (d *decoder) NextResolvedEvent() (uint64, error) {
// TODO implement me
panic("implement me")
if d.msg.Type != WatermarkType {
return 0, cerror.ErrCodecDecode.GenWithStack(
"not found resolved event message")
}

ts := d.msg.CommitTs
d.msg = nil

return ts, nil
}

// NextRowChangedEvent returns the next row changed event if exists
Expand Down
20 changes: 16 additions & 4 deletions pkg/sink/codec/simple/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ package simple

import (
"context"
"encoding/json"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
)

Expand All @@ -27,8 +31,12 @@ type builder struct{}

// NewBuilder returns a new builder
func NewBuilder() *builder {
// TODO implement me
panic("implement me")
return &builder{}
}

// Build implement the RowEventEncoderBuilder interface
func (b *builder) Build() codec.RowEventEncoder {
return &encoder{}
}

// AppendRowChangedEvent implement the RowEventEncoder interface
Expand All @@ -53,8 +61,12 @@ func (e *encoder) Build() []*common.Message {
//
//nolint:unused
func (e *encoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {
// TODO implement me
panic("implement me")
message := newResolvedMessage(ts)
value, err := json.Marshal(message)
if err != nil {
return nil, cerror.WrapError(cerror.ErrEncodeFailed, err)
}
return common.NewResolvedMsg(config.ProtocolSimple, nil, value, ts), nil
}

// EncodeDDLEvent implement the DDLEventBatchEncoder interface
Expand Down
44 changes: 44 additions & 0 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package simple

import (
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

func TestEncodeCheckpoint(t *testing.T) {
t.Parallel()

enc := NewBuilder().Build()

checkpoint := 23
m, err := enc.EncodeCheckpointEvent(uint64(checkpoint))
require.NoError(t, err)

dec := NewDecoder()
err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeResolved, messageType)

ts, err := dec.NextResolvedEvent()
require.NoError(t, err)
require.Equal(t, uint64(checkpoint), ts)
}
46 changes: 46 additions & 0 deletions pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package simple

const (
defaultVersion = 0
)

// EventType describes the type of the event.
type EventType string

const (
// WatermarkType is the type of the watermark event.
WatermarkType EventType = "WATERMARK"
)

type message struct {
Version int `json:"version"`
// Scheme and Table is empty for the resolved ts event.
Schema string `json:"schema,omitempty"`
Table string `json:"table,omitempty"`
Type EventType `json:"type"`
CommitTs uint64 `json:"commitTs"`
// Data is available for the row changed event.
Data map[string]interface{} `json:"data,omitempty"`
Old map[string]interface{} `json:"old,omitempty"`
}

func newResolvedMessage(ts uint64) *message {
return &message{
Version: defaultVersion,
Type: WatermarkType,
CommitTs: ts,
}
}
Loading