Skip to content

Commit

Permalink
tests(ticdc): simple protocol claim check integration test enable che…
Browse files Browse the repository at this point in the history
…cksum (#11058) (#11082)

close #11057
  • Loading branch information
ti-chi-bot authored May 21, 2024
1 parent 1429fe2 commit 288bb64
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 294 deletions.
67 changes: 32 additions & 35 deletions pkg/sink/codec/simple/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ import (
"sync"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

func newTableSchemaMap(tableInfo *model.TableInfo) interface{} {
Expand All @@ -41,7 +38,7 @@ func newTableSchemaMap(tableInfo *model.TableInfo) interface{} {
for _, col := range idx.Columns {
columns = append(columns, col.Name.O)
colInfo := tableInfo.Columns[col.Offset]
// An index is not null when all columns of aer not null
// An index is not null when all columns of are not null
if !mysql.HasNotNullFlag(colInfo.GetFlag()) {
index["nullable"] = true
}
Expand Down Expand Up @@ -215,6 +212,13 @@ var (
},
}

// rowMapPool return map for each row
rowMapPool = sync.Pool{
New: func() any {
return make(map[string]interface{})
},
}

// dmlPayloadHolderPool return holder for the payload
dmlPayloadHolderPool = sync.Pool{
New: func() any {
Expand Down Expand Up @@ -283,8 +287,6 @@ func (a *avroMarshaller) newDMLMessageMap(
old := a.collectColumns(event.PreColumns, event.ColInfos, onlyHandleKey)
m["old"] = old
m["type"] = string(DMLTypeUpdate)
} else {
log.Panic("invalid event type, this should not hit", zap.Any("event", event))
}

m = map[string]interface{}{
Expand All @@ -306,11 +308,11 @@ func recycleMap(m map[string]interface{}) {
payload := holder["payload"].(map[string]interface{})
eventMap := payload["com.pingcap.simple.avro.DML"].(map[string]interface{})

checksumMap := eventMap["com.pingcap.simple.avro.Checksum"]
if checksumMap != nil {
holder := checksumMap.(map[string]interface{})
clear(holder)
genericMapPool.Put(holder)
checksum := eventMap["checksum"]
if checksum != nil {
checksum := checksum.(map[string]interface{})
clear(checksum)
genericMapPool.Put(checksum)
}

dataMap := eventMap["data"]
Expand All @@ -321,6 +323,8 @@ func recycleMap(m map[string]interface{}) {
clear(colMap)
genericMapPool.Put(col)
}
clear(dataMap)
rowMapPool.Put(dataMap)
}

oldDataMap := eventMap["old"]
Expand All @@ -331,6 +335,8 @@ func recycleMap(m map[string]interface{}) {
clear(colMap)
genericMapPool.Put(col)
}
clear(oldDataMap)
rowMapPool.Put(oldDataMap)
}
holder["payload"] = nil
dmlPayloadHolderPool.Put(holder)
Expand All @@ -341,18 +347,17 @@ func recycleMap(m map[string]interface{}) {
func (a *avroMarshaller) collectColumns(
columns []*model.Column, columnInfos []rowcodec.ColInfo, onlyHandleKey bool,
) map[string]interface{} {
result := make(map[string]interface{}, len(columns))
result := rowMapPool.Get().(map[string]interface{})
for idx, col := range columns {
if col == nil {
continue
}
if onlyHandleKey && !col.Flag.IsHandleKey() {
continue
if col != nil {
if onlyHandleKey && !col.Flag.IsHandleKey() {
continue
}
value, avroType := a.encodeValue4Avro(col.Value, columnInfos[idx].Ft)
holder := genericMapPool.Get().(map[string]interface{})
holder[avroType] = value
result[col.Name] = holder
}
value, avroType := a.encodeValue4Avro(col.Value, columnInfos[idx].Ft)
holder := genericMapPool.Get().(map[string]interface{})
holder[avroType] = value
result[col.Name] = holder
}

return map[string]interface{}{
Expand Down Expand Up @@ -446,16 +451,9 @@ func newTableSchemaFromAvroNative(native map[string]interface{}) *TableSchema {
}
}

func newMessageFromAvroNative(native interface{}, m *message) error {
rawValues, ok := native.(map[string]interface{})["com.pingcap.simple.avro.Message"].(map[string]interface{})
if !ok {
return cerror.ErrDecodeFailed.GenWithStack("cannot convert the avro message to map")
}

rawPayload, ok := rawValues["payload"].(map[string]interface{})
if !ok {
return cerror.ErrDecodeFailed.GenWithStack("cannot convert the avro payload to map")
}
func newMessageFromAvroNative(native interface{}, m *message) {
rawValues := native.(map[string]interface{})["com.pingcap.simple.avro.Message"].(map[string]interface{})
rawPayload := rawValues["payload"].(map[string]interface{})

rawMessage := rawPayload["com.pingcap.simple.avro.Watermark"]
if rawMessage != nil {
Expand All @@ -464,7 +462,7 @@ func newMessageFromAvroNative(native interface{}, m *message) error {
m.Type = MessageTypeWatermark
m.CommitTs = uint64(rawValues["commitTs"].(int64))
m.BuildTs = rawValues["buildTs"].(int64)
return nil
return
}

rawMessage = rawPayload["com.pingcap.simple.avro.Bootstrap"]
Expand All @@ -474,7 +472,7 @@ func newMessageFromAvroNative(native interface{}, m *message) error {
m.Type = MessageTypeBootstrap
m.BuildTs = rawValues["buildTs"].(int64)
m.TableSchema = newTableSchemaFromAvroNative(rawValues["tableSchema"].(map[string]interface{}))
return nil
return
}

rawMessage = rawPayload["com.pingcap.simple.avro.DDL"]
Expand All @@ -499,7 +497,7 @@ func newMessageFromAvroNative(native interface{}, m *message) error {
rawPreTableSchema = rawPreTableSchema["com.pingcap.simple.avro.TableSchema"].(map[string]interface{})
m.PreTableSchema = newTableSchemaFromAvroNative(rawPreTableSchema)
}
return nil
return
}

rawValues = rawPayload["com.pingcap.simple.avro.DML"].(map[string]interface{})
Expand All @@ -522,7 +520,6 @@ func newMessageFromAvroNative(native interface{}, m *message) error {
m.Checksum = newChecksum(rawValues)
m.Data = newDataMap(rawValues["data"])
m.Old = newDataMap(rawValues["old"])
return nil
}

func newChecksum(raw map[string]interface{}) *checksum {
Expand Down
64 changes: 23 additions & 41 deletions pkg/sink/codec/simple/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*Decode
}

m, err := newMarshaller(config)
if err != nil {
return nil, errors.Trace(err)
}

return &Decoder{
config: config,
marshaller: m,
Expand All @@ -82,21 +78,17 @@ func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*Decode

memo: newMemoryTableInfoProvider(),
cachedMessages: list.New(),
}, nil
}, errors.Trace(err)
}

// AddKeyValue add the received key and values to the Decoder,
func (d *Decoder) AddKeyValue(_, value []byte) error {
func (d *Decoder) AddKeyValue(_, value []byte) (err error) {
if d.value != nil {
return cerror.ErrDecodeFailed.GenWithStack(
return cerror.ErrCodecDecode.GenWithStack(
"Decoder value already exists, not consumed yet")
}
value, err := common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, value)
if err != nil {
return err
}
d.value = value
return nil
d.value, err = common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, value)
return err
}

// HasNext returns whether there is any event need to be consumed
Expand Down Expand Up @@ -166,15 +158,10 @@ func (d *Decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
}

event, err := buildRowChangedEvent(d.msg, tableInfo, d.config.EnableRowChecksum)
if err != nil {
return nil, err
}
event.Table = &model.TableName{
Schema: tableInfo.TableName.Schema,
Table: tableInfo.TableName.Table,
}
d.msg = nil
return event, nil

log.Debug("row changed event assembled", zap.Any("event", event))
return event, err
}

func (d *Decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) (*model.RowChangedEvent, error) {
Expand Down Expand Up @@ -205,9 +192,14 @@ func (d *Decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) (
func (d *Decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowChangedEvent, error) {
tableInfo := d.memo.Read(m.Schema, m.Table, m.SchemaVersion)
if tableInfo == nil {
return nil, cerror.ErrCodecDecode.GenWithStack(
"cannot found the table info, schema: %s, table: %s, version: %d",
m.Schema, m.Table, m.SchemaVersion)
log.Debug("table info not found for the event, "+
"the consumer should cache this event temporarily, and update the tableInfo after it's received",
zap.String("schema", d.msg.Schema),
zap.String("table", d.msg.Table),
zap.Uint64("version", d.msg.SchemaVersion))
d.cachedMessages.PushBack(d.msg)
d.msg = nil
return nil, nil
}

fieldTypeMap := make(map[string]*types.FieldType, len(tableInfo.Columns))
Expand All @@ -219,6 +211,7 @@ func (d *Decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh
Version: defaultVersion,
Schema: m.Schema,
Table: m.Table,
TableID: m.TableID,
Type: m.Type,
CommitTs: m.CommitTs,
SchemaVersion: m.SchemaVersion,
Expand Down Expand Up @@ -255,13 +248,7 @@ func (d *Decoder) buildData(
col := holder.Types[i]
value := holder.Values[i]

fieldType, ok := fieldTypeMap[col.Name()]
if !ok {
log.Panic("cannot found the field type",
zap.String("schema", d.msg.Schema),
zap.String("table", d.msg.Table),
zap.String("column", col.Name()))
}
fieldType := fieldTypeMap[col.Name()]
result[col.Name()] = encodeValue(value, fieldType, timezone)
}
return result
Expand All @@ -273,10 +260,7 @@ func (d *Decoder) NextDDLEvent() (*model.DDLEvent, error) {
return nil, cerror.ErrCodecDecode.GenWithStack(
"no message found when decode DDL event")
}
ddl, err := newDDLEvent(d.msg)
if err != nil {
return nil, err
}
ddl := newDDLEvent(d.msg)
d.msg = nil

d.memo.Write(ddl.TableInfo)
Expand All @@ -288,14 +272,12 @@ func (d *Decoder) NextDDLEvent() (*model.DDLEvent, error) {
if err != nil {
return nil, err
}
if event == nil {
ele = ele.Next()
continue
}
d.CachedRowChangedEvents = append(d.CachedRowChangedEvents, event)

next := ele.Next()
d.cachedMessages.Remove(ele)
if event != nil {
d.CachedRowChangedEvents = append(d.CachedRowChangedEvents, event)
d.cachedMessages.Remove(ele)
}
ele = next
}
return ddl, nil
Expand Down
19 changes: 6 additions & 13 deletions pkg/sink/codec/simple/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ func (e *encoder) AppendRowChangedEvent(

// Build implement the RowEventEncoder interface
func (e *encoder) Build() []*common.Message {
if len(e.messages) == 0 {
return nil
var result []*common.Message
if len(e.messages) != 0 {
result = e.messages
e.messages = nil
}
result := e.messages
e.messages = nil
return result
}

Expand All @@ -134,10 +134,7 @@ func (e *encoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {

value, err = common.Compress(e.config.ChangefeedID,
e.config.LargeMessageHandle.LargeMessageHandleCompression, value)
if err != nil {
return nil, err
}
return common.NewResolvedMsg(config.ProtocolSimple, nil, value, ts), nil
return common.NewResolvedMsg(config.ProtocolSimple, nil, value, ts), err
}

// EncodeDDLEvent implement the DDLEventBatchEncoder interface
Expand Down Expand Up @@ -186,15 +183,11 @@ func NewBuilder(ctx context.Context, config *common.Config) (*builder, error) {
}

m, err := newMarshaller(config)
if err != nil {
return nil, errors.Trace(err)
}

return &builder{
config: config,
claimCheck: claimCheck,
marshaller: m,
}, nil
}, errors.Trace(err)
}

// Build implement the RowEventEncoderBuilder interface
Expand Down
Loading

0 comments on commit 288bb64

Please sign in to comment.