Skip to content

Commit

Permalink
kafka(ticdc): event router allow hard code topics and set the schema …
Browse files Browse the repository at this point in the history
…optional in the topic expression (#9755) (#10356)

close #9763
  • Loading branch information
ti-chi-bot authored Feb 22, 2024
1 parent e833e43 commit f5e2fd9
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 63 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewKafkaDDLSink(
return nil, errors.Trace(err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
45 changes: 22 additions & 23 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type EventRouter struct {
}

// NewEventRouter creates a new EventRouter.
func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRouter, error) {
func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string, protocol config.Protocol) (*EventRouter, error) {
// If an event does not match any dispatching rules in the config file,
// it will be dispatched by the default partition dispatcher and
// static topic dispatcher because it matches *.* rule.
Expand All @@ -103,7 +103,7 @@ func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRoute
}

d := getPartitionDispatcher(ruleConfig, cfg.EnableOldValue)
t, err := getTopicDispatcher(ruleConfig, defaultTopic, cfg.Sink.Protocol)
t, err := getTopicDispatcher(ruleConfig.TopicRule, defaultTopic, protocol)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -251,32 +251,31 @@ func getPartitionDispatcher(

// getTopicDispatcher returns the topic dispatcher for a specific topic rule (aka topic expression).
func getTopicDispatcher(
ruleConfig *config.DispatchRule, defaultTopic string, protocol string,
rule string, defaultTopic string, protocol config.Protocol,
) (topic.Dispatcher, error) {
if ruleConfig.TopicRule == "" {
if rule == "" {
return topic.NewStaticTopicDispatcher(defaultTopic), nil
}

// check if this rule is a valid topic expression
topicExpr := topic.Expression(ruleConfig.TopicRule)

if protocol != "" {
p, err := config.ParseSinkProtocolFromString(protocol)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
if topic.IsHardCode(rule) {
return topic.NewStaticTopicDispatcher(rule), nil
}

if p == config.ProtocolAvro {
err := topicExpr.ValidateForAvro()
if err != nil {
return nil, err
}
} else {
err := topicExpr.Validate()
if err != nil {
return nil, err
}
}
// check if this rule is a valid topic expression
topicExpr := topic.Expression(rule)
err := validateTopicExpression(topicExpr, protocol)
if err != nil {
return nil, err
}
return topic.NewDynamicTopicDispatcher(topicExpr), nil
}

func validateTopicExpression(expr topic.Expression, protocol config.Protocol) error {
switch protocol {
case config.ProtocolAvro:
return expr.ValidateForAvro()
default:
}

return expr.Validate()
}
82 changes: 58 additions & 24 deletions cdc/sink/dmlsink/mq/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,78 +24,112 @@ import (
"github.com/stretchr/testify/require"
)

func TestEventRouter(t *testing.T) {
t.Parallel()

d, err := NewEventRouter(config.GetDefaultReplicaConfig(), "test")
require.Nil(t, err)
require.Equal(t, "test", d.GetDefaultTopic())
topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

d, err = NewEventRouter(&config.ReplicaConfig{
func newReplicaConfig4DispatcherTest() *config.ReplicaConfig {
return &config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
// rule-0
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
// rule-1
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
// rule-2
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
// rule-3
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
// rule-4
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
// rule-5
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
// rule-6: hard code the topic
{
Matcher: []string{"hard_code_schema.*"},
PartitionRule: "default",
TopicRule: "hard_code_topic",
},
},
},
}, "")
require.Nil(t, err)
topicDispatcher, partitionDispatcher = d.matchDispatcher("test", "table1")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher)
}
}

topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "table2")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher)
func TestEventRouter(t *testing.T) {
t.Parallel()

replicaConfig := config.GetDefaultReplicaConfig()
d, err := NewEventRouter(replicaConfig, "test", config.ProtocolCanalJSON)
require.NoError(t, err)
require.Equal(t, "test", d.GetDefaultTopic())

topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

actual := topicDispatcher.Substitute("test", "test")
require.Equal(t, d.defaultTopic, actual)

replicaConfig = newReplicaConfig4DispatcherTest()
d, err = NewEventRouter(replicaConfig, "", config.ProtocolCanalJSON)
require.NoError(t, err)

// no matched, use the default
topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

// match rule-0
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default1", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

// match rule-1
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default2", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

// match rule-2
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_table", "test")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.TableDispatcher{}, partitionDispatcher)

// match rule-4
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_index_value", "test")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher)

// match rule-4
topicDispatcher, partitionDispatcher = d.matchDispatcher("test", "table1")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher)

topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "table2")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher)

// match rule-6
topicDispatcher, partitionDispatcher = d.matchDispatcher("hard_code_schema", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)
}

