Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755) #10355

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 22 additions & 23 deletions cdc/sink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type EventRouter struct {
}

// NewEventRouter creates a new EventRouter.
func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRouter, error) {
func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string, protocol config.Protocol) (*EventRouter, error) {
// If an event does not match any dispatching rules in the config file,
// it will be dispatched by the default partition dispatcher and
// static topic dispatcher because it matches *.* rule.
Expand All @@ -103,7 +103,7 @@ func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRoute
}

d := getPartitionDispatcher(ruleConfig, cfg.EnableOldValue)
t, err := getTopicDispatcher(ruleConfig, defaultTopic, cfg.Sink.Protocol)
t, err := getTopicDispatcher(ruleConfig.TopicRule, defaultTopic, protocol)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -251,32 +251,31 @@ func getPartitionDispatcher(

// getTopicDispatcher returns the topic dispatcher for a specific topic rule (aka topic expression).
func getTopicDispatcher(
ruleConfig *config.DispatchRule, defaultTopic string, protocol string,
rule string, defaultTopic string, protocol config.Protocol,
) (topic.Dispatcher, error) {
if ruleConfig.TopicRule == "" {
if rule == "" {
return topic.NewStaticTopicDispatcher(defaultTopic), nil
}

// check if this rule is a valid topic expression
topicExpr := topic.Expression(ruleConfig.TopicRule)

if protocol != "" {
p, err := config.ParseSinkProtocolFromString(protocol)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
if topic.IsHardCode(rule) {
return topic.NewStaticTopicDispatcher(rule), nil
}

if p == config.ProtocolAvro {
err := topicExpr.ValidateForAvro()
if err != nil {
return nil, err
}
} else {
err := topicExpr.Validate()
if err != nil {
return nil, err
}
}
// check if this rule is a valid topic expression
topicExpr := topic.Expression(rule)
err := validateTopicExpression(topicExpr, protocol)
if err != nil {
return nil, err
}
return topic.NewDynamicTopicDispatcher(topicExpr), nil
}

func validateTopicExpression(expr topic.Expression, protocol config.Protocol) error {
switch protocol {
case config.ProtocolAvro:
return expr.ValidateForAvro()
default:
}

return expr.Validate()
}
82 changes: 58 additions & 24 deletions cdc/sink/mq/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,78 +24,112 @@ import (
"github.com/stretchr/testify/require"
)

func TestEventRouter(t *testing.T) {
t.Parallel()

d, err := NewEventRouter(config.GetDefaultReplicaConfig(), "test")
require.Nil(t, err)
require.Equal(t, "test", d.GetDefaultTopic())
topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

d, err = NewEventRouter(&config.ReplicaConfig{
func newReplicaConfig4DispatcherTest() *config.ReplicaConfig {
return &config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
// rule-0
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
// rule-1
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
// rule-2
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
// rule-3
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
// rule-4
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
// rule-5
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
// rule-6: hard code the topic
{
Matcher: []string{"hard_code_schema.*"},
PartitionRule: "default",
TopicRule: "hard_code_topic",
},
},
},
}, "")
require.Nil(t, err)
topicDispatcher, partitionDispatcher = d.matchDispatcher("test", "table1")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher)
}
}

topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "table2")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher)
func TestEventRouter(t *testing.T) {
t.Parallel()

replicaConfig := config.GetDefaultReplicaConfig()
d, err := NewEventRouter(replicaConfig, "test", config.ProtocolCanalJSON)
require.NoError(t, err)
require.Equal(t, "test", d.GetDefaultTopic())

topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

actual := topicDispatcher.Substitute("test", "test")
require.Equal(t, d.defaultTopic, actual)

replicaConfig = newReplicaConfig4DispatcherTest()
d, err = NewEventRouter(replicaConfig, "", config.ProtocolCanalJSON)
require.NoError(t, err)

// no matched, use the default
topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

// match rule-0
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default1", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

// match rule-1
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default2", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

// match rule-2
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_table", "test")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.TableDispatcher{}, partitionDispatcher)

// match rule-4
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_index_value", "test")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher)

// match rule-4
topicDispatcher, partitionDispatcher = d.matchDispatcher("test", "table1")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher)

topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "table2")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher)

// match rule-6
topicDispatcher, partitionDispatcher = d.matchDispatcher("hard_code_schema", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)
}

