Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec(ticdc): simple protocol support encode dml event #10112

Merged
merged 37 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0aae4fd
simple dml event, first version.
3AceShowHand Nov 20, 2023
abb2e21
add basic to the decoder.
3AceShowHand Nov 20, 2023
61f2cf4
adjust the code.
3AceShowHand Nov 20, 2023
c63c423
adjust simple protocol
3AceShowHand Nov 21, 2023
0f5534e
adjust decoder.
3AceShowHand Nov 21, 2023
6015028
tiny adjust.
3AceShowHand Nov 21, 2023
4f2086d
tiny adjust.
3AceShowHand Nov 21, 2023
323231b
fix make check
3AceShowHand Nov 21, 2023
d758a3c
Merge branch 'master' into simple-dml-event
3AceShowHand Nov 22, 2023
eb5a6aa
simple dml encode and decode.
3AceShowHand Nov 22, 2023
5c9159d
simple dml encode and decode.
3AceShowHand Nov 22, 2023
aff583b
support schema version, by using the update ts of the table info.
3AceShowHand Nov 27, 2023
2e46c16
Merge branch 'master' into simple-dml-event
3AceShowHand Nov 27, 2023
68a0b59
fix test.
3AceShowHand Nov 28, 2023
e67d445
Update pkg/sink/codec/simple/decoder.go
3AceShowHand Nov 28, 2023
6fef397
add new logs.
3AceShowHand Nov 28, 2023
0d41d0f
add simple basic integration test
3AceShowHand Nov 28, 2023
89ff61d
fix by the comment.
3AceShowHand Nov 28, 2023
17dfeca
fix by the comment.
3AceShowHand Nov 28, 2023
8367f26
add simple basic integration test
3AceShowHand Nov 28, 2023
3cc1265
fix kafka consumer support simple.
3AceShowHand Nov 28, 2023
f3fc132
fix the ddl manager panic
3AceShowHand Nov 28, 2023
c359341
fix panic.
3AceShowHand Nov 28, 2023
61f045f
fix panic.
3AceShowHand Nov 28, 2023
62e7ee1
fix decoder decode table schema.
3AceShowHand Nov 29, 2023
0cb5c24
fix decoder bit and set.
3AceShowHand Nov 29, 2023
7bd71fa
encode charset and collate into the simple message.
3AceShowHand Nov 29, 2023
6e86990
fix simple codec decode.
3AceShowHand Nov 29, 2023
360e2af
fix simple codec decode.
3AceShowHand Nov 29, 2023
8ffa881
fix simple codec decode.
3AceShowHand Nov 29, 2023
b65bb22
fix parse longlong failed.
3AceShowHand Nov 29, 2023
ccfc7bf
do not convert long value
3AceShowHand Nov 29, 2023
ad15067
revert some change in the meta manager test.
3AceShowHand Nov 30, 2023
0559d41
Update cdc/owner/ddl_manager.go
3AceShowHand Nov 30, 2023
d8d9ed9
fix by review.
3AceShowHand Nov 30, 2023
62c53ca
return the checkpoint if the bootstrap events not all sent.
3AceShowHand Nov 30, 2023
b665d24
simple not support pulsar.
3AceShowHand Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
ctx := context.Background()
pdClient := &mockPDClient{}
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test;")
storage := helper.Storage()
ctrl := mock_controller.NewMockController(gomock.NewController(t))
Expand Down
161 changes: 155 additions & 6 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package entry

