codec(ticdc): simple protocol support encode dml event (#10112)
close #9902, close #10185, close #10186
3AceShowHand authored Nov 30, 2023
1 parent eb96b96 commit e13b048
Showing 23 changed files with 1,447 additions and 261 deletions.
1 change: 1 addition & 0 deletions cdc/api/v2/api_helpers_test.go
@@ -32,6 +32,7 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
ctx := context.Background()
pdClient := &mockPDClient{}
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test;")
storage := helper.Storage()
ctrl := mock_controller.NewMockController(gomock.NewController(t))
161 changes: 155 additions & 6 deletions cdc/entry/schema_test_helper.go
@@ -14,9 +14,11 @@
package entry

import (
"context"
"encoding/json"
"strings"
"testing"
"time"

ticonfig "github.com/pingcap/tidb/config"
tiddl "github.com/pingcap/tidb/ddl"
@@ -27,6 +29,11 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
@@ -37,10 +44,17 @@ type SchemaTestHelper struct {
tk *testkit.TestKit
storage kv.Storage
domain *domain.Domain

schemaStorage SchemaStorage
mounter Mounter
filter filter.Filter
}

// NewSchemaTestHelper creates a SchemaTestHelper
func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
// NewSchemaTestHelperWithReplicaConfig creates a SchemaTestHelper
// by using the given replica config.
func NewSchemaTestHelperWithReplicaConfig(
t *testing.T, replicaConfig *config.ReplicaConfig,
) *SchemaTestHelper {
store, err := mockstore.NewMockStore()
require.Nil(t, err)
ticonfig.UpdateGlobal(func(conf *ticonfig.Config) {
@@ -52,14 +66,40 @@ func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
require.Nil(t, err)
domain.SetStatsUpdating(true)
tk := testkit.NewTestKit(t, store)

filter, err := filter.NewFilter(replicaConfig, "")
require.NoError(t, err)

ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
require.NoError(t, err)

changefeedID := model.DefaultChangeFeedID("changefeed-testkit")

meta := timeta.NewSnapshotMeta(store.GetSnapshot(ver))
schemaStorage, err := NewSchemaStorage(
meta, ver.Ver, replicaConfig.ForceReplicate,
changefeedID, util.RoleTester, filter)
require.NoError(t, err)

mounter := NewMounter(schemaStorage, changefeedID, time.Local,
filter, replicaConfig.Integrity)

return &SchemaTestHelper{
t: t,
tk: tk,
storage: store,
domain: domain,
t: t,
tk: tk,
storage: store,
domain: domain,
filter: filter,
schemaStorage: schemaStorage,
mounter: mounter,
}
}

// NewSchemaTestHelper creates a SchemaTestHelper
func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
return NewSchemaTestHelperWithReplicaConfig(t, config.GetDefaultReplicaConfig())
}

// DDL2Job executes the DDL stmt and returns the DDL job
func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
s.tk.MustExec(ddl)
@@ -127,6 +167,115 @@ func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
return jobs
}

// DML2Event executes the DML statement and returns the corresponding row
// changed event.
// Caution: it does not support `delete`, since the key/value pair cannot be
// found after the query has been executed.
func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.RowChangedEvent {
s.tk.MustExec(dml)

tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(schema, table)
require.True(s.t, ok)

key, value := s.getLastKeyValue(tableInfo.ID)
ts := s.schemaStorage.GetLastSnapshot().CurrentTs()
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
OldValue: nil,
StartTs: ts - 1,
CRTs: ts + 1,
}
polymorphicEvent := model.NewPolymorphicEvent(rawKV)
err := s.mounter.DecodeEvent(context.Background(), polymorphicEvent)
require.NoError(s.t, err)
return polymorphicEvent.Row
}

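// getLastKeyValue iterates the table's whole key range and returns the final
// key/value pair in key order, which DML2Event mounts into a row changed event.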
func (s *SchemaTestHelper) getLastKeyValue(tableID int64) (key, value []byte) {
txn, err := s.storage.Begin()
require.NoError(s.t, err)
defer txn.Rollback() //nolint:errcheck

start, end := spanz.GetTableRange(tableID)
iter, err := txn.Iter(start, end)
require.NoError(s.t, err)
defer iter.Close()
for iter.Valid() {
key = iter.Key()
value = iter.Value()
err = iter.Next()
require.NoError(s.t, err)
}
return key, value
}

// DDL2Event executes the DDL statement and returns the corresponding DDL event.
func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent {
s.tk.MustExec(ddl)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
require.NoError(s.t, err)
require.Len(s.t, jobs, 1)
// Set the job state from Synced back to Done, because jobs are put into
// the history queue only after TiDB has altered their state from Done to
// Synced.
jobs[0].State = timodel.JobStateDone
res := jobs[0]
if res.Type == timodel.ActionRenameTables {
// the RawArgs field in the job fetched from the TiDB snapshot meta is
// incorrect, so we manually construct `job.RawArgs` as a workaround.
// we assume the old schema name is the same as the new schema name here.
// for example, "RENAME TABLE test.t1 TO test.t11, test.t2 TO test.t22", the schema name is "test"
schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0]
tableNum := len(res.BinlogInfo.MultipleTableInfos)
oldSchemaIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaIDs[i] = res.SchemaID
}
oldTableIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID
}
newTableNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name
}
oldSchemaNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaNames[i] = timodel.NewCIStr(schema)
}
newSchemaIDs := oldSchemaIDs

args := []interface{}{
oldSchemaIDs, newSchemaIDs,
newTableNames, oldTableIDs, oldSchemaNames,
}
rawArgs, err := json.Marshal(args)
require.NoError(s.t, err)
res.RawArgs = rawArgs
}

err = s.schemaStorage.HandleDDLJob(res)
require.NoError(s.t, err)

ver, err := s.storage.CurrentVersion(oracle.GlobalTxnScope)
require.NoError(s.t, err)
s.schemaStorage.AdvanceResolvedTs(ver.Ver)

tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(res.SchemaName, res.TableName)
require.True(s.t, ok)

event := &model.DDLEvent{
StartTs: res.StartTS,
CommitTs: res.BinlogInfo.FinishedTS,
TableInfo: tableInfo,
Query: res.Query,
Type: res.Type,
}

return event
}

// Storage returns the tikv storage
func (s *SchemaTestHelper) Storage() kv.Storage {
return s.storage
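Putting the new helpers together, an encoder or mounter test can go from SQL text to decoded events in a few lines. A minimal sketch under stated assumptions — the test name and table schema are hypothetical, and the flow simply mirrors DDL2Event and DML2Event above:

package entry_test

import (
	"testing"

	"github.com/pingcap/tiflow/cdc/entry"
	"github.com/pingcap/tiflow/pkg/config"
	"github.com/stretchr/testify/require"
)

// TestHelperSketch is a hypothetical test showing the intended flow.
func TestHelperSketch(t *testing.T) {
	// Customize the replica config if needed; NewSchemaTestHelper(t) is
	// the shorthand that uses the defaults.
	cfg := config.GetDefaultReplicaConfig()
	helper := entry.NewSchemaTestHelperWithReplicaConfig(t, cfg)
	defer helper.Close()

	// DDL2Event executes the DDL, feeds the job to the helper's schema
	// storage, and returns the corresponding DDL event.
	ddl := helper.DDL2Event(`create table test.t (id int primary key, a varchar(32))`)
	require.NotNil(t, ddl)

	// DML2Event executes the statement and mounts the last written
	// key/value pair into a row changed event; `delete` is unsupported.
	row := helper.DML2Event(`insert into test.t values (1, "x")`, "test", "t")
	require.NotNil(t, row)
}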
6 changes: 3 additions & 3 deletions cdc/owner/ddl_manager.go
@@ -203,12 +203,12 @@ func (m *ddlManager) tick(
tableCheckpoint map[model.TableName]model.Ts,
) ([]model.TableID, *schedulepb.BarrierWithMinTs, error) {
if m.needSendBootstrapEvent {
ok, err := m.checkAndBootstrap(ctx)
finished, err := m.checkAndBootstrap(ctx)
if err != nil {
return nil, nil, err
}
if !ok {
return nil, nil, nil
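// bootstrap not finished yet: return a barrier pinned to the current
// checkpoint instead of a nil barrier, so the caller keeps a valid min-ts.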
if !finished {
return nil, schedulepb.NewBarrierWithMinTs(checkpointTs), nil
}
}

2 changes: 1 addition & 1 deletion cdc/redo/meta_manager_test.go
@@ -270,7 +270,6 @@ func TestGCAndCleanup(t *testing.T) {
}

// write some log files
require.NoError(t, err)
maxCommitTs := 20
for i := 1; i <= maxCommitTs; i++ {
for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} {
@@ -294,6 +293,7 @@
EncodingWorkerNum: redo.DefaultEncodingWorkerNum,
FlushWorkerNum: redo.DefaultFlushWorkerNum,
}

m := NewMetaManager(changefeedID, cfg, startTs)

var eg errgroup.Group
3 changes: 3 additions & 0 deletions cmd/kafka-consumer/main.go
@@ -55,6 +55,7 @@ import (
"github.com/pingcap/tiflow/pkg/sink/codec/canal"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/open"
"github.com/pingcap/tiflow/pkg/sink/codec/simple"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
@@ -628,6 +629,8 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
return cerror.Trace(err)
}
decoder = avro.NewDecoder(c.codecConfig, schemaM, c.option.topic, c.tz)
case config.ProtocolSimple:
decoder = simple.NewDecoder()
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.codecConfig.Protocol))
}
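The new case wires the simple decoder into the consumer's existing drain loop. A hedged sketch of that pattern, assuming the codec.RowEventDecoder interface (AddKeyValue/HasNext/Next*) that the consumer programs against at this commit — treat the exact signatures as assumptions rather than a guaranteed API:

package main

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/sink/codec"
)

// drainRowEvents feeds one Kafka message to any decoder, including
// simple.NewDecoder(), and collects the row changed events it yields.
// DDL and resolved-ts messages are consumed but discarded here.
func drainRowEvents(decoder codec.RowEventDecoder, key, value []byte) ([]*model.RowChangedEvent, error) {
	if err := decoder.AddKeyValue(key, value); err != nil {
		return nil, errors.Trace(err)
	}
	var rows []*model.RowChangedEvent
	for {
		messageType, hasNext, err := decoder.HasNext()
		if err != nil {
			return nil, errors.Trace(err)
		}
		if !hasNext {
			return rows, nil
		}
		switch messageType {
		case model.MessageTypeRow:
			row, err := decoder.NextRowChangedEvent()
			if err != nil {
				return nil, errors.Trace(err)
			}
			rows = append(rows, row)
		case model.MessageTypeDDL:
			if _, err := decoder.NextDDLEvent(); err != nil {
				return nil, errors.Trace(err)
			}
		case model.MessageTypeResolved:
			if _, err := decoder.NextResolvedEvent(); err != nil {
				return nil, errors.Trace(err)
			}
		}
	}
}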
3 changes: 2 additions & 1 deletion pkg/sink/codec/canal/canal_json_decoder.go
@@ -29,6 +29,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/utils"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
"golang.org/x/text/encoding"
@@ -172,7 +173,7 @@ func (b *batchDecoder) buildData(holder *common.ColumnsHolder) (map[string]inter

var value string
rawValue := holder.Values[i].([]uint8)
if isBinaryMySQLType(mysqlType) {
if utils.IsBinaryMySQLType(mysqlType) {
rawValue, err := b.bytesDecoder.Bytes(rawValue)
if err != nil {
return nil, nil, errors.Trace(err)
6 changes: 1 addition & 5 deletions pkg/sink/codec/canal/canal_json_message.go
@@ -239,7 +239,7 @@ func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string)
}

var err error
if isBinaryMySQLType(mysqlTypeStr) {
if utils.IsBinaryMySQLType(mysqlTypeStr) {
// when encoding the `JavaSQLTypeBLOB`, use `ISO8859_1` decoder, now reverse it back.
encoder := charmap.ISO8859_1.NewEncoder()
value, err = encoder.String(data)
Expand All @@ -252,10 +252,6 @@ func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string)
return result
}

func isBinaryMySQLType(mysqlType string) bool {
return strings.Contains(mysqlType, "blob") || strings.Contains(mysqlType, "binary")
}

func canalJSONMessage2DDLEvent(msg canalJSONMessageInterface) *model.DDLEvent {
result := new(model.DDLEvent)
// we lost the startTs from kafka message
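The predicate removed here moved to the shared pkg/sink/codec/utils package (the import added to canal_json_decoder.go above), so the simple codec and both canal paths can reuse it. Presumably the body is unchanged; a sketch reconstructed from the deleted lines:

package utils

import "strings"

// IsBinaryMySQLType reports whether a MySQL type string such as "blob",
// "binary(16)" or "varbinary(32)" denotes a binary type.
func IsBinaryMySQLType(mysqlType string) bool {
	return strings.Contains(mysqlType, "blob") || strings.Contains(mysqlType, "binary")
}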