Skip to content

Commit

Permalink
add compression related unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 6, 2023
1 parent 1e93072 commit 7133cac
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 265 deletions.
18 changes: 12 additions & 6 deletions pkg/sink/codec/simple/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"go.uber.org/zap"
)

type decoder struct {
value []byte

msg *message
config *common.Config

memo TableInfoProvider
value []byte
msg *message
memo TableInfoProvider
}

// NewDecoder returns a new decoder
func NewDecoder() *decoder {
func NewDecoder(config *common.Config) *decoder {
return &decoder{
memo: newMemoryTableInfoProvider(),
config: config,
memo: newMemoryTableInfoProvider(),
}
}

Expand All @@ -43,6 +45,10 @@ func (d *decoder) AddKeyValue(_, value []byte) error {
return cerror.ErrDecodeFailed.GenWithStack(
"decoder value already exists, not consumed yet")
}
value, err := common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, value)
if err != nil {
return err
}
d.value = value
return nil
}
Expand Down
128 changes: 55 additions & 73 deletions pkg/sink/codec/simple/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func (e *encoder) AppendRowChangedEvent(
}

result := &common.Message{
Key: nil,
Value: value,
Ts: event.CommitTs,
Schema: &event.Table.Schema,
Expand All @@ -73,86 +72,47 @@ func (e *encoder) AppendRowChangedEvent(
zap.Any("table", event.Table))
return cerror.ErrMessageTooLarge.GenWithStackByArgs()
}
if e.config.LargeMessageHandle.HandleKeyOnly() {
m, err = newDMLMessage(event, true)
if err != nil {
return err
}
value, err = json.Marshal(m)
if err != nil {
return cerror.WrapError(cerror.ErrEncodeFailed, err)
}
value, err = common.Compress(e.config.ChangefeedID,
e.config.LargeMessageHandle.LargeMessageHandleCompression, value)
if err != nil {
return err
}
result.Value = value
if result.Length() > e.config.MaxMessageBytes {
log.Error("Single message is still too large for simple only encode handle key columns",
zap.Int("maxMessageBytes", e.config.MaxMessageBytes),
zap.Int("length", result.Length()),
zap.Any("table", event.Table))
return cerror.ErrMessageTooLarge.GenWithStackByArgs()
}
log.Warn("Single message is too large for simple, only encode handle-key columns",
zap.Int("maxMessageBytes", e.config.MaxMessageBytes),
zap.Int("originLength", result.Length()),
zap.Int("length", result.Length()),
zap.Any("table", event.Table))

m, err = newDMLMessage(event, true)
if err != nil {
return err
}
if e.config.LargeMessageHandle.EnableClaimCheck() {
claimCheckFileName := claimcheck.NewFileName()
if err := e.claimCheck.WriteMessage(ctx, result.Key, result.Value, claimCheckFileName); err != nil {
return errors.Trace(err)
}

result, err = e.newClaimCheckLocationMessage(event, callback, claimCheckFileName)
if err != nil {
if e.config.LargeMessageHandle.EnableClaimCheck() {
fileName := claimcheck.NewFileName()
if err = e.claimCheck.WriteMessage(ctx, result.Key, result.Value, fileName); err != nil {
return errors.Trace(err)
}
m.ClaimCheckLocation = e.claimCheck.FileNameWithPrefix(fileName)
}
value, err = json.Marshal(m)
if err != nil {
return cerror.WrapError(cerror.ErrEncodeFailed, err)
}
value, err = common.Compress(e.config.ChangefeedID,
e.config.LargeMessageHandle.LargeMessageHandleCompression, value)
if err != nil {
return err
}
result.Value = value
if result.Length() > e.config.MaxMessageBytes {
log.Error("Single message is still too large for simple",
zap.Int("maxMessageBytes", e.config.MaxMessageBytes),
zap.Int("length", result.Length()),
zap.Any("table", event.Table))
return cerror.ErrMessageTooLarge.GenWithStackByArgs()
}
log.Warn("Single message is too large for simple",
zap.Int("maxMessageBytes", e.config.MaxMessageBytes),
zap.Int("originLength", result.Length()),
zap.Int("length", result.Length()),
zap.Any("table", event.Table))
}

e.messages = append(e.messages, result)
return nil
}

// newClaimCheckLocationMessage builds the message that is emitted in place of a
// row-changed event whose payload has been written to claim-check storage.
//
// The message body is produced by newDMLMessage(event, true) (presumably the
// handle-key-only form — confirm against newDMLMessage), tagged with the
// claim-check location derived from fileName, JSON-marshalled, and then
// compressed with the configured large-message compression. callback is
// attached to the returned message and its row count is incremented once.
//
// Errors:
//   - the error from newDMLMessage, returned unchanged;
//   - ErrEncodeFailed wrapping the JSON marshalling error;
//   - the (traced) compression error;
//   - ErrMessageTooLarge when the final message is longer than
//     e.config.MaxMessageBytes.
func (e *encoder) newClaimCheckLocationMessage(
	event *model.RowChangedEvent, callback func(), fileName string,
) (*common.Message, error) {
	m, err := newDMLMessage(event, true)
	if err != nil {
		return nil, err
	}
	// Point consumers at the externally stored full payload.
	m.ClaimCheckLocation = e.claimCheck.FileNameWithPrefix(fileName)

	value, err := json.Marshal(m)
	if err != nil {
		return nil, cerror.WrapError(cerror.ErrEncodeFailed, err)
	}

	value, err = common.Compress(e.config.ChangefeedID,
		e.config.LargeMessageHandle.LargeMessageHandleCompression, value)
	if err != nil {
		return nil, errors.Trace(err)
	}

	result := common.NewMsg(config.ProtocolSimple, nil, value, 0, model.MessageTypeRow, nil, nil)
	result.Callback = callback
	result.IncRowsCount()

	// Even the location-only message may exceed the limit; there is no further
	// fallback, so report it as too large.
	length := result.Length()
	if length > e.config.MaxMessageBytes {
		// This is the simple encoder, so name the simple protocol in the log
		// (the message previously said "canal-json").
		log.Warn("Single message is too large for simple, when create the claim check location message",
			zap.Int("maxMessageBytes", e.config.MaxMessageBytes),
			zap.Int("length", length),
			zap.Any("table", event.Table))
		return nil, cerror.ErrMessageTooLarge.GenWithStackByArgs(length)
	}
	return result, nil
}

// Build implement the RowEventEncoder interface
func (e *encoder) Build() []*common.Message {
if len(e.messages) == 0 {
Expand All @@ -170,17 +130,39 @@ func (e *encoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {
if err != nil {
return nil, cerror.WrapError(cerror.ErrEncodeFailed, err)
}
value, err = common.Compress(e.config.ChangefeedID,
e.config.LargeMessageHandle.LargeMessageHandleCompression, value)
if err != nil {
return nil, err
}
return common.NewResolvedMsg(config.ProtocolSimple, nil, value, ts), nil
}

// EncodeDDLEvent implement the DDLEventBatchEncoder interface
func (e *encoder) EncodeDDLEvent(event *model.DDLEvent) (*common.Message, error) {
message := newDDLMessage(event)
value, err := json.Marshal(message)
m := newDDLMessage(event)
value, err := json.Marshal(m)
if err != nil {
return nil, cerror.WrapError(cerror.ErrEncodeFailed, err)
}
return common.NewDDLMsg(config.ProtocolSimple, nil, value, event), nil

value, err = common.Compress(e.config.ChangefeedID,
e.config.LargeMessageHandle.LargeMessageHandleCompression, value)
if err != nil {
return nil, err
}

result := common.NewDDLMsg(config.ProtocolSimple, nil, value, event)
if result.Length() > e.config.MaxMessageBytes {
if e.config.LargeMessageHandle.Disabled() {
log.Error("DDL message is too large for simple",
zap.Int("maxMessageBytes", e.config.MaxMessageBytes),
zap.Int("length", result.Length()),
zap.Any("table", event.TableInfo.TableName))
return nil, cerror.ErrMessageTooLarge.GenWithStackByArgs()
}
}
return result, nil
}

type builder struct {
Expand Down
Loading

0 comments on commit 7133cac

Please sign in to comment.