import (
"context"
"encoding/json"
"strings"
"testing"
"time"

ticonfig "github.com/pingcap/tidb/config"
tiddl "github.com/pingcap/tidb/ddl"
Expand All @@ -27,6 +29,11 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
Expand All @@ -37,10 +44,17 @@ type SchemaTestHelper struct {
tk *testkit.TestKit
storage kv.Storage
domain *domain.Domain

schemaStorage SchemaStorage
mounter Mounter
filter filter.Filter
}

// NewSchemaTestHelper creates a SchemaTestHelper
func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
// NewSchemaTestHelperWithReplicaConfig creates a SchemaTestHelper
// by using the given replica config.
func NewSchemaTestHelperWithReplicaConfig(
t *testing.T, replicaConfig *config.ReplicaConfig,
) *SchemaTestHelper {
store, err := mockstore.NewMockStore()
require.Nil(t, err)
ticonfig.UpdateGlobal(func(conf *ticonfig.Config) {
Expand All @@ -52,14 +66,40 @@ func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
require.Nil(t, err)
domain.SetStatsUpdating(true)
tk := testkit.NewTestKit(t, store)

filter, err := filter.NewFilter(replicaConfig, "")
require.NoError(t, err)

ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
require.NoError(t, err)

changefeedID := model.DefaultChangeFeedID("changefeed-testkit")

meta := timeta.NewSnapshotMeta(store.GetSnapshot(ver))
schemaStorage, err := NewSchemaStorage(
meta, ver.Ver, replicaConfig.ForceReplicate,
changefeedID, util.RoleTester, filter)
require.NoError(t, err)

mounter := NewMounter(schemaStorage, changefeedID, time.Local,
filter, replicaConfig.Integrity)

return &SchemaTestHelper{
t: t,
tk: tk,
storage: store,
domain: domain,
t: t,
tk: tk,
storage: store,
domain: domain,
filter: filter,
schemaStorage: schemaStorage,
mounter: mounter,
}
}

// NewSchemaTestHelper creates a SchemaTestHelper
func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
return NewSchemaTestHelperWithReplicaConfig(t, config.GetDefaultReplicaConfig())
}

// DDL2Job executes the DDL stmt and returns the DDL job
func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
s.tk.MustExec(ddl)
Expand Down Expand Up @@ -127,6 +167,115 @@ func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
return jobs
}

// DML2Event execute the dml and return the corresponding row changed event.
// caution: it does not support `delete` since the key value cannot be found
// after the query executed.
func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.RowChangedEvent {
s.tk.MustExec(dml)

tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(schema, table)
require.True(s.t, ok)

key, value := s.getLastKeyValue(tableInfo.ID)
ts := s.schemaStorage.GetLastSnapshot().CurrentTs()
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
OldValue: nil,
StartTs: ts - 1,
CRTs: ts + 1,
}
polymorphicEvent := model.NewPolymorphicEvent(rawKV)
err := s.mounter.DecodeEvent(context.Background(), polymorphicEvent)
require.NoError(s.t, err)
return polymorphicEvent.Row
}

func (s *SchemaTestHelper) getLastKeyValue(tableID int64) (key, value []byte) {
txn, err := s.storage.Begin()
require.NoError(s.t, err)
defer txn.Rollback() //nolint:errcheck

start, end := spanz.GetTableRange(tableID)
iter, err := txn.Iter(start, end)
require.NoError(s.t, err)
defer iter.Close()
for iter.Valid() {
key = iter.Key()
value = iter.Value()
err = iter.Next()
require.NoError(s.t, err)
}
return key, value
}

// DDL2Event executes the DDL and return the corresponding event.
func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent {
s.tk.MustExec(ddl)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
require.NoError(s.t, err)
require.Len(s.t, jobs, 1)
// Set State from Synced to Done.
// Because jobs are put to history queue after TiDB alter its state from
// Done to Synced.
jobs[0].State = timodel.JobStateDone
res := jobs[0]
if res.Type == timodel.ActionRenameTables {
// the RawArgs field in job fetched from tidb snapshot meta is incorrent,
// so we manually construct `job.RawArgs` to do the workaround.
// we assume the old schema name is same as the new schema name here.
// for example, "ALTER TABLE RENAME test.t1 TO test.t1, test.t2 to test.t22", schema name is "test"
schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0]
tableNum := len(res.BinlogInfo.MultipleTableInfos)
oldSchemaIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaIDs[i] = res.SchemaID
}
oldTableIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID
}
newTableNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name
}
oldSchemaNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaNames[i] = timodel.NewCIStr(schema)
}
newSchemaIDs := oldSchemaIDs

args := []interface{}{
oldSchemaIDs, newSchemaIDs,
newTableNames, oldTableIDs, oldSchemaNames,
}
rawArgs, err := json.Marshal(args)
require.NoError(s.t, err)
res.RawArgs = rawArgs
}

