Skip to content

Commit

Permalink
codec(ticdc): canal-json support compatible content by output detailed mysql type information (#10014) (#10128)
Browse files Browse the repository at this point in the history

close #10106
  • Loading branch information
ti-chi-bot authored Dec 5, 2023
1 parent 6aa39d0 commit 3f6e111
Show file tree
Hide file tree
Showing 31 changed files with 2,557 additions and 571 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
FileIndexWidth: c.Sink.FileIndexWidth,
EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
ContentCompatible: c.Sink.ContentCompatible,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
CloudStorageConfig: cloudStorageConfig,
Expand Down Expand Up @@ -619,6 +620,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
FileIndexWidth: cloned.Sink.FileIndexWidth,
EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
ContentCompatible: cloned.Sink.ContentCompatible,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
CloudStorageConfig: cloudStorageConfig,
Expand Down Expand Up @@ -779,6 +781,7 @@ type SinkConfig struct {
EnableKafkaSinkV2 bool `json:"enable_kafka_sink_v2"`
OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns"`
SafeMode *bool `json:"safe_mode,omitempty"`
ContentCompatible *bool `json:"content_compatible"`
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
Expand Down
23 changes: 18 additions & 5 deletions cdc/sink/dmlsink/mq/mq_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build intest
// +build intest

package mq

import (
Expand All @@ -20,6 +23,8 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -56,8 +61,6 @@ func TestNewKafkaDMLSinkFailed(t *testing.T) {
}

func TestWriteEvents(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -79,11 +82,21 @@ func TestWriteEvents(t *testing.T) {
require.NotNil(t, s)
defer s.Close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

tableStatus := state.TableSinkSinking
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
}

events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000)
Expand Down
131 changes: 92 additions & 39 deletions cdc/sink/dmlsink/mq/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build intest
// +build intest

package mq

import (
Expand All @@ -19,6 +22,8 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -57,7 +62,13 @@ func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlpro
}

func TestNonBatchEncode_SendMessages(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -70,9 +81,11 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
Partition: 1,
}
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
}
tableStatus := state.TableSinkSinking

Expand Down Expand Up @@ -250,7 +263,13 @@ func TestBatchEncode_Group(t *testing.T) {
}

func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
t.Parallel()
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

key1 := TopicPartitionKey{
Topic: "test",
Expand All @@ -270,9 +289,11 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand Down Expand Up @@ -317,8 +338,6 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
}

func TestBatchEncode_SendMessages(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -337,13 +356,24 @@ func TestBatchEncode_SendMessages(t *testing.T) {
defer cancel()
worker, p := newBatchEncodeWorker(ctx, t)
defer worker.close()

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -353,9 +383,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -365,9 +397,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -377,9 +411,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aa", Table: "bb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -389,9 +425,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -401,9 +439,11 @@ func TestBatchEncode_SendMessages(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &tableStatus,
Expand Down Expand Up @@ -461,8 +501,6 @@ func TestBatchEncodeWorker_Abort(t *testing.T) {
}

func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
Topic: "test",
Partition: 1,
Expand All @@ -477,13 +515,24 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
defer worker.close()
replicatingStatus := state.TableSinkSinking
stoppedStatus := state.TableSinkStopping

helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t(a varchar(255) primary key)`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

events := []mqEvent{
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
CommitTs: 1,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -493,9 +542,11 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
CommitTs: 2,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "a", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -505,9 +556,11 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
{
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
CommitTs: 3,
Table: &model.TableName{Schema: "test", Table: "t"},
TableInfo: tableInfo,
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: colInfo,
},
Callback: func() {},
SinkState: &stoppedStatus,
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ const (
"terminator": "",
"date-separator": "month",
"enable-partition-separator": true,
"only-output-updated-columns": false,
"enable-kafka-sink-v2": true,
"only-output-updated-columns": true,
"content-compatible": true,
"safe-mode": true,
"kafka-config": {
"partition-num": 1,
Expand Down Expand Up @@ -349,9 +349,9 @@ const (
"large-message-handle-option": "handle-key-only"
}
},
"only-output-updated-columns": false,
"enable-kafka-sink-v2": true,
"only-output-updated-columns": true,
"content-compatible": true,
"safe-mode": true,
"kafka-config": {
"partition-num": 1,
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestReplicaConfigMarshal(t *testing.T) {
conf.Scheduler.WriteKeyThreshold = 100001

conf.Sink.OnlyOutputUpdatedColumns = aws.Bool(true)
conf.Sink.ContentCompatible = aws.Bool(true)
conf.Sink.SafeMode = aws.Bool(true)
conf.Sink.AdvanceTimeoutInSec = util.AddressOf(uint(150))
conf.Sink.KafkaConfig = &KafkaConfig{
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ type SinkConfig struct {

OnlyOutputUpdatedColumns *bool `toml:"only-output-updated-columns" json:"only-output-updated-columns"`

// ContentCompatible is only available when the downstream is MQ.
ContentCompatible *bool `toml:"content-compatible" json:"content-compatible,omitempty"`

// TiDBSourceID is the source ID of the upstream TiDB,
// which is used to set the `tidb_cdc_write_source` session variable.
// Note: This field is only used internally and only used in the MySQL sink.
Expand Down
Loading

0 comments on commit 3f6e111

Please sign in to comment.