Skip to content

Commit

Permalink
codec(ticdc): simple protocol DDL message encode 2 table schema (#10294)
Browse files Browse the repository at this point in the history
close #10293
  • Loading branch information
3AceShowHand authored Dec 20, 2023
1 parent f0ab88a commit 964df36
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 116 deletions.
27 changes: 20 additions & 7 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,28 @@ func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent {
require.NoError(s.t, err)
s.schemaStorage.AdvanceResolvedTs(ver.Ver)

tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(res.SchemaName, res.TableName)
require.True(s.t, ok)
var tableInfo *model.TableInfo
if res.BinlogInfo != nil && res.BinlogInfo.TableInfo != nil {
tableInfo = model.WrapTableInfo(res.SchemaID, res.SchemaName, res.BinlogInfo.FinishedTS, res.BinlogInfo.TableInfo)
} else {
tableInfo = &model.TableInfo{
TableName: model.TableName{Schema: res.SchemaName},
Version: res.BinlogInfo.FinishedTS,
}
}
ctx := context.Background()
snap, err := s.schemaStorage.GetSnapshot(ctx, res.BinlogInfo.FinishedTS-1)
require.NoError(s.t, err)
preTableInfo, err := snap.PreTableInfo(res)
require.NoError(s.t, err)

event := &model.DDLEvent{
StartTs: res.StartTS,
CommitTs: res.BinlogInfo.FinishedTS,
TableInfo: tableInfo,
Query: res.Query,
Type: res.Type,
StartTs: res.StartTS,
CommitTs: res.BinlogInfo.FinishedTS,
TableInfo: tableInfo,
PreTableInfo: preTableInfo,
Query: res.Query,
Type: res.Type,
}

return event
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func TestRemovePausedChangefeed(t *testing.T) {
info.State = model.StateStopped
dir := t.TempDir()
// Field `Consistent` is valid only when the downstream
// is MySQL compatible Database
// is MySQL compatible Schema
info.SinkURI = "mysql://"
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Expand Down
24 changes: 14 additions & 10 deletions pkg/sink/codec/simple/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
return d.assembleHandleKeyOnlyRowChangedEvent(d.msg)
}

tableInfo := d.memo.Read(d.msg.Database, d.msg.Table, d.msg.SchemaVersion)
tableInfo := d.memo.Read(d.msg.Schema, d.msg.Table, d.msg.SchemaVersion)
if tableInfo == nil {
return nil, cerror.ErrCodecDecode.GenWithStack(
"cannot found the table info, schema: %s, table: %s, version: %d",
d.msg.Database, d.msg.Table, d.msg.SchemaVersion)
d.msg.Schema, d.msg.Table, d.msg.SchemaVersion)
}

event, err := buildRowChangedEvent(d.msg, tableInfo)
Expand Down Expand Up @@ -180,11 +180,11 @@ func (d *decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) (
}

func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowChangedEvent, error) {
tableInfo := d.memo.Read(m.Database, m.Table, m.SchemaVersion)
tableInfo := d.memo.Read(m.Schema, m.Table, m.SchemaVersion)
if tableInfo == nil {
return nil, cerror.ErrCodecDecode.GenWithStack(
"cannot found the table info, schema: %s, table: %s, version: %d",
m.Database, m.Table, m.SchemaVersion)
m.Schema, m.Table, m.SchemaVersion)
}

fieldTypeMap := make(map[string]*types.FieldType, len(tableInfo.Columns))
Expand All @@ -194,7 +194,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh

result := &message{
Version: defaultVersion,
Database: m.Database,
Schema: m.Schema,
Table: m.Table,
Type: m.Type,
CommitTs: m.CommitTs,
Expand All @@ -204,7 +204,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh
ctx := context.Background()
switch m.Type {
case InsertType:
holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs, m.Database, m.Table, m.Data)
holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs, m.Schema, m.Table, m.Data)
if err != nil {
return nil, err
}
Expand All @@ -214,7 +214,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh
}
result.Data = data
case UpdateType:
holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs, m.Database, m.Table, m.Data)
holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs, m.Schema, m.Table, m.Data)
if err != nil {
return nil, err
}
Expand All @@ -224,7 +224,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh
}
result.Data = data

