From f54f807a86cd7fce17d382c7497e7f86a0eb37b7 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Thu, 14 Sep 2023 15:55:39 +0800 Subject: [PATCH] kafka(ticdc): large message handle key columns cherry-pick to release-7.1 (#9696) close pingcap/tiflow#9680 --- cdc/api/v2/model.go | 28 ++ cmd/kafka-consumer/main.go | 106 ++++-- cmd/storage-consumer/main.go | 11 +- docs/swagger/docs.go | 22 ++ docs/swagger/swagger.json | 22 ++ docs/swagger/swagger.yaml | 14 + pkg/config/config_test_data.go | 8 + pkg/config/large_message.go | 77 +++++ pkg/config/large_message_test.go | 78 +++++ pkg/config/replica_config_test.go | 23 +- pkg/config/sink.go | 21 ++ pkg/sink/codec/builder/codec_test.go | 44 +-- pkg/sink/codec/canal/canal_encoder.go | 2 +- pkg/sink/codec/canal/canal_json_decoder.go | 176 +++++++++- .../codec/canal/canal_json_decoder_test.go | 36 +- pkg/sink/codec/canal/canal_json_message.go | 31 +- .../canal/canal_json_row_event_encoder.go | 158 +++++---- .../canal_json_row_event_encoder_test.go | 64 +++- .../canal/canal_json_txn_event_encoder.go | 2 +- pkg/sink/codec/common/config.go | 25 +- pkg/sink/codec/common/config_test.go | 107 +++++- pkg/sink/codec/common/helper.go | 169 +++++++++ pkg/sink/codec/craft/craft_decoder.go | 32 +- pkg/sink/codec/craft/craft_encoder.go | 2 +- pkg/sink/codec/csv/csv_decoder.go | 5 + pkg/sink/codec/decoder.go | 5 + pkg/sink/codec/internal/column.go | 24 +- pkg/sink/codec/internal/message_key.go | 3 + pkg/sink/codec/maxwell/maxwell_encoder.go | 2 +- pkg/sink/codec/open/open_protocol_decoder.go | 324 ++++++++++-------- pkg/sink/codec/open/open_protocol_encoder.go | 59 +++- .../codec/open/open_protocol_encoder_test.go | 92 +++-- pkg/sink/codec/open/open_protocol_message.go | 26 +- .../codec/open/open_protocol_message_test.go | 47 ++- .../conf/changefeed.toml | 2 + .../conf/diff_config.toml | 29 ++ .../canal_json_handle_key_only/data/data.sql | 100 ++++++ .../canal_json_handle_key_only/run.sh | 48 +++ .../conf/changefeed.toml | 2 + .../conf/diff_config.toml | 29 ++ .../data/data.sql | 100 ++++++ .../open_protocol_handle_key_only/run.sh | 48 +++ tests/integration_tests/run_group.sh | 2 +- 43 files changed, 1797 insertions(+), 408 deletions(-) create mode 100644 pkg/config/large_message.go create mode 100644 pkg/config/large_message_test.go create mode 100644 pkg/sink/codec/common/helper.go create mode 100644 tests/integration_tests/canal_json_handle_key_only/conf/changefeed.toml create mode 100644 tests/integration_tests/canal_json_handle_key_only/conf/diff_config.toml create mode 100644 tests/integration_tests/canal_json_handle_key_only/data/data.sql create mode 100644 tests/integration_tests/canal_json_handle_key_only/run.sh create mode 100644 tests/integration_tests/open_protocol_handle_key_only/conf/changefeed.toml create mode 100644 tests/integration_tests/open_protocol_handle_key_only/conf/diff_config.toml create mode 100644 tests/integration_tests/open_protocol_handle_key_only/data/data.sql create mode 100644 tests/integration_tests/open_protocol_handle_key_only/run.sh diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index cb96a480b36..62af8026b7d 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -285,6 +285,15 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode, } } + + var largeMessageHandle *config.LargeMessageHandleConfig + if c.Sink.KafkaConfig.LargeMessageHandle != nil { + oldConfig := 
c.Sink.KafkaConfig.LargeMessageHandle + largeMessageHandle = &config.LargeMessageHandleConfig{ + LargeMessageHandleOption: oldConfig.LargeMessageHandleOption, + } + } + kafkaConfig = &config.KafkaConfig{ PartitionNum: c.Sink.KafkaConfig.PartitionNum, ReplicationFactor: c.Sink.KafkaConfig.ReplicationFactor, @@ -320,6 +329,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( Key: c.Sink.KafkaConfig.Key, InsecureSkipVerify: c.Sink.KafkaConfig.InsecureSkipVerify, CodecConfig: codeConfig, + LargeMessageHandle: largeMessageHandle, } } var mysqlConfig *config.MySQLConfig @@ -491,6 +501,15 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode, } } + + var largeMessageHandle *LargeMessageHandleConfig + if cloned.Sink.KafkaConfig.LargeMessageHandle != nil { + oldConfig := cloned.Sink.KafkaConfig.LargeMessageHandle + largeMessageHandle = &LargeMessageHandleConfig{ + LargeMessageHandleOption: oldConfig.LargeMessageHandleOption, + } + } + kafkaConfig = &KafkaConfig{ PartitionNum: cloned.Sink.KafkaConfig.PartitionNum, ReplicationFactor: cloned.Sink.KafkaConfig.ReplicationFactor, @@ -526,6 +545,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { Key: cloned.Sink.KafkaConfig.Key, InsecureSkipVerify: cloned.Sink.KafkaConfig.InsecureSkipVerify, CodecConfig: codeConfig, + LargeMessageHandle: largeMessageHandle, } } var mysqlConfig *MySQLConfig @@ -951,6 +971,8 @@ type KafkaConfig struct { Key *string `json:"key,omitempty"` InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"` CodecConfig *CodecConfig `json:"codec_config,omitempty"` + + LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` } // MySQLConfig represents a MySQL sink configuration @@ -987,3 +1009,9 @@ type ChangefeedStatus struct { LastError *RunningError `json:"last_error,omitempty"` LastWarning *RunningError `json:"last_warning,omitempty"` } + +// LargeMessageHandleConfig denotes the large message handling config +// This is the same as config.LargeMessageHandleConfig +type LargeMessageHandleConfig struct { + LargeMessageHandleOption string `json:"large_message_handle_option"` +} diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 51b92c68641..4e1c2aa9f1e 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -15,6 +15,7 @@ package main import ( "context" + "database/sql" "flag" "fmt" "math" @@ -48,6 +49,7 @@ import ( "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/sink/codec" "github.com/pingcap/tiflow/pkg/sink/codec/canal" + "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/open" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/util" @@ -71,14 +73,17 @@ var ( protocol config.Protocol enableTiDBExtension bool - // eventRouterReplicaConfig only used to initialize the consumer's eventRouter + // replicaConfig only used to initialize the consumer's eventRouter // which then can be used to check RowChangedEvent dispatched correctness - eventRouterReplicaConfig *config.ReplicaConfig + replicaConfig *config.ReplicaConfig logPath string logLevel string timezone string ca, cert, key string + + // upstreamTiDBDSN is the dsn of the upstream TiDB cluster + upstreamTiDBDSN string ) func init() { @@ -90,6 +95,7 @@ func init() { flag.StringVar(&upstreamURIStr, "upstream-uri", "", "Kafka uri") flag.StringVar(&downstreamURIStr, "downstream-uri", "", "downstream sink 
uri") + flag.StringVar(&upstreamTiDBDSN, "upstream-tidb-dsn", "", "upstream TiDB DSN") flag.StringVar(&configFile, "config", "", "config file for changefeed") flag.StringVar(&logPath, "log-file", "cdc_kafka_consumer.log", "log file path") flag.StringVar(&logLevel, "log-level", "info", "log file path") @@ -195,15 +201,15 @@ func init() { } if configFile != "" { - eventRouterReplicaConfig = config.GetDefaultReplicaConfig() - eventRouterReplicaConfig.Sink.Protocol = protocol.String() - err := cmdUtil.StrictDecodeFile(configFile, "kafka consumer", eventRouterReplicaConfig) + replicaConfig = config.GetDefaultReplicaConfig() + replicaConfig.Sink.Protocol = protocol.String() + err := cmdUtil.StrictDecodeFile(configFile, "kafka consumer", replicaConfig) if err != nil { log.Panic("invalid config file for kafka consumer", zap.Error(err), zap.String("config", configFile)) } - if _, err := filter.VerifyTableRules(eventRouterReplicaConfig.Filter); err != nil { + if _, err := filter.VerifyTableRules(replicaConfig.Filter); err != nil { log.Panic("verify rule failed", zap.Error(err)) } } @@ -380,10 +386,13 @@ type Consumer struct { // initialize to 0 by default globalResolvedTs uint64 - protocol config.Protocol - enableTiDBExtension bool + protocol config.Protocol eventRouter *dispatcher.EventRouter + + codecConfig *common.Config + + upstreamTiDB *sql.DB } // NewConsumer creates a new cdc kafka consumer @@ -400,7 +409,20 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { tableIDs: make(map[string]int64), } c.protocol = protocol - c.enableTiDBExtension = enableTiDBExtension + + c.codecConfig = common.NewConfig(c.protocol) + c.codecConfig.EnableTiDBExtension = enableTiDBExtension + if replicaConfig != nil && replicaConfig.Sink != nil && replicaConfig.Sink.KafkaConfig != nil { + c.codecConfig.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle + } + + if c.codecConfig.LargeMessageHandle.HandleKeyOnly() { + db, err := openDB(ctx, upstreamTiDBDSN) + if err != nil { + return nil, err + } + c.upstreamTiDB = db + } // this means user has input config file to enable dispatcher check // some protocol does not provide enough information to check the @@ -410,8 +432,8 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { // when try to enable dispatcher check for any protocol and dispatch // rule, make sure decoded `RowChangedEvent` contains information // identical to the CDC side. 
- if eventRouterReplicaConfig != nil { - eventRouter, err := dispatcher.NewEventRouter(eventRouterReplicaConfig, kafkaTopic) + if replicaConfig != nil { + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, kafkaTopic) if err != nil { return nil, errors.Trace(err) } @@ -514,24 +536,37 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram panic("sink should initialized") } + ctx := context.Background() + var ( + decoder codec.RowEventDecoder + err error + ) + + switch c.protocol { + case config.ProtocolOpen, config.ProtocolDefault: + decoder, err = open.NewBatchDecoder(ctx, c.codecConfig, c.upstreamTiDB) + case config.ProtocolCanalJSON: + decoder, err = canal.NewBatchDecoder(ctx, c.codecConfig, c.upstreamTiDB) + if err != nil { + return err + } + default: + log.Panic("Protocol not supported", zap.Any("Protocol", c.protocol)) + } + if err != nil { + return errors.Trace(err) + } + + log.Info("start consume claim", + zap.String("topic", claim.Topic()), zap.Int32("partition", partition), + zap.Int64("initialOffset", claim.InitialOffset()), zap.Int64("highWaterMarkOffset", claim.HighWaterMarkOffset())) + eventGroups := make(map[int64]*eventsGroup) for message := range claim.Messages() { - var ( - decoder codec.RowEventDecoder - err error - ) - switch c.protocol { - case config.ProtocolOpen, config.ProtocolDefault: - decoder, err = open.NewBatchDecoder(message.Key, message.Value) - case config.ProtocolCanalJSON: - decoder = canal.NewBatchDecoder(message.Value, c.enableTiDBExtension, "") - default: - log.Panic("Protocol not supported", zap.Any("Protocol", c.protocol)) - } - if err != nil { + if err := decoder.AddKeyValue(message.Key, message.Value); err != nil { + log.Error("add key value to the decoder failed", zap.Error(err)) return errors.Trace(err) } - counter := 0 for { tp, hasNext, err := decoder.HasNext() @@ -841,3 +876,24 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti g.tableIDs[key] = g.currentTableID return g.currentTableID } + +func openDB(ctx context.Context, dsn string) (*sql.DB, error) { + db, err := sql.Open("mysql", dsn) + if err != nil { + log.Error("open db failed", zap.String("dsn", dsn), zap.Error(err)) + return nil, errors.Trace(err) + } + + db.SetMaxOpenConns(10) + db.SetMaxIdleConns(10) + db.SetConnMaxLifetime(10 * time.Minute) + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err = db.PingContext(ctx); err != nil { + log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err)) + return nil, errors.Trace(err) + } + log.Info("open db success", zap.String("dsn", dsn)) + return db, nil +} diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index eb5ec4de59b..3e881e79087 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -309,7 +309,16 @@ func (c *consumer) emitDMLEvents( case config.ProtocolCanalJSON: // Always enable tidb extension for canal-json protocol // because we need to get the commit ts from the extension field. 
- decoder = canal.NewBatchDecoder(content, true, c.codecCfg.Terminator) + c.codecCfg.EnableTiDBExtension = true + decoder, err = canal.NewBatchDecoder(ctx, c.codecCfg, nil) + if err != nil { + return errors.Trace(err) + } + + err = decoder.AddKeyValue(nil, content) + if err != nil { + return errors.Trace(err) + } } cnt := 0 diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 0bed86afd45..1a750b076dc 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1434,6 +1434,9 @@ var doc = `{ "key": { "type": "string" }, + "large-message-handle": { + "$ref": "#/definitions/config.LargeMessageHandleConfig" + }, "max-message-bytes": { "type": "integer" }, @@ -1508,6 +1511,14 @@ var doc = `{ } } }, + "config.LargeMessageHandleConfig": { + "type": "object", + "properties": { + "large-message-handle-option": { + "type": "string" + } + } + }, "config.MySQLConfig": { "type": "object", "properties": { @@ -2319,6 +2330,9 @@ var doc = `{ "key": { "type": "string" }, + "large_message_handle": { + "$ref": "#/definitions/v2.LargeMessageHandleConfig" + }, "max_message_bytes": { "type": "integer" }, @@ -2393,6 +2407,14 @@ var doc = `{ } } }, + "v2.LargeMessageHandleConfig": { + "type": "object", + "properties": { + "large_message_handle_option": { + "type": "string" + } + } + }, "v2.LogLevelReq": { "type": "object", "properties": { diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 0a7c107f387..b24d3f3cbd5 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1415,6 +1415,9 @@ "key": { "type": "string" }, + "large-message-handle": { + "$ref": "#/definitions/config.LargeMessageHandleConfig" + }, "max-message-bytes": { "type": "integer" }, @@ -1489,6 +1492,14 @@ } } }, + "config.LargeMessageHandleConfig": { + "type": "object", + "properties": { + "large-message-handle-option": { + "type": "string" + } + } + }, "config.MySQLConfig": { "type": "object", "properties": { @@ -2300,6 +2311,9 @@ "key": { "type": "string" }, + "large_message_handle": { + "$ref": "#/definitions/v2.LargeMessageHandleConfig" + }, "max_message_bytes": { "type": "integer" }, @@ -2374,6 +2388,14 @@ } } }, + "v2.LargeMessageHandleConfig": { + "type": "object", + "properties": { + "large_message_handle_option": { + "type": "string" + } + } + }, "v2.LogLevelReq": { "type": "object", "properties": { diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 5bb4d6a3ae7..a311949934f 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -91,6 +91,8 @@ definitions: type: string key: type: string + large-message-handle: + $ref: '#/definitions/config.LargeMessageHandleConfig' max-message-bytes: type: integer partition-num: @@ -140,6 +142,11 @@ definitions: write-timeout: type: string type: object + config.LargeMessageHandleConfig: + properties: + large-message-handle-option: + type: string + type: object config.MySQLConfig: properties: enable-batch-dml: @@ -688,6 +695,8 @@ definitions: type: string key: type: string + large_message_handle: + $ref: '#/definitions/v2.LargeMessageHandleConfig' max_message_bytes: type: integer partition_num: @@ -737,6 +746,11 @@ definitions: write_timeout: type: string type: object + v2.LargeMessageHandleConfig: + properties: + large_message_handle_option: + type: string + type: object v2.LogLevelReq: properties: log_level: diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 83d833d5745..32ffd71357b 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -237,6 +237,9 @@ 
const ( "avro-enable-watermark": true, "avro-decimal-handling-mode": "string", "avro-bigint-unsigned-handling-mode": "string" + }, + "large-message-handle": { + "large-message-handle-option": "handle-key-only" } }, "mysql-config": { @@ -327,6 +330,11 @@ const ( "terminator": "", "date-separator": "month", "enable-partition-separator": true, + "kafka-config": { + "large-message-handle": { + "large-message-handle-option": "handle-key-only" + } + }, "only-output-updated-columns": false, "enable-kafka-sink-v2": true, "only-output-updated-columns": true, diff --git a/pkg/config/large_message.go b/pkg/config/large_message.go new file mode 100644 index 00000000000..52f7c90a261 --- /dev/null +++ b/pkg/config/large_message.go @@ -0,0 +1,77 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import cerror "github.com/pingcap/tiflow/pkg/errors" + +const ( + // LargeMessageHandleOptionNone means not handling large message. + LargeMessageHandleOptionNone string = "none" + // LargeMessageHandleOptionHandleKeyOnly means handling large message by sending only handle key columns. + LargeMessageHandleOptionHandleKeyOnly string = "handle-key-only" +) + +// LargeMessageHandleConfig is the configuration for handling large message. +type LargeMessageHandleConfig struct { + LargeMessageHandleOption string `toml:"large-message-handle-option" json:"large-message-handle-option"` +} + +// NewDefaultLargeMessageHandleConfig return the default LargeMessageHandleConfig. +func NewDefaultLargeMessageHandleConfig() *LargeMessageHandleConfig { + return &LargeMessageHandleConfig{ + LargeMessageHandleOption: LargeMessageHandleOptionNone, + } +} + +// AdjustAndValidate the LargeMessageHandleConfig. +func (c *LargeMessageHandleConfig) AdjustAndValidate(protocol Protocol, enableTiDBExtension bool) error { + if c.LargeMessageHandleOption == "" { + c.LargeMessageHandleOption = LargeMessageHandleOptionNone + } + if c.LargeMessageHandleOption == LargeMessageHandleOptionNone { + return nil + } + + switch protocol { + case ProtocolOpen: + return nil + case ProtocolCanalJSON: + if !enableTiDBExtension { + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle is set to %s, protocol is %s, but enable-tidb-extension is false", + c.LargeMessageHandleOption, protocol.String()) + } + return nil + default: + } + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle is set to %s, protocol is %s, it's not supported", + c.LargeMessageHandleOption, protocol.String()) +} + +// HandleKeyOnly returns true if handle large message by encoding handle key only. +func (c *LargeMessageHandleConfig) HandleKeyOnly() bool { + if c == nil { + return false + } + return c.LargeMessageHandleOption == LargeMessageHandleOptionHandleKeyOnly +} + +// Disabled returns true if disable large message handle. 
+func (c *LargeMessageHandleConfig) Disabled() bool { + if c == nil { + return false + } + return c.LargeMessageHandleOption == LargeMessageHandleOptionNone +} diff --git a/pkg/config/large_message_test.go b/pkg/config/large_message_test.go new file mode 100644 index 00000000000..254b49b9385 --- /dev/null +++ b/pkg/config/large_message_test.go @@ -0,0 +1,78 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "testing" + + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestLargeMessageHandle4NotSupportedProtocol(t *testing.T) { + t.Parallel() + + largeMessageHandle := NewDefaultLargeMessageHandleConfig() + + err := largeMessageHandle.AdjustAndValidate(ProtocolCanal, true) + require.NoError(t, err) + + largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly + err = largeMessageHandle.AdjustAndValidate(ProtocolCanal, true) + require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) +} + +func TestLargeMessageHandle4CanalJSON(t *testing.T) { + t.Parallel() + + // large-message-handle not set, always no error + largeMessageHandle := NewDefaultLargeMessageHandleConfig() + + err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) + require.NoError(t, err) + require.True(t, largeMessageHandle.Disabled()) + + largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly + + // `enable-tidb-extension` is false, return error + err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) + require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) + + // `enable-tidb-extension` is true, no error + err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, true) + require.NoError(t, err) + require.Equal(t, LargeMessageHandleOptionHandleKeyOnly, largeMessageHandle.LargeMessageHandleOption) +} + +func TestLargeMessageHandle4OpenProtocol(t *testing.T) { + t.Parallel() + + // large-message-handle not set, always no error + largeMessageHandle := NewDefaultLargeMessageHandleConfig() + + err := largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) + require.NoError(t, err) + require.True(t, largeMessageHandle.Disabled()) + + largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly + + // `enable-tidb-extension` is false, no error + err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) + require.NoError(t, err) + + // `enable-tidb-extension` is true, no error + err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true) + require.NoError(t, err) + require.Equal(t, LargeMessageHandleOptionHandleKeyOnly, largeMessageHandle.LargeMessageHandleOption) +} diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index f1456c731f2..3fccd047165 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -100,6 +100,9 @@ func TestReplicaConfigMarshal(t *testing.T) { AvroDecimalHandlingMode: aws.String("string"), AvroBigintUnsignedHandlingMode: aws.String("string"), }, + LargeMessageHandle: 
&LargeMessageHandleConfig{ + LargeMessageHandleOption: LargeMessageHandleOptionHandleKeyOnly, + }, } conf.Sink.MySQLConfig = &MySQLConfig{ WorkerCount: aws.Int(8), @@ -126,7 +129,8 @@ func TestReplicaConfigMarshal(t *testing.T) { b, err := conf.Marshal() require.Nil(t, err) - require.JSONEq(t, testCfgTestReplicaConfigMarshal1, mustIndentJSON(t, b)) + b = mustIndentJSON(t, b) + require.JSONEq(t, testCfgTestReplicaConfigMarshal1, b) conf2 := new(ReplicaConfig) err = conf2.UnmarshalJSON([]byte(testCfgTestReplicaConfigMarshal2)) require.Nil(t, err) @@ -413,3 +417,20 @@ func TestMaskSensitiveData(t *testing.T) { require.Equal(t, "******", *config.Sink.KafkaConfig.SASLPassword) require.Equal(t, "******", *config.Sink.KafkaConfig.SASLGssAPIPassword) } + +func TestValidateAndAdjustLargeMessageHandle(t *testing.T) { + cfg := GetDefaultReplicaConfig() + cfg.Sink.KafkaConfig = &KafkaConfig{ + LargeMessageHandle: NewDefaultLargeMessageHandleConfig(), + } + cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleOption = "" + + rawURL := "kafka://127.0.0.1:9092/canal-json-test?protocol=canal-json&enable-tidb-extension=true" + sinkURL, err := url.Parse(rawURL) + require.NoError(t, err) + + err = cfg.ValidateAndAdjust(sinkURL) + require.NoError(t, err) + + require.Equal(t, LargeMessageHandleOptionNone, cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleOption) +} diff --git a/pkg/config/sink.go b/pkg/config/sink.go index e8b1a889cdf..4eb50ff215b 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -16,6 +16,7 @@ package config import ( "fmt" "net/url" + "strconv" "strings" "github.com/aws/aws-sdk-go/aws" @@ -324,6 +325,8 @@ type KafkaConfig struct { Key *string `toml:"key" json:"key,omitempty"` InsecureSkipVerify *bool `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"` CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` + + LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` } // MaskSensitiveData masks sensitive data in KafkaConfig @@ -368,6 +371,24 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { return err } + protocol, _ := ParseSinkProtocolFromString(s.Protocol) + if s.KafkaConfig != nil && s.KafkaConfig.LargeMessageHandle != nil { + var ( + enableTiDBExtension bool + err error + ) + if s := sinkURI.Query().Get("enable-tidb-extension"); s != "" { + enableTiDBExtension, err = strconv.ParseBool(s) + if err != nil { + return errors.Trace(err) + } + } + err = s.KafkaConfig.LargeMessageHandle.AdjustAndValidate(protocol, enableTiDBExtension) + if err != nil { + return err + } + } + for _, rule := range s.DispatchRules { if rule.DispatcherRule != "" && rule.PartitionRule != "" { log.Error("dispatcher and partition cannot be configured both", zap.Any("rule", rule)) diff --git a/pkg/sink/codec/builder/codec_test.go b/pkg/sink/codec/builder/codec_test.go index 846146ccbec..83f02bf0428 100644 --- a/pkg/sink/codec/builder/codec_test.go +++ b/pkg/sink/codec/builder/codec_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/craft" @@ -282,19 +283,18 @@ func BenchmarkProtobuf2Encoding(b *testing.B) { func BenchmarkCraftDecoding(b *testing.B) { allocator := craft.NewSliceAllocator(128) for i := 0; i < b.N; i++ { + decoder := craft.NewBatchDecoderWithAllocator(allocator) for _, 
message := range codecCraftEncodedRowChanges { - if decoder, err := craft.NewBatchDecoderWithAllocator( - message.Value, allocator); err != nil { + if err := decoder.AddKeyValue(message.Key, message.Value); err != nil { panic(err) - } else { - for { - if _, hasNext, err := decoder.HasNext(); err != nil { - panic(err) - } else if hasNext { - _, _ = decoder.NextRowChangedEvent() - } else { - break - } + } + for { + if _, hasNext, err := decoder.HasNext(); err != nil { + panic(err) + } else if hasNext { + _, _ = decoder.NextRowChangedEvent() + } else { + break } } } @@ -304,17 +304,19 @@ func BenchmarkCraftDecoding(b *testing.B) { func BenchmarkJsonDecoding(b *testing.B) { for i := 0; i < b.N; i++ { for _, message := range codecJSONEncodedRowChanges { - if decoder, err := open.NewBatchDecoder(message.Key, message.Value); err != nil { + codecConfig := common.NewConfig(config.ProtocolOpen) + decoder, err := open.NewBatchDecoder(context.Background(), codecConfig, nil) + require.NoError(b, err) + if err := decoder.AddKeyValue(message.Key, message.Value); err != nil { panic(err) - } else { - for { - if _, hasNext, err := decoder.HasNext(); err != nil { - panic(err) - } else if hasNext { - _, _ = decoder.NextRowChangedEvent() - } else { - break - } + } + for { + if _, hasNext, err := decoder.HasNext(); err != nil { + panic(err) + } else if hasNext { + _, _ = decoder.NextRowChangedEvent() + } else { + break } } } diff --git a/pkg/sink/codec/canal/canal_encoder.go b/pkg/sink/codec/canal/canal_encoder.go index 38a4746374f..6e75e79d109 100644 --- a/pkg/sink/codec/canal/canal_encoder.go +++ b/pkg/sink/codec/canal/canal_encoder.go @@ -52,7 +52,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - entry, err := d.entryBuilder.fromRowEvent(e, d.config.OnlyHandleKeyColumns) + entry, err := d.entryBuilder.fromRowEvent(e, d.config.DeleteOnlyHandleKeyColumns) if err != nil { return errors.Trace(err) } diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 60697914c11..6f09a1bf23c 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -15,34 +15,54 @@ package canal import ( "bytes" + "context" + "database/sql" + "strconv" + "strings" "github.com/goccy/go-json" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec" + "github.com/pingcap/tiflow/pkg/sink/codec/common" "go.uber.org/zap" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/charmap" ) // batchDecoder decodes the byte into the original message. type batchDecoder struct { - data []byte - msg canalJSONMessageInterface - enableTiDBExtension bool - terminator string + data []byte + msg canalJSONMessageInterface + + config *common.Config + + upstreamTiDB *sql.DB + bytesDecoder *encoding.Decoder } // NewBatchDecoder return a decoder for canal-json -func NewBatchDecoder(data []byte, - enableTiDBExtension bool, - terminator string, -) codec.RowEventDecoder { - return &batchDecoder{ - data: data, - msg: nil, - enableTiDBExtension: enableTiDBExtension, - terminator: terminator, +func NewBatchDecoder( + _ context.Context, codecConfig *common.Config, db *sql.DB, +) (codec.RowEventDecoder, error) { + if codecConfig.LargeMessageHandle.HandleKeyOnly() && db == nil { + return nil, cerror.ErrCodecDecode. 
+ GenWithStack("handle-key-only is enabled, but upstream TiDB is not provided") } + + return &batchDecoder{ + config: codecConfig, + upstreamTiDB: db, + bytesDecoder: charmap.ISO8859_1.NewDecoder(), + }, nil +} + +// AddKeyValue implements the EventBatchDecoder interface +func (b *batchDecoder) AddKeyValue(_, value []byte) error { + b.data = value + return nil } // HasNext implements the RowEventDecoder interface @@ -52,17 +72,17 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) { encodedData []byte ) - if b.enableTiDBExtension { + if b.config.EnableTiDBExtension { msg = &canalJSONMessageWithTiDBExtension{ JSONMessage: &JSONMessage{}, Extensions: &tidbExtension{}, } } - if len(b.terminator) > 0 { - idx := bytes.IndexAny(b.data, b.terminator) + if len(b.config.Terminator) > 0 { + idx := bytes.IndexAny(b.data, b.config.Terminator) if idx >= 0 { encodedData = b.data[:idx] - b.data = b.data[idx+len(b.terminator):] + b.data = b.data[idx+len(b.config.Terminator):] } else { encodedData = b.data b.data = nil @@ -86,6 +106,119 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) { return b.msg.messageType(), true, nil } +func (b *batchDecoder) buildData(holder *common.ColumnsHolder) (map[string]interface{}, map[string]string, error) { + columnsCount := holder.Length() + data := make(map[string]interface{}, columnsCount) + mysqlTypeMap := make(map[string]string, columnsCount) + + for i := 0; i < columnsCount; i++ { + t := holder.Types[i] + name := holder.Types[i].Name() + mysqlType := strings.ToLower(t.DatabaseTypeName()) + + var value string + rawValue := holder.Values[i].([]uint8) + if isBinaryMySQLType(mysqlType) { + rawValue, err := b.bytesDecoder.Bytes(rawValue) + if err != nil { + return nil, nil, errors.Trace(err) + } + value = string(rawValue) + } else if strings.Contains(mysqlType, "bit") || strings.Contains(mysqlType, "set") { + bitValue, err := common.BinaryLiteralToInt(rawValue) + if err != nil { + return nil, nil, errors.Trace(err) + } + value = strconv.FormatUint(bitValue, 10) + } else { + value = string(rawValue) + } + mysqlTypeMap[name] = mysqlType + data[name] = value + } + + return data, mysqlTypeMap, nil +} + +func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( + ctx context.Context, message *canalJSONMessageWithTiDBExtension, +) (*model.RowChangedEvent, error) { + var ( + commitTs = message.Extensions.CommitTs + schema = message.Schema + table = message.Table + eventType = message.EventType + ) + + handleKeyData := message.getData() + pkNames := make([]string, 0, len(handleKeyData)) + for name := range handleKeyData { + pkNames = append(pkNames, name) + } + + result := &canalJSONMessageWithTiDBExtension{ + JSONMessage: &JSONMessage{ + Schema: schema, + Table: table, + PKNames: pkNames, + + EventType: eventType, + }, + Extensions: &tidbExtension{ + CommitTs: commitTs, + }, + } + + switch eventType { + case "INSERT": + holder, err := common.SnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData) + if err != nil { + return nil, err + } + data, mysqlType, err := b.buildData(holder) + if err != nil { + return nil, err + } + result.MySQLType = mysqlType + result.Data = []map[string]interface{}{data} + case "UPDATE": + holder, err := common.SnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData) + if err != nil { + return nil, err + } + data, mysqlType, err := b.buildData(holder) + if err != nil { + return nil, err + } + result.MySQLType = mysqlType + result.Data = []map[string]interface{}{data} + + 
holder, err = common.SnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, message.getOld()) + if err != nil { + return nil, err + } + old, _, err := b.buildData(holder) + if err != nil { + return nil, err + } + result.Old = []map[string]interface{}{old} + case "DELETE": + holder, err := common.SnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, handleKeyData) + if err != nil { + return nil, err + } + data, mysqlType, err := b.buildData(holder) + if err != nil { + return nil, err + } + result.MySQLType = mysqlType + result.Data = []map[string]interface{}{data} + } + + b.msg = result + return b.NextRowChangedEvent() +} + // NextRowChangedEvent implements the RowEventDecoder interface // `HasNext` should be called before this. func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { @@ -93,6 +226,15 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { return nil, cerror.ErrCanalDecodeFailed. GenWithStack("not found row changed event message") } + + message, withExtension := b.msg.(*canalJSONMessageWithTiDBExtension) + if withExtension { + ctx := context.Background() + if message.Extensions.OnlyHandleKey { + return b.assembleHandleKeyOnlyRowChangedEvent(ctx, message) + } + } + result, err := canalJSONMessage2RowChange(b.msg) if err != nil { return nil, err diff --git a/pkg/sink/codec/canal/canal_json_decoder_test.go b/pkg/sink/codec/canal/canal_json_decoder_test.go index 3314884274b..d3ec988f432 100644 --- a/pkg/sink/codec/canal/canal_json_decoder_test.go +++ b/pkg/sink/codec/canal/canal_json_decoder_test.go @@ -25,6 +25,8 @@ import ( func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { t.Parallel() + + ctx := context.Background() expectedDecodedValue := collectExpectedDecodedValue(testColumnsTable) for _, encodeEnable := range []bool{false, true} { encoder := newJSONRowEventEncoder(&common.Config{ @@ -42,7 +44,12 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { msg := messages[0] for _, decodeEnable := range []bool{false, true} { - decoder := NewBatchDecoder(msg.Value, decodeEnable, "") + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = decodeEnable + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + err = decoder.AddKeyValue(msg.Key, msg.Value) + require.NoError(t, err) ty, hasNext, err := decoder.HasNext() require.Nil(t, err) @@ -83,11 +90,12 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) { t.Parallel() + + ctx := context.Background() for _, encodeEnable := range []bool{false, true} { - encoder := &JSONRowEventEncoder{ - builder: newCanalEntryBuilder(), - config: &common.Config{EnableTiDBExtension: encodeEnable}, - } + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = encodeEnable + encoder := newJSONRowEventEncoder(codecConfig) require.NotNil(t, encoder) result, err := encoder.EncodeDDLEvent(testCaseDDL) @@ -95,7 +103,12 @@ func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) { require.NotNil(t, result) for _, decodeEnable := range []bool{false, true} { - decoder := NewBatchDecoder(result.Value, decodeEnable, "") + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = decodeEnable + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + err = decoder.AddKeyValue(nil, result.Value) + require.NoError(t, err) ty, hasNext, err := 
decoder.HasNext() require.Nil(t, err) @@ -130,7 +143,16 @@ func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) { encodedValue := `{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1668067205238,"ts":1668067206650,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}],"old":null} {"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1668067229137,"ts":1668067230720,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}]} {"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1668067230388,"ts":1668067231725,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":null}` - decoder := NewBatchDecoder([]byte(encodedValue), false, "\n") + ctx := context.Background() + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.LargeMessageHandle = config.NewDefaultLargeMessageHandleConfig() + codecConfig.Terminator = "\n" + + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(nil, []byte(encodedValue)) + require.NoError(t, err) cnt := 0 for { tp, hasNext, err := decoder.HasNext() diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index 09ecf894d55..3b089f5e615 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -131,8 +131,9 @@ func (c *JSONMessage) pkNameSet() map[string]struct{} { } type tidbExtension struct { - CommitTs uint64 `json:"commitTs,omitempty"` - WatermarkTs uint64 `json:"watermarkTs,omitempty"` + CommitTs uint64 `json:"commitTs,omitempty"` + WatermarkTs uint64 `json:"watermarkTs,omitempty"` + OnlyHandleKey bool `json:"onlyHandleKey,omitempty"` } type canalJSONMessageWithTiDBExtension struct { @@ -157,13 +158,11 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange } mysqlType := msg.getMySQLType() - javaSQLType := msg.getJavaSQLType() var err error if msg.eventType() == canal.EventType_DELETE { // for `DELETE` event, `data` contain the old data, set it as the `PreColumns` - result.PreColumns, err = canalJSONColumnMap2RowChangeColumns( - msg.getData(), mysqlType, javaSQLType) + result.PreColumns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) // canal-json encoder does not encode `Flag` information into the result, // we have to set the `Flag` to make it can be handled by MySQL Sink. 
// see https://github.com/pingcap/tiflow/blob/7bfce98/cdc/sink/mysql.go#L869-L888 @@ -172,16 +171,14 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange } // for `INSERT` and `UPDATE`, `data` contain fresh data, set it as the `Columns` - result.Columns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), - mysqlType, javaSQLType) + result.Columns, err = canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) if err != nil { return nil, err } // for `UPDATE`, `old` contain old data, set it as the `PreColumns` if msg.eventType() == canal.EventType_UPDATE { - result.PreColumns, err = canalJSONColumnMap2RowChangeColumns(msg.getOld(), - mysqlType, javaSQLType) + result.PreColumns, err = canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType) if err != nil { return nil, err } @@ -191,15 +188,9 @@ func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*model.RowChange return result, nil } -func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType map[string]string, javaSQLType map[string]int32) ([]*model.Column, error) { +func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType map[string]string) ([]*model.Column, error) { result := make([]*model.Column, 0, len(cols)) for name, value := range cols { - javaType, ok := javaSQLType[name] - if !ok { - // this should not happen, else we have to check encoding for javaSQLType. - return nil, cerrors.ErrCanalDecodeFailed.GenWithStack( - "java sql type does not found, column: %+v, mysqlType: %+v", name, javaSQLType) - } mysqlTypeStr, ok := mysqlType[name] if !ok { // this should not happen, else we have to check encoding for mysqlType. @@ -207,9 +198,9 @@ func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType "mysql type does not found, column: %+v, mysqlType: %+v", name, mysqlType) } mysqlTypeStr = trimUnsignedFromMySQLType(mysqlTypeStr) + isBinary := isBinaryMySQLType(mysqlTypeStr) mysqlType := types.StrToType(mysqlTypeStr) - col := internal.NewColumn(value, mysqlType). 
- ToCanalJSONFormatColumn(name, internal.JavaSQLType(javaType)) + col := internal.NewColumn(value, mysqlType).ToCanalJSONFormatColumn(name, isBinary) result = append(result, col) } if len(result) == 0 { @@ -254,3 +245,7 @@ func getDDLActionType(query string) timodel.ActionType { return timodel.ActionNone } + +func isBinaryMySQLType(mysqlType string) bool { + return strings.Contains(mysqlType, "blob") || strings.Contains(mysqlType, "binary") +} diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index aa2f0dee4a7..618fae95c7b 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -29,61 +29,68 @@ import ( "go.uber.org/zap" ) -func newJSONMessageForDML( +func fillColumns(columns []*model.Column, out *jwriter.Writer, + onlyOutputUpdatedColumn bool, + onlyHandleKeyColumns bool, + newColumnMap map[string]*model.Column, builder *canalEntryBuilder, +) error { + if len(columns) == 0 { + out.RawString("null") + return nil + } + out.RawByte('[') + out.RawByte('{') + isFirst := true + for _, col := range columns { + if col != nil { + // column equal, do not output it + if onlyOutputUpdatedColumn && shouldIgnoreColumn(col, newColumnMap) { + continue + } + if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { + continue + } + if isFirst { + isFirst = false + } else { + out.RawByte(',') + } + mysqlType := getMySQLType(col) + javaType, err := getJavaSQLType(col, mysqlType) + if err != nil { + return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) + } + value, err := builder.formatValue(col.Value, javaType) + if err != nil { + return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) + } + out.String(col.Name) + out.RawByte(':') + if col.Value == nil { + out.RawString("null") + } else { + out.String(value) + } + } + } + out.RawByte('}') + out.RawByte(']') + return nil +} + +func newJSONMessageForDML( e *model.RowChangedEvent, config *common.Config, + builder *canalEntryBuilder, + messageTooLarge bool, ) ([]byte, error) { isDelete := e.IsDelete() mysqlTypeMap := make(map[string]string, len(e.Columns)) - filling := func(columns []*model.Column, out *jwriter.Writer, - onlyOutputUpdatedColumn bool, - onlyHandleKeyColumns bool, - newColumnMap map[string]*model.Column, - ) error { - if len(columns) == 0 { - out.RawString("null") - return nil - } - out.RawByte('[') - out.RawByte('{') - isFirst := true - for _, col := range columns { - if col != nil { - // column equal, do not output it - if onlyOutputUpdatedColumn && shouldIgnoreColumn(col, newColumnMap) { - continue - } - if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { - continue - } - if isFirst { - isFirst = false - } else { - out.RawByte(',') - } - mysqlType := getMySQLType(col) - javaType, err := getJavaSQLType(col, mysqlType) - if err != nil { - return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) - } - value, err := builder.formatValue(col.Value, javaType) - if err != nil { - return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) - } - out.String(col.Name) - out.RawByte(':') - if col.Value == nil { - out.RawString("null") - } else { - out.String(value) - } - } - } - out.RawByte('}') - out.RawByte(']') - return nil + onlyHandleKey := messageTooLarge + if isDelete && config.DeleteOnlyHandleKeyColumns { + onlyHandleKey = true } out := &jwriter.Writer{} @@ -155,7 +162,7 @@ func newJSONMessageForDML( emptyColumn := true for _, col := range columns { if col != nil { - if isDelete && 
config.OnlyHandleKeyColumns && !col.Flag.IsHandleKey() { + if onlyHandleKey && !col.Flag.IsHandleKey() { continue } if emptyColumn { @@ -206,13 +213,13 @@ func newJSONMessageForDML( if e.IsDelete() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := filling(e.PreColumns, out, false, config.OnlyHandleKeyColumns, nil); err != nil { + if err := fillColumns(e.PreColumns, out, false, onlyHandleKey, nil, builder); err != nil { return nil, err } } else if e.IsInsert() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := filling(e.Columns, out, false, false, nil); err != nil { + if err := fillColumns(e.Columns, out, false, onlyHandleKey, nil, builder); err != nil { return nil, err } } else if e.IsUpdate() { @@ -224,11 +231,11 @@ func newJSONMessageForDML( } } out.RawString(",\"old\":") - if err := filling(e.PreColumns, out, config.OnlyOutputUpdatedColumns, false, newColsMap); err != nil { + if err := fillColumns(e.PreColumns, out, config.OnlyOutputUpdatedColumns, onlyHandleKey, newColsMap, builder); err != nil { return nil, err } out.RawString(",\"data\":") - if err := filling(e.Columns, out, false, false, nil); err != nil { + if err := fillColumns(e.Columns, out, false, onlyHandleKey, nil, builder); err != nil { return nil, err } } else { @@ -241,6 +248,14 @@ func newJSONMessageForDML( out.RawByte('{') out.RawString("\"commitTs\":") out.Uint64(e.CommitTs) + + if messageTooLarge { + if config.LargeMessageHandle.HandleKeyOnly() { + out.RawByte(',') + out.RawString("\"onlyHandleKey\":true") + } + } + out.RawByte('}') } out.RawByte('}') @@ -335,20 +350,11 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - value, err := newJSONMessageForDML(c.builder, e, c.config) + value, err := newJSONMessageForDML(e, c.config, c.builder, false) if err != nil { return errors.Trace(err) } - length := len(value) + common.MaxRecordOverhead - // for single message that is longer than max-message-bytes, do not send it. - if length > c.config.MaxMessageBytes { - log.Warn("Single message is too large for canal-json", - zap.Int("maxMessageBytes", c.config.MaxMessageBytes), - zap.Int("length", length), - zap.Any("table", e.Table)) - return cerror.ErrMessageTooLarge.GenWithStackByArgs() - } m := &common.Message{ Key: nil, Value: value, @@ -361,6 +367,32 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent( } m.IncRowsCount() + if m.Length() > c.config.MaxMessageBytes { + // for single message that is longer than max-message-bytes, do not send it. 
+ if c.config.LargeMessageHandle.Disabled() { + log.Error("Single message is too large for canal-json", + zap.Int("maxMessageBytes", c.config.MaxMessageBytes), + zap.Int("length", m.Length()), + zap.Any("table", e.Table)) + return cerror.ErrMessageTooLarge.GenWithStackByArgs() + } + + if c.config.LargeMessageHandle.HandleKeyOnly() { + value, err = newJSONMessageForDML(e, c.config, c.builder, true) + if err != nil { + return cerror.ErrMessageTooLarge.GenWithStackByArgs() + } + m.Value = value + if m.Length() > c.config.MaxMessageBytes { + log.Error("Single message is too large for canal-json, only encode handle-key columns", + zap.Int("maxMessageBytes", c.config.MaxMessageBytes), + zap.Int("length", m.Length()), + zap.Any("table", e.Table)) + return cerror.ErrMessageTooLarge.GenWithStackByArgs() + } + } + } + c.messages = append(c.messages, m) return nil } diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index 6fd0f21700d..75651f08899 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -47,7 +47,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { encoder, ok := e.(*JSONRowEventEncoder) require.True(t, ok) - data, err := newJSONMessageForDML(encoder.builder, testCaseInsert, encoder.config) + data, err := newJSONMessageForDML(testCaseInsert, encoder.config, encoder.builder, false) require.Nil(t, err) var msg canalJSONMessageInterface = &JSONMessage{} err = json.Unmarshal(data, msg) @@ -96,7 +96,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Equal(t, item.expectedEncodedValue, obtainedValue) } - data, err = newJSONMessageForDML(encoder.builder, testCaseUpdate, encoder.config) + data, err = newJSONMessageForDML(testCaseUpdate, encoder.config, encoder.builder, false) require.Nil(t, err) jsonMsg = &JSONMessage{} err = json.Unmarshal(data, jsonMsg) @@ -114,7 +114,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Contains(t, jsonMsg.Old[0], col.Name) } - data, err = newJSONMessageForDML(encoder.builder, testCaseDelete, encoder.config) + data, err = newJSONMessageForDML(testCaseDelete, encoder.config, encoder.builder, false) require.Nil(t, err) jsonMsg = &JSONMessage{} err = json.Unmarshal(data, jsonMsg) @@ -127,7 +127,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Contains(t, jsonMsg.Data[0], col.Name) } - data, err = newJSONMessageForDML(encoder.builder, testCaseDelete, &common.Config{OnlyHandleKeyColumns: true}) + data, err = newJSONMessageForDML(testCaseDelete, &common.Config{DeleteOnlyHandleKeyColumns: true}, encoder.builder, false) require.NoError(t, err) jsonMsg = &JSONMessage{} err = json.Unmarshal(data, jsonMsg) @@ -156,7 +156,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { encoder, ok = e.(*JSONRowEventEncoder) require.True(t, ok) - data, err = newJSONMessageForDML(encoder.builder, testCaseUpdate, encoder.config) + data, err = newJSONMessageForDML(testCaseUpdate, encoder.config, encoder.builder, false) require.Nil(t, err) withExtension := &canalJSONMessageWithTiDBExtension{} @@ -168,7 +168,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { encoder, ok = e.(*JSONRowEventEncoder) require.True(t, ok) - data, err = newJSONMessageForDML(encoder.builder, testCaseUpdate, encoder.config) + data, err = newJSONMessageForDML(testCaseUpdate, encoder.config, encoder.builder, false) require.Nil(t, err) withExtension = &canalJSONMessageWithTiDBExtension{} @@ -180,6 +180,38 @@ func 
TestNewCanalJSONMessage4DML(t *testing.T) { require.Equal(t, testCaseUpdate.CommitTs, withExtension.Extensions.CommitTs) } +func TestNewCanalJSONMessageHandleKeyOnly4LargeMessage(t *testing.T) { + t.Parallel() + + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = true + codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly + codecConfig.MaxMessageBytes = 500 + encoder := newJSONRowEventEncoder(codecConfig) + + err := encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert, func() {}) + require.NoError(t, err) + + message := encoder.Build()[0] + + var decoded canalJSONMessageWithTiDBExtension + err = json.Unmarshal(message.Value, &decoded) + require.NoError(t, err) + require.True(t, decoded.Extensions.OnlyHandleKey) + + for _, col := range testCaseInsert.Columns { + if col.Flag.IsHandleKey() { + require.Contains(t, decoded.Data[0], col.Name) + require.Contains(t, decoded.SQLType, col.Name) + require.Contains(t, decoded.MySQLType, col.Name) + } else { + require.NotContains(t, decoded.Data[0], col.Name) + require.NotContains(t, decoded.SQLType, col.Name) + require.NotContains(t, decoded.MySQLType, col.Name) + } + } +} + func TestNewCanalJSONMessageFromDDL(t *testing.T) { t.Parallel() @@ -252,14 +284,9 @@ func TestEncodeCheckpointEvent(t *testing.T) { t.Parallel() var watermark uint64 = 2333 for _, enable := range []bool{false, true} { - config := &common.Config{ - EnableTiDBExtension: enable, - } - encoder := &JSONRowEventEncoder{ - builder: newCanalEntryBuilder(), - config: config, - } - + codecConfig := common.NewConfig(config.ProtocolCanalJSON) + codecConfig.EnableTiDBExtension = enable + encoder := newJSONRowEventEncoder(codecConfig).(*JSONRowEventEncoder) require.NotNil(t, encoder) msg, err := encoder.EncodeCheckpointEvent(watermark) @@ -271,7 +298,13 @@ func TestEncodeCheckpointEvent(t *testing.T) { } require.NotNil(t, msg) - decoder := NewBatchDecoder(msg.Value, enable, "") + + ctx := context.Background() + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(msg.Key, msg.Value) + require.NoError(t, err) ty, hasNext, err := decoder.HasNext() require.Nil(t, err) @@ -487,6 +520,7 @@ func TestMaxMessageBytes(t *testing.T) { // the test message length is larger than max-message-bytes cfg = cfg.WithMaxMessageBytes(100) + cfg.LargeMessageHandle = config.NewDefaultLargeMessageHandleConfig() encoder = NewJSONRowEventEncoderBuilder(cfg).Build() err = encoder.AppendRowChangedEvent(ctx, topic, testEvent, nil) require.NotNil(t, err) diff --git a/pkg/sink/codec/canal/canal_json_txn_event_encoder.go b/pkg/sink/codec/canal/canal_json_txn_event_encoder.go index 67d1055c937..bc47ba89abd 100644 --- a/pkg/sink/codec/canal/canal_json_txn_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_txn_event_encoder.go @@ -50,7 +50,7 @@ func (j *JSONTxnEventEncoder) AppendTxnEvent( callback func(), ) error { for _, row := range txn.Rows { - value, err := newJSONMessageForDML(j.builder, row, j.config) + value, err := newJSONMessageForDML(row, j.config, j.builder, false) if err != nil { return errors.Trace(err) } diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index 45acc0984c4..5c9e59ab27c 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -38,7 +38,7 @@ type Config struct { MaxBatchSize int // onlyHandleKeyColumns is true, for the delete event only output the handle key columns. 
- OnlyHandleKeyColumns bool + DeleteOnlyHandleKeyColumns bool EnableTiDBExtension bool EnableRowChecksum bool @@ -58,6 +58,8 @@ type Config struct { // for open protocol OnlyOutputUpdatedColumns bool + + LargeMessageHandle *config.LargeMessageHandleConfig } // NewConfig return a Config for codec @@ -75,6 +77,8 @@ func NewConfig(protocol config.Protocol) *Config { AvroBigintUnsignedHandlingMode: "long", OnlyOutputUpdatedColumns: false, + + LargeMessageHandle: config.NewDefaultLargeMessageHandleConfig(), } } @@ -155,6 +159,16 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er c.IncludeCommitTs = replicaConfig.Sink.CSVConfig.IncludeCommitTs c.BinaryEncodingMethod = replicaConfig.Sink.CSVConfig.BinaryEncodingMethod } + + if replicaConfig.Sink.KafkaConfig != nil { + if replicaConfig.Sink.KafkaConfig.LargeMessageHandle != nil { + c.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle + } + if c.LargeMessageHandle.HandleKeyOnly() && replicaConfig.ForceReplicate { + return cerror.ErrCodecInvalidConfig.GenWithStack( + `force-replicate must be disabled, when the large message handle option is set to "handle-key-only"`) + } + } } if urlParameter.OnlyOutputUpdatedColumns != nil { c.OnlyOutputUpdatedColumns = *urlParameter.OnlyOutputUpdatedColumns @@ -170,7 +184,7 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er c.EnableRowChecksum = replicaConfig.Integrity.Enabled() } - c.OnlyHandleKeyColumns = !replicaConfig.EnableOldValue + c.DeleteOnlyHandleKeyColumns = !replicaConfig.EnableOldValue return nil } @@ -269,5 +283,12 @@ func (c *Config) Validate() error { ) } + if c.LargeMessageHandle != nil { + err := c.LargeMessageHandle.AdjustAndValidate(c.Protocol, c.EnableTiDBExtension) + if err != nil { + return err + } + } + return nil } diff --git a/pkg/sink/codec/common/config_test.go b/pkg/sink/codec/common/config_test.go index e55e73029cf..85d1d46e8d8 100644 --- a/pkg/sink/codec/common/config_test.go +++ b/pkg/sink/codec/common/config_test.go @@ -19,6 +19,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/integrity" "github.com/stretchr/testify/require" ) @@ -113,7 +114,7 @@ func TestConfigApplyValidate(t *testing.T) { err = c.Apply(sinkURI, replicaConfig) require.NoError(t, err) require.True(t, c.EnableTiDBExtension) - require.False(t, c.OnlyHandleKeyColumns) + require.False(t, c.DeleteOnlyHandleKeyColumns) err = c.Validate() require.NoError(t, err) @@ -121,7 +122,7 @@ func TestConfigApplyValidate(t *testing.T) { replicaConfig.EnableOldValue = false err = c.Apply(sinkURI, replicaConfig) require.NoError(t, err) - require.True(t, c.OnlyHandleKeyColumns) + require.True(t, c.DeleteOnlyHandleKeyColumns) uri = "kafka://127.0.0.1:9092/abc?protocol=canal-json&enable-tidb-extension=a" sinkURI, err = url.Parse(uri) @@ -348,3 +349,105 @@ func TestMergeConfig(t *testing.T) { require.Equal(t, 123, c.MaxMessageBytes) require.Equal(t, 456, c.MaxBatchSize) } + +func TestCanalJSONHandleKeyOnly(t *testing.T) { + t.Parallel() + + // handle-key-only not enabled, always no error + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{ + LargeMessageHandle: config.NewDefaultLargeMessageHandleConfig(), + } + + uri := "kafka://127.0.0.1:9092/canal-json?protocol=canal-json" + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + codecConfig := NewConfig(config.ProtocolCanalJSON) 
+ err = codecConfig.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + + err = codecConfig.Validate() + require.NoError(t, err) + require.True(t, codecConfig.LargeMessageHandle.Disabled()) + + // enable handle-key only + replicaConfig.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly + + // `enable-tidb-extension` is false, return error + uri = "kafka://127.0.0.1:9092/large-message-handle?protocol=canal-json" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + + codecConfig = NewConfig(config.ProtocolCanal) + err = codecConfig.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = codecConfig.Validate() + require.Error(t, err) + + // canal-json, `enable-tidb-extension` is true, no error + uri = "kafka://127.0.0.1:9092/large-message-handle?protocol=canal-json&enable-tidb-extension=true" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + + codecConfig = NewConfig(config.ProtocolCanalJSON) + err = codecConfig.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = codecConfig.Validate() + require.NoError(t, err) + + require.True(t, codecConfig.LargeMessageHandle.HandleKeyOnly()) + + // force-replicate is set to true, should return error + replicaConfig.ForceReplicate = true + err = codecConfig.Apply(sinkURI, replicaConfig) + require.ErrorIs(t, err, cerror.ErrCodecInvalidConfig) +} + +func TestOpenProtocolHandleKeyOnly(t *testing.T) { + t.Parallel() + + // large message handle is set to default, none. + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{ + LargeMessageHandle: config.NewDefaultLargeMessageHandleConfig(), + } + + // enable-tidb-extension is false, should always success, no error + uri := "kafka://127.0.0.1:9092/large-message-handle?protocol=open-protocol" + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + codecConfig := NewConfig(config.ProtocolOpen) + err = codecConfig.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = codecConfig.Validate() + require.NoError(t, err) + require.True(t, codecConfig.LargeMessageHandle.Disabled()) + + // enable-tidb-extension is true, should always success, no error + uri = "kafka://127.0.0.1:9092/large-message-handle?protocol=open-protocol&enable-tidb-extension=true" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + + codecConfig = NewConfig(config.ProtocolOpen) + err = codecConfig.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = codecConfig.Validate() + require.NoError(t, err) + require.True(t, codecConfig.LargeMessageHandle.Disabled()) + + // enable handle-key only as the large message handle option + replicaConfig.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly + + // no matter enable-tidb-extension, always no error + uri = "kafka://127.0.0.1:9092/large-message-handle?protocol=open-protocol" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + + codecConfig = NewConfig(config.ProtocolOpen) + err = codecConfig.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = codecConfig.Validate() + require.NoError(t, err) +} diff --git a/pkg/sink/codec/common/helper.go b/pkg/sink/codec/common/helper.go new file mode 100644 index 00000000000..489384f2dc6 --- /dev/null +++ b/pkg/sink/codec/common/helper.go @@ -0,0 +1,169 @@ +// Copyright 2023 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "context" + "database/sql" + "fmt" + "math" + + "github.com/go-sql-driver/mysql" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" +) + +// ColumnsHolder read columns from sql.Rows +type ColumnsHolder struct { + Values []interface{} + ValuePointers []interface{} + Types []*sql.ColumnType +} + +func newColumnHolder(rows *sql.Rows) (*ColumnsHolder, error) { + columnTypes, err := rows.ColumnTypes() + if err != nil { + return nil, errors.Trace(err) + } + + values := make([]interface{}, len(columnTypes)) + valuePointers := make([]interface{}, len(columnTypes)) + for i := range values { + valuePointers[i] = &values[i] + } + + return &ColumnsHolder{ + Values: values, + ValuePointers: valuePointers, + Types: columnTypes, + }, nil +} + +// Length return the column count +func (h *ColumnsHolder) Length() int { + return len(h.Values) +} + +// SnapshotQuery query the db by the snapshot read with the given commitTs +func SnapshotQuery( + ctx context.Context, db *sql.DB, commitTs uint64, schema, table string, conditions map[string]interface{}, +) (*ColumnsHolder, error) { + // 1. set snapshot read + query := fmt.Sprintf("set @@tidb_snapshot=%d", commitTs) + conn, err := db.Conn(ctx) + if err != nil { + log.Error("establish connection to the upstream tidb failed", + zap.String("query", query), + zap.String("schema", schema), zap.String("table", table), + zap.Uint64("commitTs", commitTs), zap.Error(err)) + return nil, errors.Trace(err) + } + defer conn.Close() + + _, err = conn.ExecContext(ctx, query) + if err != nil { + mysqlErr, ok := errors.Cause(err).(*mysql.MySQLError) + if ok { + // Error 8055 (HY000): snapshot is older than GC safe point + if mysqlErr.Number == 8055 { + log.Error("set snapshot read failed, since snapshot is older than GC safe point") + } + } + + log.Error("set snapshot read failed", + zap.String("query", query), + zap.String("schema", schema), zap.String("table", table), + zap.Uint64("commitTs", commitTs), zap.Error(err)) + return nil, errors.Trace(err) + } + + // 2. query the whole row + query = fmt.Sprintf("select * from `%s`.`%s` where ", schema, table) + var whereClause string + for name, value := range conditions { + if whereClause != "" { + whereClause += " and " + } + whereClause += fmt.Sprintf("`%s` = '%v'", name, value) + } + query += whereClause + + rows, err := conn.QueryContext(ctx, query) + if err != nil { + log.Error("query row failed", + zap.String("query", query), + zap.String("schema", schema), zap.String("table", table), + zap.Uint64("commitTs", commitTs), zap.Error(err)) + return nil, errors.Trace(err) + } + defer rows.Close() + + holder, err := newColumnHolder(rows) + if err != nil { + log.Error("obtain the columns holder failed", + zap.String("query", query), + zap.String("schema", schema), zap.String("table", table), + zap.Uint64("commitTs", commitTs), zap.Error(err)) + return nil, err + } + for rows.Next() { + err = rows.Scan(holder.ValuePointers...) 
+ if err != nil { + log.Error("scan row failed", + zap.String("query", query), + zap.String("schema", schema), zap.String("table", table), + zap.Uint64("commitTs", commitTs), zap.Error(err)) + return nil, errors.Trace(err) + } + } + + return holder, nil +} + +// BinaryLiteralToInt convert bytes into uint64, +// by follow https://github.com/pingcap/tidb/blob/e3417913f58cdd5a136259b902bf177eaf3aa637/types/binary_literal.go#L105 +func BinaryLiteralToInt(bytes []byte) (uint64, error) { + bytes = trimLeadingZeroBytes(bytes) + length := len(bytes) + + if length > 8 { + log.Error("invalid bit value found", zap.ByteString("value", bytes)) + return math.MaxUint64, errors.New("invalid bit value") + } + + if length == 0 { + return 0, nil + } + + // Note: the byte-order is BigEndian. + val := uint64(bytes[0]) + for i := 1; i < length; i++ { + val = (val << 8) | uint64(bytes[i]) + } + return val, nil +} + +func trimLeadingZeroBytes(bytes []byte) []byte { + if len(bytes) == 0 { + return bytes + } + pos, posMax := 0, len(bytes)-1 + for ; pos < posMax; pos++ { + if bytes[pos] != 0 { + break + } + } + return bytes[pos:] +} diff --git a/pkg/sink/codec/craft/craft_decoder.go b/pkg/sink/codec/craft/craft_decoder.go index fc351c31487..0dddcb50610 100644 --- a/pkg/sink/codec/craft/craft_decoder.go +++ b/pkg/sink/codec/craft/craft_decoder.go @@ -58,8 +58,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { return nil, errors.Trace(err) } if !hasNext || ty != model.MessageTypeRow { - return nil, - cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message") + return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message") } oldValue, newValue, err := b.decoder.RowChangedEvent(b.index) if err != nil { @@ -120,25 +119,32 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) { // newBatchDecoder creates a new batchDecoder. func newBatchDecoder(bits []byte) (codec.RowEventDecoder, error) { - return NewBatchDecoderWithAllocator(bits, NewSliceAllocator(64)) + decoder := NewBatchDecoderWithAllocator(NewSliceAllocator(64)) + err := decoder.AddKeyValue(nil, bits) + return decoder, err } // NewBatchDecoderWithAllocator creates a new batchDecoder with given allocator. 
func NewBatchDecoderWithAllocator( - bits []byte, allocator *SliceAllocator, -) (codec.RowEventDecoder, error) { - decoder, err := NewMessageDecoder(bits, allocator) + allocator *SliceAllocator, +) codec.RowEventDecoder { + return &batchDecoder{ + allocator: allocator, + } +} + +// AddKeyValue implements the EventBatchDecoder interface +func (b *batchDecoder) AddKeyValue(_, value []byte) error { + decoder, err := NewMessageDecoder(value, b.allocator) if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } headers, err := decoder.Headers() if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } + b.decoder = decoder + b.headers = headers - return &batchDecoder{ - headers: headers, - decoder: decoder, - allocator: allocator, - }, nil + return nil } diff --git a/pkg/sink/codec/craft/craft_encoder.go b/pkg/sink/codec/craft/craft_encoder.go index 3057a04c98e..5de08131741 100644 --- a/pkg/sink/codec/craft/craft_encoder.go +++ b/pkg/sink/codec/craft/craft_encoder.go @@ -47,7 +47,7 @@ func (e *BatchEncoder) AppendRowChangedEvent( ev *model.RowChangedEvent, callback func(), ) error { - rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev, e.config.OnlyHandleKeyColumns) + rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev, e.config.DeleteOnlyHandleKeyColumns) if callback != nil { e.callbackBuf = append(e.callbackBuf, callback) } diff --git a/pkg/sink/codec/csv/csv_decoder.go b/pkg/sink/codec/csv/csv_decoder.go index 6198a52dad2..0b2442a68f0 100644 --- a/pkg/sink/codec/csv/csv_decoder.go +++ b/pkg/sink/codec/csv/csv_decoder.go @@ -74,6 +74,11 @@ func NewBatchDecoder(ctx context.Context, }, nil } +// AddKeyValue implements the EventBatchDecoder interface. +func (b *batchDecoder) AddKeyValue(_, _ []byte) error { + return nil +} + // HasNext implements the RowEventDecoder interface. func (b *batchDecoder) HasNext() (model.MessageType, bool, error) { err := b.parser.ReadRow() diff --git a/pkg/sink/codec/decoder.go b/pkg/sink/codec/decoder.go index 963cbee4f2e..acfdb7ee216 100644 --- a/pkg/sink/codec/decoder.go +++ b/pkg/sink/codec/decoder.go @@ -18,6 +18,11 @@ import "github.com/pingcap/tiflow/cdc/model" // RowEventDecoder is an abstraction for events decoder // this interface is only for testing now type RowEventDecoder interface { + // AddKeyValue add the received key and values to the decoder, + // should be called before `HasNext` + // decoder decode the key and value into the event format. + AddKeyValue(key, value []byte) error + // HasNext returns // 1. the type of the next event // 2. a bool if the next event is exist diff --git a/pkg/sink/codec/internal/column.go b/pkg/sink/codec/internal/column.go index 28e968575d3..ec12be8725f 100644 --- a/pkg/sink/codec/internal/column.go +++ b/pkg/sink/codec/internal/column.go @@ -103,13 +103,13 @@ func (c *Column) ToRowChangeColumn(name string) *model.Column { } // ToCanalJSONFormatColumn converts from a codec column to a row changed column in canal-json format. 
-func (c *Column) ToCanalJSONFormatColumn(name string, javaType JavaSQLType) *model.Column { +func (c *Column) ToCanalJSONFormatColumn(name string, isBlob bool) *model.Column { col := new(model.Column) col.Type = c.Type col.Flag = c.Flag col.Name = name col.Value = c.Value - if c.Value == nil { + if col.Value == nil { return col } @@ -118,7 +118,7 @@ func (c *Column) ToCanalJSONFormatColumn(name string, javaType JavaSQLType) *mod log.Panic("canal-json encoded message should have type in `string`") } - if javaType == JavaSQLTypeBIT { + if col.Type == mysql.TypeBit || col.Type == mysql.TypeSet { val, err := strconv.ParseUint(value, 10, 64) if err != nil { log.Panic("invalid column value for bit", zap.Any("col", c), zap.Error(err)) @@ -127,16 +127,14 @@ func (c *Column) ToCanalJSONFormatColumn(name string, javaType JavaSQLType) *mod return col } - if javaType != JavaSQLTypeBLOB { - col.Value = value - return col - } - - // when encoding the `JavaSQLTypeBLOB`, use `ISO8859_1` decoder, now reverse it back. - encoder := charmap.ISO8859_1.NewEncoder() - value, err := encoder.String(value) - if err != nil { - log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) + var err error + if isBlob { + // when encoding the `JavaSQLTypeBLOB`, use `ISO8859_1` decoder, now reverse it back. + encoder := charmap.ISO8859_1.NewEncoder() + value, err = encoder.String(value) + if err != nil { + log.Panic("invalid column value, please report a bug", zap.Any("col", col), zap.Error(err)) + } } col.Value = value diff --git a/pkg/sink/codec/internal/message_key.go b/pkg/sink/codec/internal/message_key.go index fe98aa5995d..6e37d3a72a5 100644 --- a/pkg/sink/codec/internal/message_key.go +++ b/pkg/sink/codec/internal/message_key.go @@ -28,6 +28,9 @@ type MessageKey struct { RowID int64 `json:"rid,omitempty"` Partition *int64 `json:"ptn,omitempty"` Type model.MessageType `json:"t"` + + // Only Handle Key Columns encoded in the message's value part. + OnlyHandleKey bool `json:"ohk,omitempty"` } // Encode encodes the message key to a byte slice. diff --git a/pkg/sink/codec/maxwell/maxwell_encoder.go b/pkg/sink/codec/maxwell/maxwell_encoder.go index 661fd5a30ff..1f175ac132e 100644 --- a/pkg/sink/codec/maxwell/maxwell_encoder.go +++ b/pkg/sink/codec/maxwell/maxwell_encoder.go @@ -49,7 +49,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - _, valueMsg := rowChangeToMaxwellMsg(e, d.config.OnlyHandleKeyColumns) + _, valueMsg := rowChangeToMaxwellMsg(e, d.config.DeleteOnlyHandleKeyColumns) value, err := valueMsg.encode() if err != nil { return errors.Trace(err) diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go index 34f82adaf23..112d9fcd064 100644 --- a/pkg/sink/codec/open/open_protocol_decoder.go +++ b/pkg/sink/codec/open/open_protocol_decoder.go @@ -14,123 +14,95 @@ package open import ( + "context" + "database/sql" "encoding/binary" + "strings" "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec" + "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/internal" + "go.uber.org/zap" ) -// BatchMixedDecoder decodes the byte of a batch into the original messages. 
-type BatchMixedDecoder struct { - mixedBytes []byte - nextKey *internal.MessageKey - nextKeyLen uint64 -} +// BatchDecoder decodes the byte of a batch into the original messages. +type BatchDecoder struct { + keyBytes []byte + valueBytes []byte -// HasNext implements the RowEventDecoder interface -func (b *BatchMixedDecoder) HasNext() (model.MessageType, bool, error) { - if !b.hasNext() { - return 0, false, nil - } - if err := b.decodeNextKey(); err != nil { - return 0, false, err - } - return b.nextKey.Type, true, nil + nextKey *internal.MessageKey + nextEvent *model.RowChangedEvent + + upstreamTiDB *sql.DB } -// NextResolvedEvent implements the RowEventDecoder interface -func (b *BatchMixedDecoder) NextResolvedEvent() (uint64, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return 0, err - } - } - b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeResolved { - return 0, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found resolved event message") +// NewBatchDecoder creates a new BatchDecoder. +func NewBatchDecoder( + _ context.Context, config *common.Config, db *sql.DB) (codec.RowEventDecoder, error) { + if config.LargeMessageHandle.HandleKeyOnly() && db == nil { + return nil, cerror.ErrCodecDecode. + GenWithStack("handle-key-only is enabled, but upstream TiDB is not provided") } - valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - b.mixedBytes = b.mixedBytes[valueLen+8:] - resolvedTs := b.nextKey.Ts - b.nextKey = nil - return resolvedTs, nil + + return &BatchDecoder{ + upstreamTiDB: db, + }, nil } -// NextRowChangedEvent implements the RowEventDecoder interface -func (b *BatchMixedDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return nil, err - } - } - b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeRow { - return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") +// AddKeyValue implements the EventBatchDecoder interface +func (b *BatchDecoder) AddKeyValue(key, value []byte) error { + if len(b.keyBytes) != 0 || len(b.valueBytes) != 0 { + return cerror.ErrOpenProtocolCodecInvalidData. + GenWithStack("decoder key and value not nil") } - valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - value := b.mixedBytes[8 : valueLen+8] - b.mixedBytes = b.mixedBytes[valueLen+8:] - rowMsg := new(messageRow) - if err := rowMsg.decode(value); err != nil { - return nil, errors.Trace(err) + version := binary.BigEndian.Uint64(key[:8]) + key = key[8:] + if version != codec.BatchVersion1 { + return cerror.ErrOpenProtocolCodecInvalidData. 
+ GenWithStack("unexpected key format version") } - rowEvent := msgToRowChange(b.nextKey, rowMsg) - b.nextKey = nil - return rowEvent, nil + + b.keyBytes = key + b.valueBytes = value + + return nil } -// NextDDLEvent implements the RowEventDecoder interface -func (b *BatchMixedDecoder) NextDDLEvent() (*model.DDLEvent, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return nil, err - } - } - b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeDDL { - return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found ddl event message") +func (b *BatchDecoder) hasNext() bool { + keyLen := len(b.keyBytes) + valueLen := len(b.valueBytes) + + if keyLen > 0 && valueLen > 0 { + return true } - valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - value := b.mixedBytes[8 : valueLen+8] - b.mixedBytes = b.mixedBytes[valueLen+8:] - ddlMsg := new(messageDDL) - if err := ddlMsg.decode(value); err != nil { - return nil, errors.Trace(err) + + if keyLen == 0 && valueLen != 0 || keyLen != 0 && valueLen == 0 { + log.Panic("open-protocol meet invalid data", + zap.Int("keyLen", keyLen), zap.Int("valueLen", valueLen)) } - ddlEvent := msgToDDLEvent(b.nextKey, ddlMsg) - b.nextKey = nil - return ddlEvent, nil -} -func (b *BatchMixedDecoder) hasNext() bool { - return len(b.mixedBytes) > 0 + return false } -func (b *BatchMixedDecoder) decodeNextKey() error { - keyLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - key := b.mixedBytes[8 : keyLen+8] - // drop value bytes +func (b *BatchDecoder) decodeNextKey() error { + keyLen := binary.BigEndian.Uint64(b.keyBytes[:8]) + key := b.keyBytes[8 : keyLen+8] msgKey := new(internal.MessageKey) err := msgKey.Decode(key) if err != nil { return errors.Trace(err) } b.nextKey = msgKey - b.nextKeyLen = keyLen + b.keyBytes = b.keyBytes[keyLen+8:] return nil } -// BatchDecoder decodes the byte of a batch into the original messages. -type BatchDecoder struct { - keyBytes []byte - valueBytes []byte - nextKey *internal.MessageKey - nextKeyLen uint64 -} - // HasNext implements the RowEventDecoder interface func (b *BatchDecoder) HasNext() (model.MessageType, bool, error) { if !b.hasNext() { @@ -139,105 +111,157 @@ func (b *BatchDecoder) HasNext() (model.MessageType, bool, error) { if err := b.decodeNextKey(); err != nil { return 0, false, err } + + if b.nextKey.Type == model.MessageTypeRow { + valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) + value := b.valueBytes[8 : valueLen+8] + b.valueBytes = b.valueBytes[valueLen+8:] + + rowMsg := new(messageRow) + if err := rowMsg.decode(value); err != nil { + return b.nextKey.Type, false, errors.Trace(err) + } + b.nextEvent = msgToRowChange(b.nextKey, rowMsg) + } + return b.nextKey.Type, true, nil } // NextResolvedEvent implements the RowEventDecoder interface func (b *BatchDecoder) NextResolvedEvent() (uint64, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return 0, err - } - } - b.keyBytes = b.keyBytes[b.nextKeyLen+8:] if b.nextKey.Type != model.MessageTypeResolved { return 0, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found resolved event message") } - valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) - b.valueBytes = b.valueBytes[valueLen+8:] resolvedTs := b.nextKey.Ts b.nextKey = nil + // resolved ts event's value part is empty, can be ignored. 
+ b.valueBytes = nil return resolvedTs, nil } -// NextRowChangedEvent implements the RowEventDecoder interface -func (b *BatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return nil, err - } - } - b.keyBytes = b.keyBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeRow { - return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") - } - valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) - value := b.valueBytes[8 : valueLen+8] - b.valueBytes = b.valueBytes[valueLen+8:] - rowMsg := new(messageRow) - if err := rowMsg.decode(value); err != nil { - return nil, errors.Trace(err) - } - rowEvent := msgToRowChange(b.nextKey, rowMsg) - b.nextKey = nil - return rowEvent, nil -} - // NextDDLEvent implements the RowEventDecoder interface func (b *BatchDecoder) NextDDLEvent() (*model.DDLEvent, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return nil, err - } - } - b.keyBytes = b.keyBytes[b.nextKeyLen+8:] if b.nextKey.Type != model.MessageTypeDDL { return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found ddl event message") } valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) value := b.valueBytes[8 : valueLen+8] - b.valueBytes = b.valueBytes[valueLen+8:] ddlMsg := new(messageDDL) if err := ddlMsg.decode(value); err != nil { return nil, errors.Trace(err) } ddlEvent := msgToDDLEvent(b.nextKey, ddlMsg) b.nextKey = nil + b.valueBytes = nil return ddlEvent, nil } -func (b *BatchDecoder) hasNext() bool { - return len(b.keyBytes) > 0 && len(b.valueBytes) > 0 -} +// NextRowChangedEvent implements the EventBatchDecoder interface +func (b *BatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { + if b.nextKey.Type != model.MessageTypeRow { + return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") + } -func (b *BatchDecoder) decodeNextKey() error { - keyLen := binary.BigEndian.Uint64(b.keyBytes[:8]) - key := b.keyBytes[8 : keyLen+8] - msgKey := new(internal.MessageKey) - err := msgKey.Decode(key) - if err != nil { - return errors.Trace(err) + event := b.nextEvent + ctx := context.Background() + if b.nextKey.OnlyHandleKey { + var err error + event, err = b.assembleHandleKeyOnlyEvent(ctx, event) + if err != nil { + return nil, errors.Trace(err) + } } - b.nextKey = msgKey - b.nextKeyLen = keyLen - return nil + + b.nextKey = nil + return event, nil } -// NewBatchDecoder creates a new BatchDecoder. 
-func NewBatchDecoder(key []byte, value []byte) (codec.RowEventDecoder, error) { - version := binary.BigEndian.Uint64(key[:8]) - key = key[8:] - if version != codec.BatchVersion1 { - return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("unexpected key format version") +func (b *BatchDecoder) buildColumns( + holder *common.ColumnsHolder, handleKeyColumns map[string]interface{}, +) []*model.Column { + columnsCount := holder.Length() + columns := make([]*model.Column, 0, columnsCount) + for i := 0; i < columnsCount; i++ { + columnType := holder.Types[i] + name := columnType.Name() + mysqlType := types.StrToType(strings.ToLower(columnType.DatabaseTypeName())) + + var value interface{} + value = holder.Values[i].([]uint8) + + switch mysqlType { + case mysql.TypeJSON: + value = string(value.([]uint8)) + } + + column := &model.Column{ + Name: name, + Type: mysqlType, + Value: value, + } + + if _, ok := handleKeyColumns[name]; ok { + column.Flag = model.PrimaryKeyFlag | model.HandleKeyFlag + } + columns = append(columns, column) } - // if only decode one byte slice, we choose MixedDecoder - if len(key) > 0 && len(value) == 0 { - return &BatchMixedDecoder{ - mixedBytes: key, - }, nil + return columns +} + +func (b *BatchDecoder) assembleHandleKeyOnlyEvent( + ctx context.Context, handleKeyOnlyEvent *model.RowChangedEvent, +) (*model.RowChangedEvent, error) { + var ( + schema = handleKeyOnlyEvent.Table.Schema + table = handleKeyOnlyEvent.Table.Table + commitTs = handleKeyOnlyEvent.CommitTs + ) + + if handleKeyOnlyEvent.IsInsert() { + conditions := make(map[string]interface{}, len(handleKeyOnlyEvent.Columns)) + for _, col := range handleKeyOnlyEvent.Columns { + conditions[col.Name] = col.Value + } + holder, err := common.SnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions) + if err != nil { + return nil, err + } + columns := b.buildColumns(holder, conditions) + handleKeyOnlyEvent.Columns = columns + } else if handleKeyOnlyEvent.IsDelete() { + conditions := make(map[string]interface{}, len(handleKeyOnlyEvent.PreColumns)) + for _, col := range handleKeyOnlyEvent.PreColumns { + conditions[col.Name] = col.Value + } + holder, err := common.SnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, conditions) + if err != nil { + return nil, err + } + preColumns := b.buildColumns(holder, conditions) + handleKeyOnlyEvent.PreColumns = preColumns + } else if handleKeyOnlyEvent.IsUpdate() { + conditions := make(map[string]interface{}, len(handleKeyOnlyEvent.Columns)) + for _, col := range handleKeyOnlyEvent.Columns { + conditions[col.Name] = col.Value + } + holder, err := common.SnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, conditions) + if err != nil { + return nil, err + } + columns := b.buildColumns(holder, conditions) + handleKeyOnlyEvent.Columns = columns + + conditions = make(map[string]interface{}, len(handleKeyOnlyEvent.PreColumns)) + for _, col := range handleKeyOnlyEvent.PreColumns { + conditions[col.Name] = col.Value + } + holder, err = common.SnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, conditions) + if err != nil { + return nil, err + } + preColumns := b.buildColumns(holder, conditions) + handleKeyOnlyEvent.PreColumns = preColumns } - return &BatchDecoder{ - keyBytes: key, - valueBytes: value, - }, nil + + return handleKeyOnlyEvent, nil } diff --git a/pkg/sink/codec/open/open_protocol_encoder.go b/pkg/sink/codec/open/open_protocol_encoder.go index 3c60c77ac64..abd35e18a24 100644 --- a/pkg/sink/codec/open/open_protocol_encoder.go +++ 
b/pkg/sink/codec/open/open_protocol_encoder.go @@ -37,6 +37,33 @@ type BatchEncoder struct { config *common.Config } +func (d *BatchEncoder) buildMessageOnlyHandleKeyColumns(e *model.RowChangedEvent) ([]byte, []byte, error) { + // set the `largeMessageOnlyHandleKeyColumns` to true to only encode handle key columns. + keyMsg, valueMsg := rowChangeToMsg(e, d.config, true) + key, err := keyMsg.Encode() + if err != nil { + return nil, nil, errors.Trace(err) + } + value, err := valueMsg.encode(d.config.OnlyOutputUpdatedColumns) + if err != nil { + return nil, nil, errors.Trace(err) + } + + // for single message that is longer than max-message-bytes + // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead` + length := len(key) + len(value) + common.MaxRecordOverhead + 16 + 8 + if length > d.config.MaxMessageBytes { + log.Error("Single message is too large for open-protocol, only encode handle key columns", + zap.Int("maxMessageBytes", d.config.MaxMessageBytes), + zap.Int("length", length), + zap.Any("table", e.Table), + zap.Any("key", key)) + return nil, nil, cerror.ErrMessageTooLarge.GenWithStackByArgs() + } + + return key, value, nil +} + // AppendRowChangedEvent implements the RowEventEncoder interface func (d *BatchEncoder) AppendRowChangedEvent( _ context.Context, @@ -44,7 +71,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - keyMsg, valueMsg := rowChangeToMsg(e, d.config.OnlyHandleKeyColumns) + keyMsg, valueMsg := rowChangeToMsg(e, d.config, false) key, err := keyMsg.Encode() if err != nil { return errors.Trace(err) @@ -54,21 +81,22 @@ func (d *BatchEncoder) AppendRowChangedEvent( return errors.Trace(err) } - var keyLenByte [8]byte - binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key))) - var valueLenByte [8]byte - binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) - // for single message that is longer than max-message-bytes, do not send it. // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead` length := len(key) + len(value) + common.MaxRecordOverhead + 16 + 8 if length > d.config.MaxMessageBytes { - log.Warn("Single message is too large for open-protocol", - zap.Int("maxMessageBytes", d.config.MaxMessageBytes), - zap.Int("length", length), - zap.Any("table", e.Table), - zap.Any("key", key)) - return cerror.ErrMessageTooLarge.GenWithStackByArgs() + if d.config.LargeMessageHandle.Disabled() { + log.Error("Single message is too large for open-protocol", + zap.Int("maxMessageBytes", d.config.MaxMessageBytes), + zap.Int("length", length), + zap.Any("table", e.Table), + zap.Any("key", key)) + return cerror.ErrMessageTooLarge.GenWithStackByArgs() + } + key, value, err = d.buildMessageOnlyHandleKeyColumns(e) + if err != nil { + return errors.Trace(err) + } } if len(d.messageBuf) == 0 || @@ -84,6 +112,13 @@ func (d *BatchEncoder) AppendRowChangedEvent( d.curBatchSize = 0 } + var ( + keyLenByte [8]byte + valueLenByte [8]byte + ) + binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key))) + binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) + message := d.messageBuf[len(d.messageBuf)-1] message.Key = append(message.Key, keyLenByte[:]...) message.Key = append(message.Key, key...) 
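Note (editor's sketch, not part of the patch): the hunk above makes AppendRowChangedEvent fall back to encoding only the handle key columns when a single message would exceed max-message-bytes and the large-message-handle option is not disabled. A minimal sketch of how a caller might exercise that path, using only APIs added or exercised in this patch; the helper name encodeWithHandleKeyFallback, the event argument, and the 171-byte limit (borrowed from the test below) are illustrative assumptions:

package open

import (
	"context"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/config"
	"github.com/pingcap/tiflow/pkg/sink/codec/common"
)

// encodeWithHandleKeyFallback is a sketch, not part of this patch: it forces the
// open-protocol encoder into the handle-key-only fallback by choosing a max
// message size that the full row cannot fit into.
func encodeWithHandleKeyFallback(event *model.RowChangedEvent) error {
	codecConfig := common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(171)
	codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly

	encoder := NewBatchEncoderBuilder(codecConfig).Build()
	if err := encoder.AppendRowChangedEvent(context.Background(), "", event, nil); err != nil {
		// With the default (disabled) option this oversized row would instead
		// fail with ErrMessageTooLarge.
		return err
	}
	// Each built message now carries only the handle key columns in its value,
	// and the key part has `ohk` set so the decoder knows to re-assemble the row.
	for _, message := range encoder.Build() {
		_ = message // hand the message off to the Kafka producer in real code
	}
	return nil
}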
diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index aba2a2af731..118ed2402fb 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -15,16 +15,38 @@ package open import ( "context" + "database/sql" "testing" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/internal" "github.com/stretchr/testify/require" ) +var ( + testEvent = &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{ + { + Name: "col1", + Type: mysql.TypeVarchar, + Value: []byte("aa"), + Flag: model.HandleKeyFlag, + }, + { + Name: "col2", + Type: mysql.TypeVarchar, + Value: []byte("bb"), + }, + }, + } +) + func TestBuildOpenProtocolBatchEncoder(t *testing.T) { t.Parallel() config := common.NewConfig(config.ProtocolOpen) @@ -78,30 +100,22 @@ func TestMaxMessageBytes(t *testing.T) { func TestMaxBatchSize(t *testing.T) { t.Parallel() - config := common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(1048576) - config.MaxBatchSize = 64 - encoder := NewBatchEncoderBuilder(config).Build() - - testEvent := &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{ - Name: "col1", - Type: mysql.TypeVarchar, - Value: []byte("aa"), - }}, - } + codecConfig := common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(1048576) + codecConfig.MaxBatchSize = 64 + encoder := NewBatchEncoderBuilder(codecConfig).Build() for i := 0; i < 10000; i++ { err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent, nil) - require.Nil(t, err) + require.NoError(t, err) } - messages := encoder.Build() + + decoder, err := NewBatchDecoder(context.Background(), codecConfig, nil) + require.NoError(t, err) sum := 0 for _, msg := range messages { - decoder, err := NewBatchDecoder(msg.Key, msg.Value) - require.Nil(t, err) + err := decoder.AddKeyValue(msg.Key, msg.Value) + require.NoError(t, err) count := 0 for { v, hasNext, err := decoder.HasNext() @@ -112,7 +126,7 @@ func TestMaxBatchSize(t *testing.T) { require.Equal(t, model.MessageTypeRow, v) _, err = decoder.NextRowChangedEvent() - require.Nil(t, err) + require.NoError(t, err) count++ } require.LessOrEqual(t, count, 64) @@ -201,8 +215,44 @@ func TestOpenProtocolAppendRowChangedEventWithCallback(t *testing.T) { } func TestOpenProtocolBatchCodec(t *testing.T) { - config := common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(8192) - config.MaxBatchSize = 64 + codecConfig := common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(8192) + codecConfig.LargeMessageHandle = config.NewDefaultLargeMessageHandleConfig() + codecConfig.MaxBatchSize = 64 tester := internal.NewDefaultBatchTester() - tester.TestBatchCodec(t, NewBatchEncoderBuilder(config), NewBatchDecoder) + tester.TestBatchCodec(t, NewBatchEncoderBuilder(codecConfig), + func(key []byte, value []byte) (codec.RowEventDecoder, error) { + decoder, err := NewBatchDecoder(context.Background(), codecConfig, nil) + require.NoError(t, err) + err = decoder.AddKeyValue(key, value) + return decoder, err + }) +} + +func TestAppendMessageOnlyHandleKeyColumns(t *testing.T) { + t.Parallel() + + ctx := context.Background() + topic := "" + + // cannot hold one message + a := 171 + codecConfig 
:= common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(a) + codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly + encoder := NewBatchEncoderBuilder(codecConfig).Build() + + // only handle key is encoded into the message + err := encoder.AppendRowChangedEvent(ctx, topic, testEvent, func() {}) + require.NoError(t, err) + + message := encoder.Build()[0] + + decoder, err := NewBatchDecoder(context.Background(), codecConfig, &sql.DB{}) + require.NoError(t, err) + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + batchDecoder := decoder.(*BatchDecoder) + err = batchDecoder.decodeNextKey() + require.NoError(t, err) + require.True(t, batchDecoder.nextKey.OnlyHandleKey) } diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go index 0c5f356bbde..d4287ae0260 100644 --- a/pkg/sink/codec/open/open_protocol_message.go +++ b/pkg/sink/codec/open/open_protocol_message.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec" + "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/internal" ) @@ -94,25 +95,30 @@ func newResolvedMessage(ts uint64) *internal.MessageKey { } } -func rowChangeToMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*internal.MessageKey, *messageRow) { +func rowChangeToMsg( + e *model.RowChangedEvent, + config *common.Config, + largeMessageOnlyHandleKeyColumns bool) (*internal.MessageKey, *messageRow) { var partition *int64 if e.Table.IsPartition { partition = &e.Table.TableID } key := &internal.MessageKey{ - Ts: e.CommitTs, - Schema: e.Table.Schema, - Table: e.Table.Table, - RowID: e.RowID, - Partition: partition, - Type: model.MessageTypeRow, + Ts: e.CommitTs, + Schema: e.Table.Schema, + Table: e.Table.Table, + RowID: e.RowID, + Partition: partition, + Type: model.MessageTypeRow, + OnlyHandleKey: largeMessageOnlyHandleKeyColumns, } value := &messageRow{} if e.IsDelete() { - value.Delete = rowChangeColumns2CodecColumns(e.PreColumns, onlyHandleKeyColumns) + handleKeyOnly := config.DeleteOnlyHandleKeyColumns || largeMessageOnlyHandleKeyColumns + value.Delete = rowChangeColumns2CodecColumns(e.PreColumns, handleKeyOnly) } else { - value.Update = rowChangeColumns2CodecColumns(e.Columns, false) - value.PreColumns = rowChangeColumns2CodecColumns(e.PreColumns, false) + value.Update = rowChangeColumns2CodecColumns(e.Columns, largeMessageOnlyHandleKeyColumns) + value.PreColumns = rowChangeColumns2CodecColumns(e.PreColumns, largeMessageOnlyHandleKeyColumns) } return key, value } diff --git a/pkg/sink/codec/open/open_protocol_message_test.go b/pkg/sink/codec/open/open_protocol_message_test.go index 945bb629f1f..0ebec297cff 100644 --- a/pkg/sink/codec/open/open_protocol_message_test.go +++ b/pkg/sink/codec/open/open_protocol_message_test.go @@ -19,6 +19,8 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/internal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -174,9 +176,18 @@ func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { {Name: "a", Type: mysql.TypeLonglong, Value: 1}, }, } - _, value := rowChangeToMsg(insertEvent, true) - _, ok := value.Update["a"] - require.True(t, ok) + codecConfig := 
common.NewConfig(config.ProtocolOpen) + codecConfig.DeleteOnlyHandleKeyColumns = true + + _, value := rowChangeToMsg(insertEvent, codecConfig, false) + require.Contains(t, value.Update, "id") + require.Contains(t, value.Update, "a") + + codecConfig.DeleteOnlyHandleKeyColumns = false + key, value := rowChangeToMsg(insertEvent, codecConfig, true) + require.True(t, key.OnlyHandleKey) + require.Contains(t, value.Update, "id") + require.NotContains(t, value.Update, "a") updateEvent := &model.RowChangedEvent{ CommitTs: 417318403368288260, @@ -193,9 +204,15 @@ func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { {Name: "a", Type: mysql.TypeLonglong, Value: 1}, }, } - _, value = rowChangeToMsg(updateEvent, true) - _, ok = value.PreColumns["a"] - require.True(t, ok) + codecConfig.DeleteOnlyHandleKeyColumns = true + _, value = rowChangeToMsg(updateEvent, codecConfig, false) + require.Contains(t, value.PreColumns, "a") + + codecConfig.DeleteOnlyHandleKeyColumns = false + key, value = rowChangeToMsg(updateEvent, codecConfig, true) + require.True(t, key.OnlyHandleKey) + require.NotContains(t, value.PreColumns, "a") + require.NotContains(t, value.Update, "a") deleteEvent := &model.RowChangedEvent{ CommitTs: 417318403368288260, @@ -208,11 +225,17 @@ func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { {Name: "a", Type: mysql.TypeLonglong, Value: 2}, }, } - _, value = rowChangeToMsg(deleteEvent, true) - _, ok = value.Delete["a"] - require.False(t, ok) - _, value = rowChangeToMsg(deleteEvent, false) - _, ok = value.Delete["a"] - require.True(t, ok) + codecConfig.DeleteOnlyHandleKeyColumns = true + _, value = rowChangeToMsg(deleteEvent, codecConfig, false) + require.NotContains(t, value.Delete, "a") + + codecConfig.DeleteOnlyHandleKeyColumns = false + _, value = rowChangeToMsg(deleteEvent, codecConfig, false) + require.Contains(t, value.Delete, "a") + + codecConfig.DeleteOnlyHandleKeyColumns = false + key, value = rowChangeToMsg(deleteEvent, codecConfig, true) + require.True(t, key.OnlyHandleKey) + require.NotContains(t, value.Delete, "a") } diff --git a/tests/integration_tests/canal_json_handle_key_only/conf/changefeed.toml b/tests/integration_tests/canal_json_handle_key_only/conf/changefeed.toml new file mode 100644 index 00000000000..7e22f266781 --- /dev/null +++ b/tests/integration_tests/canal_json_handle_key_only/conf/changefeed.toml @@ -0,0 +1,2 @@ +[sink.kafka-config.large-message-handle] +large-message-handle-option = "handle-key-only" \ No newline at end of file diff --git a/tests/integration_tests/canal_json_handle_key_only/conf/diff_config.toml b/tests/integration_tests/canal_json_handle_key_only/conf/diff_config.toml new file mode 100644 index 00000000000..4832c66334f --- /dev/null +++ b/tests/integration_tests/canal_json_handle_key_only/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/canal_json_handle_key_only/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" \ No newline at end of file diff --git a/tests/integration_tests/canal_json_handle_key_only/data/data.sql b/tests/integration_tests/canal_json_handle_key_only/data/data.sql new file mode 100644 index 00000000000..44e401275c0 --- /dev/null +++ b/tests/integration_tests/canal_json_handle_key_only/data/data.sql @@ -0,0 +1,100 @@ +drop database if exists test; +create database test; +use test; + +create table t ( + id int primary key auto_increment, + + c_tinyint tinyint null, + c_smallint smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + + c_unsigned_tinyint tinyint unsigned null, + c_unsigned_smallint smallint unsigned null, + c_unsigned_mediumint mediumint unsigned null, + c_unsigned_int int unsigned null, + c_unsigned_bigint bigint unsigned null, + + c_float float null, + c_double double null, + c_decimal decimal null, + c_decimal_2 decimal(10, 4) null, + + c_unsigned_float float unsigned null, + c_unsigned_double double unsigned null, + c_unsigned_decimal decimal unsigned null, + c_unsigned_decimal_2 decimal(10, 4) unsigned null, + + c_date date null, + c_datetime datetime null, + c_timestamp timestamp null, + c_time time null, + c_year year null, + + c_tinytext tinytext null, + c_text text null, + c_mediumtext mediumtext null, + c_longtext longtext null, + + c_tinyblob tinyblob null, + c_blob blob null, + c_mediumblob mediumblob null, + c_longblob longblob null, + + c_char char(16) null, + c_varchar varchar(16) null, + c_binary binary(16) null, + c_varbinary varbinary(16) null, + + c_enum enum ('a','b','c') null, + c_set set ('a','b','c') null, + c_bit bit(64) null, + c_json json null +); + +insert into t values ( + 1, + 1, 2, 3, 4, 5, + 1, 2, 3, 4, 5, + 2020.0202, 2020.0303, 2020.0404, 2021.1208, + 3.1415, 2.7182, 8000, 179394.233, + '2020-02-20', '2020-02-20 02:20:20', '2020-02-20 02:20:20', '02:20:20', '2020', + '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', + x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + '89504E470D0A1A0A', '89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + 'b', 'b,c', b'1000001', '{ + "key1": "value1", + "key2": "value2", + "key3": "123" + }' + ); + +update t set c_float = 3.1415, c_double = 2.7182, c_decimal = 8000, c_decimal_2 = 179394.233 where id = 1; + +delete from t where id = 1; + +insert into t values ( + 2, + 1, 2, 3, 4, 5, + 1, 2, 3, 4, 5, + 2020.0202, 2020.0303, 2020.0404, 2021.1208, + 3.1415, 2.7182, 8000, 179394.233, + '2020-02-20', '2020-02-20 02:20:20', '2020-02-20 02:20:20', '02:20:20', '2020', + '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', + x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + '89504E470D0A1A0A', '89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + 'b', 'b,c', b'1000001', '{ + "key1": "value1", + "key2": "value2", + "key3": "123" + }' + ); + +update t set c_float = 3.1415, c_double = 2.7182, c_decimal = 8000, c_decimal_2 = 179394.233 where id = 2; + +create table finish_mark +( + id int 
PRIMARY KEY +); \ No newline at end of file diff --git a/tests/integration_tests/canal_json_handle_key_only/run.sh b/tests/integration_tests/canal_json_handle_key_only/run.sh new file mode 100644 index 00000000000..26c38fe96de --- /dev/null +++ b/tests/integration_tests/canal_json_handle_key_only/run.sh @@ -0,0 +1,48 @@ +#!/bin/bash + +set -e + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +# use kafka-consumer with canal-json decoder to sync data from kafka to mysql +function run() { + if [ "$SINK_TYPE" != "kafka" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + TOPIC_NAME="canal-json-handle-key-only" + + # record tso before we create tables to skip the system table DDLs + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true&max-message-bytes=2000&kafka-version=${KAFKA_VERSION}" + + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" + + cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --config="$CUR/conf/changefeed.toml" 2>&1 & + + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first + check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 200 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/open_protocol_handle_key_only/conf/changefeed.toml b/tests/integration_tests/open_protocol_handle_key_only/conf/changefeed.toml new file mode 100644 index 00000000000..7e22f266781 --- /dev/null +++ b/tests/integration_tests/open_protocol_handle_key_only/conf/changefeed.toml @@ -0,0 +1,2 @@ +[sink.kafka-config.large-message-handle] +large-message-handle-option = "handle-key-only" \ No newline at end of file diff --git a/tests/integration_tests/open_protocol_handle_key_only/conf/diff_config.toml b/tests/integration_tests/open_protocol_handle_key_only/conf/diff_config.toml new file mode 100644 index 00000000000..bc90175c3bb --- /dev/null +++ b/tests/integration_tests/open_protocol_handle_key_only/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/open_protocol_handle_key_only/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" \ No newline at end of file diff --git a/tests/integration_tests/open_protocol_handle_key_only/data/data.sql b/tests/integration_tests/open_protocol_handle_key_only/data/data.sql new file mode 100644 index 00000000000..44e401275c0 --- /dev/null +++ b/tests/integration_tests/open_protocol_handle_key_only/data/data.sql @@ -0,0 +1,100 @@ +drop database if exists test; +create database test; +use test; + +create table t ( + id int primary key auto_increment, + + c_tinyint tinyint null, + c_smallint smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + + c_unsigned_tinyint tinyint unsigned null, + c_unsigned_smallint smallint unsigned null, + c_unsigned_mediumint mediumint unsigned null, + c_unsigned_int int unsigned null, + c_unsigned_bigint bigint unsigned null, + + c_float float null, + c_double double null, + c_decimal decimal null, + c_decimal_2 decimal(10, 4) null, + + c_unsigned_float float unsigned null, + c_unsigned_double double unsigned null, + c_unsigned_decimal decimal unsigned null, + c_unsigned_decimal_2 decimal(10, 4) unsigned null, + + c_date date null, + c_datetime datetime null, + c_timestamp timestamp null, + c_time time null, + c_year year null, + + c_tinytext tinytext null, + c_text text null, + c_mediumtext mediumtext null, + c_longtext longtext null, + + c_tinyblob tinyblob null, + c_blob blob null, + c_mediumblob mediumblob null, + c_longblob longblob null, + + c_char char(16) null, + c_varchar varchar(16) null, + c_binary binary(16) null, + c_varbinary varbinary(16) null, + + c_enum enum ('a','b','c') null, + c_set set ('a','b','c') null, + c_bit bit(64) null, + c_json json null +); + +insert into t values ( + 1, + 1, 2, 3, 4, 5, + 1, 2, 3, 4, 5, + 2020.0202, 2020.0303, 2020.0404, 2021.1208, + 3.1415, 2.7182, 8000, 179394.233, + '2020-02-20', '2020-02-20 02:20:20', '2020-02-20 02:20:20', '02:20:20', '2020', + '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', + x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + '89504E470D0A1A0A', '89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + 'b', 'b,c', b'1000001', '{ + "key1": "value1", + "key2": "value2", + "key3": "123" + }' + ); + +update t set c_float = 3.1415, c_double = 2.7182, c_decimal = 8000, c_decimal_2 = 179394.233 where id = 1; + +delete from t where id = 1; + +insert into t values ( + 2, + 1, 2, 3, 4, 5, + 1, 2, 3, 4, 5, + 2020.0202, 2020.0303, 2020.0404, 2021.1208, + 3.1415, 2.7182, 8000, 179394.233, + '2020-02-20', '2020-02-20 02:20:20', '2020-02-20 02:20:20', '02:20:20', '2020', + '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', + x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + '89504E470D0A1A0A', '89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + 'b', 'b,c', b'1000001', '{ + "key1": "value1", + "key2": "value2", + "key3": "123" + }' + ); + +update t set c_float = 3.1415, c_double = 2.7182, c_decimal = 8000, c_decimal_2 = 179394.233 where id = 2; + +create table finish_mark +( + 
id int PRIMARY KEY +); \ No newline at end of file diff --git a/tests/integration_tests/open_protocol_handle_key_only/run.sh b/tests/integration_tests/open_protocol_handle_key_only/run.sh new file mode 100644 index 00000000000..45ac96bce0e --- /dev/null +++ b/tests/integration_tests/open_protocol_handle_key_only/run.sh @@ -0,0 +1,48 @@ +#!/bin/bash + +set -e + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +# use kafka-consumer with open-protocol decoder to sync data from kafka to mysql +function run() { + if [ "$SINK_TYPE" != "kafka" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + TOPIC_NAME="open-protocol-handle-key-only" + + # record tso before we create tables to skip the system table DDLs + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&max-message-bytes=2000&kafka-version=${KAFKA_VERSION}" + + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" + + cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --config="$CUR/conf/changefeed.toml" 2>&1 & + + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first + check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 200 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 8689b0052c0..c692698acbf 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -15,7 +15,7 @@ mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table" kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume" -kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics" +kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics canal_json_handle_key_only open_protocol_handle_key_only" kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" storage_only_csv="csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table"
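Note (editor's sketch, not part of the patch): with this change in place, the kafka-consumer side needs an upstream TiDB connection before it can turn a handle-key-only message back into a full row (see the --upstream-tidb-dsn flag and SnapshotQuery earlier in the patch). A minimal consumer-side sketch, assuming only the decoder API introduced here; decodeHandleKeyOnlyMessage, dsn, and the key/value parameters are illustrative stand-ins for a real Kafka consume loop:

package main

import (
	"context"
	"database/sql"

	_ "github.com/go-sql-driver/mysql" // same driver the snapshot query helper relies on
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/config"
	"github.com/pingcap/tiflow/pkg/sink/codec/common"
	"github.com/pingcap/tiflow/pkg/sink/codec/open"
)

// decodeHandleKeyOnlyMessage is a sketch, not part of this patch: it decodes one
// Kafka key/value pair and lets the decoder re-assemble handle-key-only rows by
// querying the upstream TiDB with a snapshot read at the event's commit ts.
func decodeHandleKeyOnlyMessage(ctx context.Context, dsn string, key, value []byte) ([]*model.RowChangedEvent, error) {
	codecConfig := common.NewConfig(config.ProtocolOpen)
	codecConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly

	// handle-key-only requires an upstream connection, otherwise NewBatchDecoder errors.
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	decoder, err := open.NewBatchDecoder(ctx, codecConfig, db)
	if err != nil {
		return nil, err
	}
	if err := decoder.AddKeyValue(key, value); err != nil {
		return nil, err
	}

	var events []*model.RowChangedEvent
	for {
		ty, hasNext, err := decoder.HasNext()
		if err != nil {
			return nil, err
		}
		if !hasNext {
			return events, nil
		}
		switch ty {
		case model.MessageTypeRow:
			event, err := decoder.NextRowChangedEvent()
			if err != nil {
				return nil, err
			}
			events = append(events, event)
		case model.MessageTypeResolved:
			if _, err := decoder.NextResolvedEvent(); err != nil {
				return nil, err
			}
		case model.MessageTypeDDL:
			if _, err := decoder.NextDDLEvent(); err != nil {
				return nil, err
			}
		}
	}
}

Note that the delete and update branches of assembleHandleKeyOnlyEvent query the pre-image at commitTs-1, so the snapshot read only works while that ts is still newer than the upstream GC safe point (error 8055 otherwise, as logged in helper.go).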