kafka(ticdc): large message handle key columns cherry-pick to release-7.1 (#9696)

close #9680
3AceShowHand authored Sep 14, 2023
1 parent b9dcc64 commit f54f807
Showing 43 changed files with 1,797 additions and 408 deletions.
28 changes: 28 additions & 0 deletions cdc/api/v2/model.go
@@ -285,6 +285,15 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode,
}
}

var largeMessageHandle *config.LargeMessageHandleConfig
if c.Sink.KafkaConfig.LargeMessageHandle != nil {
oldConfig := c.Sink.KafkaConfig.LargeMessageHandle
largeMessageHandle = &config.LargeMessageHandleConfig{
LargeMessageHandleOption: oldConfig.LargeMessageHandleOption,
}
}

kafkaConfig = &config.KafkaConfig{
PartitionNum: c.Sink.KafkaConfig.PartitionNum,
ReplicationFactor: c.Sink.KafkaConfig.ReplicationFactor,
@@ -320,6 +329,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
Key: c.Sink.KafkaConfig.Key,
InsecureSkipVerify: c.Sink.KafkaConfig.InsecureSkipVerify,
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
}
}
var mysqlConfig *config.MySQLConfig
@@ -491,6 +501,15 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode,
}
}

var largeMessageHandle *LargeMessageHandleConfig
if cloned.Sink.KafkaConfig.LargeMessageHandle != nil {
oldConfig := cloned.Sink.KafkaConfig.LargeMessageHandle
largeMessageHandle = &LargeMessageHandleConfig{
LargeMessageHandleOption: oldConfig.LargeMessageHandleOption,
}
}

kafkaConfig = &KafkaConfig{
PartitionNum: cloned.Sink.KafkaConfig.PartitionNum,
ReplicationFactor: cloned.Sink.KafkaConfig.ReplicationFactor,
@@ -526,6 +545,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
Key: cloned.Sink.KafkaConfig.Key,
InsecureSkipVerify: cloned.Sink.KafkaConfig.InsecureSkipVerify,
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
}
}
var mysqlConfig *MySQLConfig
@@ -951,6 +971,8 @@ type KafkaConfig struct {
Key *string `json:"key,omitempty"`
InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"`
CodecConfig *CodecConfig `json:"codec_config,omitempty"`

LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
}

// MySQLConfig represents a MySQL sink configuration
@@ -987,3 +1009,9 @@ type ChangefeedStatus struct {
LastError *RunningError `json:"last_error,omitempty"`
LastWarning *RunningError `json:"last_warning,omitempty"`
}

// LargeMessageHandleConfig denotes the large message handling config
// This is the same as config.LargeMessageHandleConfig
type LargeMessageHandleConfig struct {
LargeMessageHandleOption string `json:"large_message_handle_option"`
}
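
For reference, a minimal sketch (not part of the commit) of how the new field serializes through the v2 API. It reuses the struct above; the "handle-key-only" option value is an assumption based on the HandleKeyOnly() check in the consumer changes below, not something this file defines.

package main

import (
	"encoding/json"
	"fmt"
)

// LargeMessageHandleConfig mirrors the v2 API struct added above.
type LargeMessageHandleConfig struct {
	LargeMessageHandleOption string `json:"large_message_handle_option"`
}

func main() {
	kafkaCfg := struct {
		LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
	}{
		LargeMessageHandle: &LargeMessageHandleConfig{
			// Assumed option value for TiCDC's handle-key-only mode.
			LargeMessageHandleOption: "handle-key-only",
		},
	}
	out, _ := json.Marshal(kafkaCfg)
	fmt.Println(string(out))
	// {"large_message_handle":{"large_message_handle_option":"handle-key-only"}}
}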
106 changes: 81 additions & 25 deletions cmd/kafka-consumer/main.go
@@ -15,6 +15,7 @@ package main

import (
"context"
"database/sql"
"flag"
"fmt"
"math"
@@ -48,6 +49,7 @@ import (
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/canal"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/open"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/util"
@@ -71,14 +73,17 @@ var (
protocol config.Protocol
enableTiDBExtension bool

// eventRouterReplicaConfig only used to initialize the consumer's eventRouter
// replicaConfig is only used to initialize the consumer's eventRouter,
// which can then be used to check that RowChangedEvents are dispatched correctly
eventRouterReplicaConfig *config.ReplicaConfig
replicaConfig *config.ReplicaConfig

logPath string
logLevel string
timezone string
ca, cert, key string

// upstreamTiDBDSN is the dsn of the upstream TiDB cluster
upstreamTiDBDSN string
)

func init() {
@@ -90,6 +95,7 @@ func init() {

flag.StringVar(&upstreamURIStr, "upstream-uri", "", "Kafka uri")
flag.StringVar(&downstreamURIStr, "downstream-uri", "", "downstream sink uri")
flag.StringVar(&upstreamTiDBDSN, "upstream-tidb-dsn", "", "upstream TiDB DSN")
flag.StringVar(&configFile, "config", "", "config file for changefeed")
flag.StringVar(&logPath, "log-file", "cdc_kafka_consumer.log", "log file path")
flag.StringVar(&logLevel, "log-level", "info", "log level")
@@ -195,15 +201,15 @@
}

if configFile != "" {
eventRouterReplicaConfig = config.GetDefaultReplicaConfig()
eventRouterReplicaConfig.Sink.Protocol = protocol.String()
err := cmdUtil.StrictDecodeFile(configFile, "kafka consumer", eventRouterReplicaConfig)
replicaConfig = config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = protocol.String()
err := cmdUtil.StrictDecodeFile(configFile, "kafka consumer", replicaConfig)
if err != nil {
log.Panic("invalid config file for kafka consumer",
zap.Error(err),
zap.String("config", configFile))
}
if _, err := filter.VerifyTableRules(eventRouterReplicaConfig.Filter); err != nil {
if _, err := filter.VerifyTableRules(replicaConfig.Filter); err != nil {
log.Panic("verify rule failed", zap.Error(err))
}
}
@@ -380,10 +386,13 @@ type Consumer struct {
// initialize to 0 by default
globalResolvedTs uint64

protocol config.Protocol
enableTiDBExtension bool
protocol config.Protocol

eventRouter *dispatcher.EventRouter

codecConfig *common.Config

upstreamTiDB *sql.DB
}

// NewConsumer creates a new cdc kafka consumer
@@ -400,7 +409,20 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
tableIDs: make(map[string]int64),
}
c.protocol = protocol
c.enableTiDBExtension = enableTiDBExtension

c.codecConfig = common.NewConfig(c.protocol)
c.codecConfig.EnableTiDBExtension = enableTiDBExtension
if replicaConfig != nil && replicaConfig.Sink != nil && replicaConfig.Sink.KafkaConfig != nil {
c.codecConfig.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle
}

if c.codecConfig.LargeMessageHandle.HandleKeyOnly() {
db, err := openDB(ctx, upstreamTiDBDSN)
if err != nil {
return nil, err
}
c.upstreamTiDB = db
}

// this means user has input config file to enable dispatcher check
// some protocol does not provide enough information to check the
@@ -410,8 +432,8 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
// when trying to enable dispatcher check for any protocol and dispatch
// rule, make sure decoded `RowChangedEvent` contains information
// identical to the CDC side.
if eventRouterReplicaConfig != nil {
eventRouter, err := dispatcher.NewEventRouter(eventRouterReplicaConfig, kafkaTopic)
if replicaConfig != nil {
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, kafkaTopic)
if err != nil {
return nil, errors.Trace(err)
}
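
With handle-key-only enabled, an oversized message carries only the handle-key columns, so the consumer needs the upstream TiDB connection opened above to backfill complete rows. The sketch below is purely illustrative of that backfill read: the table, column, and query shape are hypothetical, and the real lookup lives inside the decoders.

package sketch

import (
	"context"
	"database/sql"
	"fmt"
)

// queryRowByHandle sketches a stale read that recovers the full row at the
// event's commitTs given only the handle key. All names are illustrative.
func queryRowByHandle(ctx context.Context, db *sql.DB, commitTs uint64, handle int64) (*sql.Rows, error) {
	// tidb_parse_tso converts a TiDB TSO into a timestamp usable by stale reads.
	query := fmt.Sprintf(
		"SELECT * FROM `test`.`t` AS OF TIMESTAMP tidb_parse_tso(%d) WHERE `id` = ?",
		commitTs)
	return db.QueryContext(ctx, query, handle)
}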
@@ -514,24 +536,37 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
panic("sink should initialized")
}

ctx := context.Background()
var (
decoder codec.RowEventDecoder
err error
)

switch c.protocol {
case config.ProtocolOpen, config.ProtocolDefault:
decoder, err = open.NewBatchDecoder(ctx, c.codecConfig, c.upstreamTiDB)
case config.ProtocolCanalJSON:
decoder, err = canal.NewBatchDecoder(ctx, c.codecConfig, c.upstreamTiDB)
if err != nil {
return err
}
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.protocol))
}
if err != nil {
return errors.Trace(err)
}

log.Info("start consume claim",
zap.String("topic", claim.Topic()), zap.Int32("partition", partition),
zap.Int64("initialOffset", claim.InitialOffset()), zap.Int64("highWaterMarkOffset", claim.HighWaterMarkOffset()))

eventGroups := make(map[int64]*eventsGroup)
for message := range claim.Messages() {
var (
decoder codec.RowEventDecoder
err error
)
switch c.protocol {
case config.ProtocolOpen, config.ProtocolDefault:
decoder, err = open.NewBatchDecoder(message.Key, message.Value)
case config.ProtocolCanalJSON:
decoder = canal.NewBatchDecoder(message.Value, c.enableTiDBExtension, "")
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.protocol))
}
if err != nil {
if err := decoder.AddKeyValue(message.Key, message.Value); err != nil {
log.Error("add key value to the decoder failed", zap.Error(err))
return errors.Trace(err)
}

counter := 0
for {
tp, hasNext, err := decoder.HasNext()
@@ -841,3 +876,24 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti
g.tableIDs[key] = g.currentTableID
return g.currentTableID
}

func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Error("open db failed", zap.String("dsn", dsn), zap.Error(err))
return nil, errors.Trace(err)
}

db.SetMaxOpenConns(10)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(10 * time.Minute)

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err = db.PingContext(ctx); err != nil {
log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err))
return nil, errors.Trace(err)
}
log.Info("open db success", zap.String("dsn", dsn))
return db, nil
}
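
Two notes on the consumer changes above. First, openDB relies on a MySQL driver being registered with database/sql, presumably via a blank import of github.com/go-sql-driver/mysql elsewhere in the package. Second, the decoder is now built once per claim and fed each Kafka message through AddKeyValue; below is a condensed sketch of that feed-and-drain cycle, with the method set taken from the codec.RowEventDecoder usage in this diff and the Next* calls assumed from the same interface.

package sketch

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/sink/codec"
)

// decodeMessage feeds one Kafka key/value pair into an already-constructed
// decoder and drains every event it yields, collecting the row changes.
func decodeMessage(decoder codec.RowEventDecoder, key, value []byte) ([]*model.RowChangedEvent, error) {
	if err := decoder.AddKeyValue(key, value); err != nil {
		return nil, errors.Trace(err)
	}
	var rows []*model.RowChangedEvent
	for {
		tp, hasNext, err := decoder.HasNext()
		if err != nil {
			return nil, errors.Trace(err)
		}
		if !hasNext {
			return rows, nil
		}
		switch tp {
		case model.MessageTypeRow:
			row, err := decoder.NextRowChangedEvent()
			if err != nil {
				return nil, errors.Trace(err)
			}
			rows = append(rows, row)
		case model.MessageTypeDDL:
			// Drain DDL events so HasNext can advance; handling elided here.
			if _, err := decoder.NextDDLEvent(); err != nil {
				return nil, errors.Trace(err)
			}
		case model.MessageTypeResolved:
			if _, err := decoder.NextResolvedEvent(); err != nil {
				return nil, errors.Trace(err)
			}
		}
	}
}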
11 changes: 10 additions & 1 deletion cmd/storage-consumer/main.go
@@ -309,7 +309,16 @@ func (c *consumer) emitDMLEvents(
case config.ProtocolCanalJSON:
// Always enable tidb extension for canal-json protocol
// because we need to get the commit ts from the extension field.
decoder = canal.NewBatchDecoder(content, true, c.codecCfg.Terminator)
c.codecCfg.EnableTiDBExtension = true
decoder, err = canal.NewBatchDecoder(ctx, c.codecCfg, nil)
if err != nil {
return errors.Trace(err)
}

err = decoder.AddKeyValue(nil, content)
if err != nil {
return errors.Trace(err)
}
}

cnt := 0
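For context, a plain canal-json message has no commit-ts field; enabling the TiDB extension adds a "_tidb" section carrying it, which is why the storage consumer forces EnableTiDBExtension on. A rough illustration of pulling the commit ts out of such a message — the field layout is assumed from TiCDC's canal-json protocol docs, not from this diff:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative canal-json payload with the assumed "_tidb" extension section.
	msg := []byte(`{"database":"test","table":"t","type":"INSERT","_tidb":{"commitTs":434036653052331009}}`)
	var parsed struct {
		Tidb struct {
			CommitTs uint64 `json:"commitTs"`
		} `json:"_tidb"`
	}
	if err := json.Unmarshal(msg, &parsed); err != nil {
		panic(err)
	}
	fmt.Println(parsed.Tidb.CommitTs) // 434036653052331009
}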
22 changes: 22 additions & 0 deletions docs/swagger/docs.go
@@ -1434,6 +1434,9 @@ var doc = `{
"key": {
"type": "string"
},
"large-message-handle": {
"$ref": "#/definitions/config.LargeMessageHandleConfig"
},
"max-message-bytes": {
"type": "integer"
},
@@ -1508,6 +1511,14 @@
}
}
},
"config.LargeMessageHandleConfig": {
"type": "object",
"properties": {
"large-message-handle-option": {
"type": "string"
}
}
},
"config.MySQLConfig": {
"type": "object",
"properties": {
@@ -2319,6 +2330,9 @@ var doc = `{
"key": {
"type": "string"
},
"large_message_handle": {
"$ref": "#/definitions/v2.LargeMessageHandleConfig"
},
"max_message_bytes": {
"type": "integer"
},
@@ -2393,6 +2407,14 @@
}
}
},
"v2.LargeMessageHandleConfig": {
"type": "object",
"properties": {
"large_message_handle_option": {
"type": "string"
}
}
},
"v2.LogLevelReq": {
"type": "object",
"properties": {
22 changes: 22 additions & 0 deletions docs/swagger/swagger.json
@@ -1415,6 +1415,9 @@
"key": {
"type": "string"
},
"large-message-handle": {
"$ref": "#/definitions/config.LargeMessageHandleConfig"
},
"max-message-bytes": {
"type": "integer"
},
@@ -1489,6 +1492,14 @@
}
}
},
"config.LargeMessageHandleConfig": {
"type": "object",
"properties": {
"large-message-handle-option": {
"type": "string"
}
}
},
"config.MySQLConfig": {
"type": "object",
"properties": {
@@ -2300,6 +2311,9 @@
"key": {
"type": "string"
},
"large_message_handle": {
"$ref": "#/definitions/v2.LargeMessageHandleConfig"
},
"max_message_bytes": {
"type": "integer"
},
@@ -2374,6 +2388,14 @@
}
}
},
"v2.LargeMessageHandleConfig": {
"type": "object",
"properties": {
"large_message_handle_option": {
"type": "string"
}
}
},
"v2.LogLevelReq": {
"type": "object",
"properties": {