func TestGetActiveTopics(t *testing.T) {
Expand Down Expand Up @@ -134,7 +168,7 @@ func TestGetActiveTopics(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)
names := []model.TableName{
{Schema: "test_default1", Table: "table"},
Expand Down Expand Up @@ -184,7 +218,7 @@ func TestGetTopicForRowChange(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

topicName := d.GetTopicForRowChange(&model.RowChangedEvent{
Expand Down Expand Up @@ -245,7 +279,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

p := d.GetPartitionForRowChange(&model.RowChangedEvent{
Expand Down Expand Up @@ -313,7 +347,7 @@ func TestGetDLLDispatchRuleByProtocol(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

tests := []struct {
Expand Down Expand Up @@ -374,7 +408,7 @@ func TestGetTopicForDDL(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

tests := []struct {
Expand Down
12 changes: 6 additions & 6 deletions cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@ type Dispatcher interface {
Substitute(schema, table string) string
}

// StaticTopicDispatcher is a topic dispatcher which dispatches rows and ddls to the default topic.
// StaticTopicDispatcher is a topic dispatcher which dispatches rows and DDL to the specific topic.
type StaticTopicDispatcher struct {
defaultTopic string
topic string
}

// NewStaticTopicDispatcher returns a StaticTopicDispatcher.
func NewStaticTopicDispatcher(defaultTopic string) *StaticTopicDispatcher {
return &StaticTopicDispatcher{
defaultTopic: defaultTopic,
topic: defaultTopic,
}
}

// Substitute converts schema/table name in a topic expression to kafka topic name.
func (s *StaticTopicDispatcher) Substitute(schema, table string) string {
return s.defaultTopic
return s.topic
}

func (s *StaticTopicDispatcher) String() string {
return s.defaultTopic
return s.topic
}

// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and ddls
// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and DDLs
// dynamically to the target topics.
type DynamicTopicDispatcher struct {
expression Expression
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestDynamicTopicDispatcherForSchema(t *testing.T) {

topicExpr := Expression("hello_{schema}_world")
err := topicExpr.Validate()
require.Nil(t, err)
require.NoError(t, err)

testCase := []struct {
schema string
table string
Expand Down Expand Up @@ -68,7 +69,8 @@ func TestDynamicTopicDispatcherForTable(t *testing.T) {

topicExpr := Expression("{schema}_{table}")
err := topicExpr.Validate()
require.Nil(t, err)
require.NoError(t, err)

testCases := []struct {
schema string
table string
Expand Down
17 changes: 13 additions & 4 deletions cdc/sink/dmlsink/mq/dispatcher/topic/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
)

var (
// hardCodeTopicNameRe is used to match a topic name which is hard code in the config
hardCodeTopicNameRe = regexp.MustCompile(`^([A-Za-z0-9\._\-]+)$`)

// topicNameRE is used to match a valid topic expression
topicNameRE = regexp.MustCompile(
`^[A-Za-z0-9\._\-]*\{schema\}([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
`^[A-Za-z0-9\._\-]*(\{schema\})?([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
)
// kafkaForbidRE is used to reject the characters which are forbidden in kafka topic name
kafkaForbidRE = regexp.MustCompile(`[^a-zA-Z0-9\._\-]`)
Expand All @@ -48,13 +51,14 @@ const kafkaTopicNameMaxLength = 249
type Expression string

// Validate checks whether a kafka topic name is valid or not.
// return true if the expression is hard coded.
func (e Expression) Validate() error {
// validate the topic expression
if ok := topicNameRE.MatchString(string(e)); !ok {
return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
if ok := topicNameRE.MatchString(string(e)); ok {
return nil
}

return nil
return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
}

// ValidateForAvro checks whether topic pattern is {schema}_{table}, the only allowed
Expand Down Expand Up @@ -94,3 +98,8 @@ func (e Expression) Substitute(schema, table string) string {
return topicName
}
}

// IsHardCode checks whether a topic name is hard code or not.
func IsHardCode(topicName string) bool {
return hardCodeTopicNameRe.MatchString(topicName)
}
24 changes: 24 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,30 @@ func TestSubstituteTopicExpression(t *testing.T) {
}
}

func TestSchemaOptional(t *testing.T) {
expression := "prefix_{table}"
topicExpr := Expression(expression)
err := topicExpr.Validate()
require.NoError(t, err)

schemaName := "test"
tableName := "table1"
topicName := topicExpr.Substitute(schemaName, tableName)
require.Equal(t, topicName, "prefix_table1")
}

func TestTableOptional(t *testing.T) {
expression := "prefix_{schema}"
topicExpr := Expression(expression)
err := topicExpr.Validate()
require.NoError(t, err)

schemaName := "test"
tableName := "abc"
topicName := topicExpr.Substitute(schemaName, tableName)
require.Equal(t, topicName, "prefix_test")
}

// cmd: go test -run='^$' -bench '^(BenchmarkSubstitute)$' github.com/pingcap/tiflow/cdc/sink/dispatcher/topic
// goos: linux
// goarch: amd64
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewKafkaDMLSink(
return nil, errors.Trace(err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit f5e2fd9

Please sign in to comment.