err = s.schemaStorage.HandleDDLJob(res)
require.NoError(s.t, err)

ver, err := s.storage.CurrentVersion(oracle.GlobalTxnScope)
require.NoError(s.t, err)
s.schemaStorage.AdvanceResolvedTs(ver.Ver)

tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(res.SchemaName, res.TableName)
require.True(s.t, ok)

event := &model.DDLEvent{
StartTs: res.StartTS,
CommitTs: res.BinlogInfo.FinishedTS,
TableInfo: tableInfo,
Query: res.Query,
Type: res.Type,
}

return event
}

// Storage returns the tikv storage
func (s *SchemaTestHelper) Storage() kv.Storage {
return s.storage
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (m *ddlManager) tick(
return nil, nil, err
}
if !ok {
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil, nil
return nil, m.barrier(), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return an empty barrier here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return nil cause the panic, and the barrier is initialized as empty I think.

}
}

Expand Down
7 changes: 3 additions & 4 deletions cdc/redo/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,21 +270,18 @@ func TestGCAndCleanup(t *testing.T) {
}

// write some log files
require.NoError(t, err)
maxCommitTs := 20
for i := 1; i <= maxCommitTs; i++ {
for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} {
// fileName with different captureID and maxCommitTs
curCaptureID := fmt.Sprintf("%s%d", captureID, i%10)
maxCommitTs := i
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
fileName := fmt.Sprintf(redo.RedoLogFileFormatV1, curCaptureID, changefeedID.ID,
logType, maxCommitTs, uuid.NewGenerator().NewString(), redo.LogEXT)
logType, i, uuid.NewGenerator().NewString(), redo.LogEXT)
err := extStorage.WriteFile(ctx, fileName, []byte{})
require.NoError(t, err)
}
}

startTs := uint64(3)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Expand All @@ -294,6 +291,8 @@ func TestGCAndCleanup(t *testing.T) {
EncodingWorkerNum: redo.DefaultEncodingWorkerNum,
FlushWorkerNum: redo.DefaultFlushWorkerNum,
}

startTs := uint64(3)
m := NewMetaManager(changefeedID, cfg, startTs)

var eg errgroup.Group
Expand Down
3 changes: 3 additions & 0 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/pingcap/tiflow/pkg/sink/codec/canal"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/open"
"github.com/pingcap/tiflow/pkg/sink/codec/simple"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
Expand Down Expand Up @@ -628,6 +629,8 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
return cerror.Trace(err)
}
decoder = avro.NewDecoder(c.codecConfig, schemaM, c.option.topic, c.tz)
case config.ProtocolSimple:
decoder = simple.NewDecoder()
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.codecConfig.Protocol))
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/utils"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
"golang.org/x/text/encoding"
Expand Down Expand Up @@ -172,7 +173,7 @@ func (b *batchDecoder) buildData(holder *common.ColumnsHolder) (map[string]inter

var value string
rawValue := holder.Values[i].([]uint8)
if isBinaryMySQLType(mysqlType) {
if utils.IsBinaryMySQLType(mysqlType) {
rawValue, err := b.bytesDecoder.Bytes(rawValue)
if err != nil {
return nil, nil, errors.Trace(err)
Expand Down
6 changes: 1 addition & 5 deletions pkg/sink/codec/canal/canal_json_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string)
}

var err error
if isBinaryMySQLType(mysqlTypeStr) {
if utils.IsBinaryMySQLType(mysqlTypeStr) {
// when encoding the `JavaSQLTypeBLOB`, use `ISO8859_1` decoder, now reverse it back.
encoder := charmap.ISO8859_1.NewEncoder()
value, err = encoder.String(data)
Expand All @@ -252,10 +252,6 @@ func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string)
return result
}

func isBinaryMySQLType(mysqlType string) bool {
return strings.Contains(mysqlType, "blob") || strings.Contains(mysqlType, "binary")
}

func canalJSONMessage2DDLEvent(msg canalJSONMessageInterface) *model.DDLEvent {
result := new(model.DDLEvent)
// we lost the startTs from kafka message
Expand Down
Loading
Loading