func TestGetActiveTopics(t *testing.T) {
Expand Down Expand Up @@ -134,7 +168,7 @@ func TestGetActiveTopics(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)
names := []model.TableName{
{Schema: "test_default1", Table: "table"},
Expand Down Expand Up @@ -184,7 +218,7 @@ func TestGetTopicForRowChange(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

topicName := d.GetTopicForRowChange(&model.RowChangedEvent{
Expand Down Expand Up @@ -245,7 +279,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

p := d.GetPartitionForRowChange(&model.RowChangedEvent{
Expand Down Expand Up @@ -313,7 +347,7 @@ func TestGetDLLDispatchRuleByProtocol(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

tests := []struct {
Expand Down Expand Up @@ -374,7 +408,7 @@ func TestGetTopicForDDL(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

tests := []struct {
Expand Down
12 changes: 6 additions & 6 deletions cdc/sink/mq/dispatcher/topic/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@ type Dispatcher interface {
Substitute(schema, table string) string
}

// StaticTopicDispatcher is a topic dispatcher which dispatches rows and ddls to the default topic.
// StaticTopicDispatcher is a topic dispatcher which dispatches rows and DDL to the specific topic.
type StaticTopicDispatcher struct {
defaultTopic string
topic string
}

// NewStaticTopicDispatcher returns a StaticTopicDispatcher.
func NewStaticTopicDispatcher(defaultTopic string) *StaticTopicDispatcher {
return &StaticTopicDispatcher{
defaultTopic: defaultTopic,
topic: defaultTopic,
}
}

// Substitute converts schema/table name in a topic expression to kafka topic name.
func (s *StaticTopicDispatcher) Substitute(schema, table string) string {
return s.defaultTopic
return s.topic
}

func (s *StaticTopicDispatcher) String() string {
return s.defaultTopic
return s.topic
}

// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and ddls
// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and DDLs
// dynamically to the target topics.
type DynamicTopicDispatcher struct {
expression Expression
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/mq/dispatcher/topic/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestDynamicTopicDispatcherForSchema(t *testing.T) {

topicExpr := Expression("hello_{schema}_world")
err := topicExpr.Validate()
require.Nil(t, err)
require.NoError(t, err)

testCase := []struct {
schema string
table string
Expand Down Expand Up @@ -68,7 +69,8 @@ func TestDynamicTopicDispatcherForTable(t *testing.T) {

topicExpr := Expression("{schema}_{table}")
err := topicExpr.Validate()
require.Nil(t, err)
require.NoError(t, err)

testCases := []struct {
schema string
table string
Expand Down
17 changes: 13 additions & 4 deletions cdc/sink/mq/dispatcher/topic/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
)

var (
// hardCodeTopicNameRe is used to match a topic name which is hard code in the config
hardCodeTopicNameRe = regexp.MustCompile(`^([A-Za-z0-9\._\-]+)$`)

// topicNameRE is used to match a valid topic expression
topicNameRE = regexp.MustCompile(
`^[A-Za-z0-9\._\-]*\{schema\}([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
`^[A-Za-z0-9\._\-]*(\{schema\})?([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
)
// kafkaForbidRE is used to reject the characters which are forbidden in kafka topic name
kafkaForbidRE = regexp.MustCompile(`[^a-zA-Z0-9\._\-]`)
Expand All @@ -48,13 +51,14 @@ const kafkaTopicNameMaxLength = 249
type Expression string

// Validate checks whether a kafka topic name is valid or not.
// return true if the expression is hard coded.
func (e Expression) Validate() error {
// validate the topic expression
if ok := topicNameRE.MatchString(string(e)); !ok {
return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
if ok := topicNameRE.MatchString(string(e)); ok {
return nil
}

return nil
return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
}

// ValidateForAvro checks whether topic pattern is {schema}_{table}, the only allowed
Expand Down Expand Up @@ -94,3 +98,8 @@ func (e Expression) Substitute(schema, table string) string {
return topicName
}
}

// IsHardCode checks whether a topic name is hard code or not.
func IsHardCode(topicName string) bool {
return hardCodeTopicNameRe.MatchString(topicName)
}
24 changes: 24 additions & 0 deletions cdc/sink/mq/dispatcher/topic/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,30 @@ func TestSubstituteTopicExpression(t *testing.T) {
}
}

func TestSchemaOptional(t *testing.T) {
expression := "prefix_{table}"
topicExpr := Expression(expression)
err := topicExpr.Validate()
require.NoError(t, err)

schemaName := "test"
tableName := "table1"
topicName := topicExpr.Substitute(schemaName, tableName)
require.Equal(t, topicName, "prefix_table1")
}

func TestTableOptional(t *testing.T) {
expression := "prefix_{schema}"
topicExpr := Expression(expression)
err := topicExpr.Validate()
require.NoError(t, err)

schemaName := "test"
tableName := "abc"
topicName := topicExpr.Substitute(schemaName, tableName)
require.Equal(t, topicName, "prefix_test")
}

// cmd: go test -run='^$' -bench '^(BenchmarkSubstitute)$' github.com/pingcap/tiflow/cdc/sink/dispatcher/topic
// goos: linux
// goarch: amd64
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newMqSink(
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, encoderConfig.Protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewKafkaDDLSink(
return nil, errors.Trace(err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading
Loading