holder, err = common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs-1, m.Database, m.Table, m.Old)
holder, err = common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs-1, m.Schema, m.Table, m.Old)
if err != nil {
return nil, err
}
Expand All @@ -234,7 +234,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh
}
result.Old = old
case DeleteType:
holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs-1, m.Database, m.Table, m.Old)
holder, err := common.SnapshotQuery(ctx, d.upstreamTiDB, m.CommitTs-1, m.Schema, m.Table, m.Old)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func (d *decoder) buildData(
if !ok {
return nil, cerror.ErrCodecDecode.GenWithStack(
"cannot found the field type, schema: %s, table: %s, column: %s",
d.msg.Database, d.msg.Table, col.Name())
d.msg.Schema, d.msg.Table, col.Name())
}
value, err := encodeValue(value, fieldType)
if err != nil {
Expand All @@ -285,6 +285,7 @@ func (d *decoder) NextDDLEvent() (*model.DDLEvent, error) {
d.msg = nil

d.memo.Write(ddl.TableInfo)
d.memo.Write(ddl.PreTableInfo)

return ddl, nil
}
Expand All @@ -306,6 +307,9 @@ func newMemoryTableInfoProvider() *memoryTableInfoProvider {
}

func (m *memoryTableInfoProvider) Write(info *model.TableInfo) {
if info == nil {
return
}
key := cacheKey{
schema: info.TableName.Schema,
table: info.TableName.Table,
Expand Down
133 changes: 92 additions & 41 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestEncodeCheckpoint(t *testing.T) {
require.NoError(t, err)
enc := builder.Build()

checkpoint := 23
checkpoint := 446266400629063682
m, err := enc.EncodeCheckpointEvent(uint64(checkpoint))
require.NoError(t, err)

Expand All @@ -69,6 +69,20 @@ func TestEncodeDDLEvent(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(id int primary key, name varchar(255) not null, gender enum('male', 'female'), email varchar(255) not null, key idx_name_email(name, email))`
createTableDDLEvent := helper.DDL2Event(sql)

sql = `insert into test.t values (1, "jack", "male", "[email protected]")`
insertEvent := helper.DML2Event(sql, "test", "t")

sql = `rename table test.t to test.abc`
renameTableDDLEvent := helper.DDL2Event(sql)

sql = `insert into test.abc values (2, "anna", "female", "[email protected]")`
insertEvent2 := helper.DML2Event(sql, "test", "abc")

helper.Tk().MustExec("drop table test.abc")

ctx := context.Background()
for _, compressionType := range []string{
compression.None,
Expand All @@ -81,18 +95,10 @@ func TestEncodeDDLEvent(t *testing.T) {
require.NoError(t, err)
enc := builder.Build()

sql := `create table test.t(
id int primary key,
name varchar(255) not null,
gender enum('male', 'female'),
email varchar(255) not null,
key idx_name_email(name, email))`
ddlEvent := helper.DDL2Event(sql)

m, err := enc.EncodeDDLEvent(ddlEvent)
dec, err := NewDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

dec, err := NewDecoder(ctx, codecConfig, nil)
m, err := enc.EncodeDDLEvent(createTableDDLEvent)
require.NoError(t, err)

err = dec.AddKeyValue(m.Key, m.Value)
Expand All @@ -106,22 +112,20 @@ func TestEncodeDDLEvent(t *testing.T) {

event, err := dec.NextDDLEvent()
require.NoError(t, err)
require.Equal(t, ddlEvent.CommitTs, event.CommitTs)
require.Equal(t, createTableDDLEvent.CommitTs, event.CommitTs)
// because we don't we don't set startTs in the encoded message,
// so the startTs is equal to commitTs
require.Equal(t, ddlEvent.CommitTs, event.StartTs)
require.Equal(t, ddlEvent.Query, event.Query)
require.Equal(t, len(ddlEvent.TableInfo.Columns), len(event.TableInfo.Columns))
require.Equal(t, len(ddlEvent.TableInfo.Indices)+1, len(event.TableInfo.Indices))

item := dec.memo.Read(ddlEvent.TableInfo.TableName.Schema,
ddlEvent.TableInfo.TableName.Table, ddlEvent.TableInfo.UpdateTS)
require.Equal(t, createTableDDLEvent.CommitTs, event.StartTs)
require.Equal(t, createTableDDLEvent.Query, event.Query)
require.Equal(t, len(createTableDDLEvent.TableInfo.Columns), len(event.TableInfo.Columns))
require.Equal(t, len(createTableDDLEvent.TableInfo.Indices)+1, len(event.TableInfo.Indices))
require.Nil(t, event.PreTableInfo)

item := dec.memo.Read(createTableDDLEvent.TableInfo.TableName.Schema,
createTableDDLEvent.TableInfo.TableName.Table, createTableDDLEvent.TableInfo.UpdateTS)
require.NotNil(t, item)

sql = `insert into test.t values (1, "jack", "male", "[email protected]")`
row := helper.DML2Event(sql, "test", "t")

err = enc.AppendRowChangedEvent(context.Background(), "", row, func() {})
err = enc.AppendRowChangedEvent(context.Background(), "", insertEvent, func() {})
require.NoError(t, err)

messages := enc.Build()
Expand All @@ -138,12 +142,59 @@ func TestEncodeDDLEvent(t *testing.T) {

decodedRow, err := dec.NextRowChangedEvent()
require.NoError(t, err)
require.Equal(t, decodedRow.CommitTs, row.CommitTs)
require.Equal(t, decodedRow.Table.Schema, row.Table.Schema)
require.Equal(t, decodedRow.Table.Table, row.Table.Table)
require.Equal(t, decodedRow.CommitTs, insertEvent.CommitTs)
require.Equal(t, decodedRow.Table.Schema, insertEvent.Table.Schema)
require.Equal(t, decodedRow.Table.Table, insertEvent.Table.Table)
require.Nil(t, decodedRow.PreColumns)

helper.Tk().MustExec("drop table test.t")
m, err = enc.EncodeDDLEvent(renameTableDDLEvent)
require.NoError(t, err)

err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err = dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)
require.NotEqual(t, 0, dec.msg.BuildTs)

event, err = dec.NextDDLEvent()
require.NoError(t, err)
require.Equal(t, renameTableDDLEvent.CommitTs, event.CommitTs)
// because we don't we don't set startTs in the encoded message,
// so the startTs is equal to commitTs
require.Equal(t, renameTableDDLEvent.CommitTs, event.StartTs)
require.Equal(t, renameTableDDLEvent.Query, event.Query)
require.Equal(t, len(renameTableDDLEvent.TableInfo.Columns), len(event.TableInfo.Columns))
require.Equal(t, len(renameTableDDLEvent.TableInfo.Indices)+1, len(event.TableInfo.Indices))
require.NotNil(t, event.PreTableInfo)

item = dec.memo.Read(renameTableDDLEvent.TableInfo.TableName.Schema,
renameTableDDLEvent.TableInfo.TableName.Table, renameTableDDLEvent.TableInfo.UpdateTS)
require.NotNil(t, item)

err = enc.AppendRowChangedEvent(context.Background(), "", insertEvent2, func() {})
require.NoError(t, err)

messages = enc.Build()
require.Len(t, messages, 1)

err = dec.AddKeyValue(messages[0].Key, messages[0].Value)
require.NoError(t, err)

messageType, hasNext, err = dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, messageType)
require.NotEqual(t, 0, dec.msg.BuildTs)

decodedRow, err = dec.NextRowChangedEvent()
require.NoError(t, err)
require.Equal(t, decodedRow.CommitTs, insertEvent2.CommitTs)
require.Equal(t, decodedRow.Table.Schema, insertEvent2.Table.Schema)
require.Equal(t, decodedRow.Table.Table, insertEvent2.Table.Table)
require.Nil(t, decodedRow.PreColumns)
}
}

Expand Down Expand Up @@ -223,6 +274,20 @@ func TestEncodeBootstrapEvent(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(
id int primary key,
name varchar(255) not null,
age int,
email varchar(255) not null,
key idx_name_email(name, email))`
ddlEvent := helper.DDL2Event(sql)
ddlEvent.IsBootstrap = true

sql = `insert into test.t values (1, "jack", 23, "[email protected]")`
row := helper.DML2Event(sql, "test", "t")

helper.Tk().MustExec("drop table test.t")

ctx := context.Background()
for _, compressionType := range []string{
compression.None,
Expand All @@ -235,15 +300,6 @@ func TestEncodeBootstrapEvent(t *testing.T) {
require.NoError(t, err)
enc := builder.Build()

sql := `create table test.t(
id int primary key,
name varchar(255) not null,
age int,
email varchar(255) not null,
key idx_name_email(name, email))`
ddlEvent := helper.DDL2Event(sql)
ddlEvent.IsBootstrap = true

m, err := enc.EncodeDDLEvent(ddlEvent)
require.NoError(t, err)

Expand Down Expand Up @@ -274,9 +330,6 @@ func TestEncodeBootstrapEvent(t *testing.T) {
ddlEvent.TableInfo.TableName.Table, ddlEvent.TableInfo.UpdateTS)
require.NotNil(t, item)

sql = `insert into test.t values (1, "jack", 23, "[email protected]")`
row := helper.DML2Event(sql, "test", "t")

err = enc.AppendRowChangedEvent(context.Background(), "", row, func() {})
require.NoError(t, err)

Expand All @@ -298,8 +351,6 @@ func TestEncodeBootstrapEvent(t *testing.T) {
require.Equal(t, decodedRow.Table.Schema, row.Table.Schema)
require.Equal(t, decodedRow.Table.Table, row.Table.Table)
require.Nil(t, decodedRow.PreColumns)

helper.Tk().MustExec("drop table test.t")
}
}

Expand Down
Loading

0 comments on commit 964df36

Please sign in to comment.