Skip to content

Commit

Permalink
codec(ticdc): kafka simple encoding protocol support encoding checkpo…
Browse files Browse the repository at this point in the history
…int event. (#9911)

close #9900
  • Loading branch information
3AceShowHand authored Oct 31, 2023
1 parent 44a7474 commit 21e9dfc
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 15 deletions.
5 changes: 5 additions & 0 deletions pkg/config/sink_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
ProtocolCraft
ProtocolOpen
ProtocolCsv
ProtocolSimple
)

// IsBatchEncode returns whether the protocol is a batch encoder.
Expand Down Expand Up @@ -66,6 +67,8 @@ func ParseSinkProtocolFromString(protocol string) (Protocol, error) {
return ProtocolOpen, nil
case "csv":
return ProtocolCsv, nil
case "simple":
return ProtocolSimple, nil
default:
return ProtocolUnknown, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(protocol)
}
Expand All @@ -90,6 +93,8 @@ func (p Protocol) String() string {
return "open-protocol"
case ProtocolCsv:
return "csv"
case ProtocolSimple:
return "simple"
default:
panic("unreachable")
}
Expand Down
55 changes: 44 additions & 11 deletions pkg/sink/codec/simple/decoder.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Inc.
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,33 +14,66 @@
package simple

import (
"encoding/json"

"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

type decoder struct{}
type decoder struct {
value []byte

msg *message
}

// NewDecoder returns a new decoder
func NewDecoder() *decoder {
// TODO implement me
panic("implement me")
return &decoder{}
}

// AddKeyValue add the received key and values to the decoder,
func (d *decoder) AddKeyValue(key, value []byte) error {
// TODO implement me
panic("implement me")
func (d *decoder) AddKeyValue(_, value []byte) error {
if d.value != nil {
return cerror.ErrDecodeFailed.GenWithStack(
"decoder value already exists, not consumed yet")
}
d.value = value
return nil
}

// HasNext returns whether there is any event need to be consumed
func (d *decoder) HasNext() (model.MessageType, bool, error) {
// TODO implement me
panic("implement me")
if d.value == nil {
return model.MessageTypeUnknown, false, nil
}

var m message
if err := json.Unmarshal(d.value, &m); err != nil {
return model.MessageTypeUnknown, false, cerror.WrapError(cerror.ErrDecodeFailed, err)
}
d.msg = &m
d.value = nil

if d.msg.Type == WatermarkType {
return model.MessageTypeResolved, true, nil
}
if d.msg.Data != nil || d.msg.Old != nil {
return model.MessageTypeRow, true, nil
}
return model.MessageTypeDDL, true, nil
}

// NextResolvedEvent returns the next resolved event if exists
func (d *decoder) NextResolvedEvent() (uint64, error) {
// TODO implement me
panic("implement me")
if d.msg.Type != WatermarkType {
return 0, cerror.ErrCodecDecode.GenWithStack(
"not found resolved event message")
}

ts := d.msg.CommitTs
d.msg = nil

return ts, nil
}

// NextRowChangedEvent returns the next row changed event if exists
Expand Down
20 changes: 16 additions & 4 deletions pkg/sink/codec/simple/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ package simple

import (
"context"
"encoding/json"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
)

Expand All @@ -27,8 +31,12 @@ type builder struct{}

// NewBuilder returns a new builder
func NewBuilder() *builder {
// TODO implement me
panic("implement me")
return &builder{}
}

// Build implement the RowEventEncoderBuilder interface
func (b *builder) Build() codec.RowEventEncoder {
return &encoder{}
}

// AppendRowChangedEvent implement the RowEventEncoder interface
Expand All @@ -53,8 +61,12 @@ func (e *encoder) Build() []*common.Message {
//
//nolint:unused
func (e *encoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {
// TODO implement me
panic("implement me")
message := newResolvedMessage(ts)
value, err := json.Marshal(message)
if err != nil {
return nil, cerror.WrapError(cerror.ErrEncodeFailed, err)
}
return common.NewResolvedMsg(config.ProtocolSimple, nil, value, ts), nil
}

// EncodeDDLEvent implement the DDLEventBatchEncoder interface
Expand Down
44 changes: 44 additions & 0 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package simple

import (
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

func TestEncodeCheckpoint(t *testing.T) {
t.Parallel()

enc := NewBuilder().Build()

checkpoint := 23
m, err := enc.EncodeCheckpointEvent(uint64(checkpoint))
require.NoError(t, err)

dec := NewDecoder()
err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeResolved, messageType)

ts, err := dec.NextResolvedEvent()
require.NoError(t, err)
require.Equal(t, uint64(checkpoint), ts)
}
46 changes: 46 additions & 0 deletions pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package simple

const (
defaultVersion = 0
)

// EventType describes the type of the event.
type EventType string

const (
// WatermarkType is the type of the watermark event.
WatermarkType EventType = "WATERMARK"
)

type message struct {
Version int `json:"version"`
// Scheme and Table is empty for the resolved ts event.
Schema string `json:"schema,omitempty"`
Table string `json:"table,omitempty"`
Type EventType `json:"type"`
CommitTs uint64 `json:"commitTs"`
// Data is available for the row changed event.
Data map[string]interface{} `json:"data,omitempty"`
Old map[string]interface{} `json:"old,omitempty"`
}

func newResolvedMessage(ts uint64) *message {
return &message{
Version: defaultVersion,
Type: WatermarkType,
CommitTs: ts,
}
}

0 comments on commit 21e9dfc

Please sign in to comment.