Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755) #10355

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix tests.
  • Loading branch information
3AceShowHand committed Dec 26, 2023
commit 105cfbc48e46ad66afb6cb377714222d0ed5bbec
12 changes: 6 additions & 6 deletions cdc/sink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type EventRouter struct {
}

// NewEventRouter creates a new EventRouter.
func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRouter, error) {
func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string, protocol config.Protocol) (*EventRouter, error) {
// If an event does not match any dispatching rules in the config file,
// it will be dispatched by the default partition dispatcher and
// static topic dispatcher because it matches *.* rule.
Expand All @@ -103,7 +103,7 @@ func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRoute
}

d := getPartitionDispatcher(ruleConfig, cfg.EnableOldValue)
t, err := getTopicDispatcher(ruleConfig, defaultTopic, cfg.Sink.Protocol)
t, err := getTopicDispatcher(ruleConfig.TopicRule, defaultTopic, protocol)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -251,9 +251,9 @@ func getPartitionDispatcher(

// getTopicDispatcher returns the topic dispatcher for a specific topic rule (aka topic expression).
func getTopicDispatcher(
ruleConfig *config.DispatchRule, defaultTopic string, protocol string,
rule string, defaultTopic string, protocol config.Protocol,
) (topic.Dispatcher, error) {
if ruleConfig.TopicRule == "" {
if rule == "" {
return topic.NewStaticTopicDispatcher(defaultTopic), nil
}

Expand All @@ -263,14 +263,14 @@ func getTopicDispatcher(

// check if this rule is a valid topic expression
topicExpr := topic.Expression(rule)
err := validateTopicExpression(topicExpr, schema, protocol)
err := validateTopicExpression(topicExpr, protocol)
if err != nil {
return nil, err
}
return topic.NewDynamicTopicDispatcher(topicExpr), nil
}

func validateTopicExpression(expr topic.Expression, scheme string, protocol config.Protocol) error {
func validateTopicExpression(expr topic.Expression, protocol config.Protocol) error {
switch protocol {
case config.ProtocolAvro:
return expr.ValidateForAvro()
Expand Down
63 changes: 56 additions & 7 deletions cdc/sink/mq/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,60 @@ import (
"github.com/stretchr/testify/require"
)

func newReplicaConfig4DispatcherTest() *config.ReplicaConfig {
return &config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
// rule-0
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
// rule-1
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
// rule-2
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
// rule-3
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
// rule-4
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
// rule-5
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
// rule-6: hard code the topic
{
Matcher: []string{"hard_code_schema.*"},
PartitionRule: "default",
TopicRule: "hard_code_topic",
},
},
},
}
}

func TestEventRouter(t *testing.T) {
t.Parallel()

replicaConfig := config.GetDefaultReplicaConfig()
d, err := NewEventRouter(replicaConfig, "test")
d, err := NewEventRouter(replicaConfig, "test", config.ProtocolCanalJSON)
require.NoError(t, err)
require.Equal(t, "test", d.GetDefaultTopic())

Expand All @@ -40,7 +89,7 @@ func TestEventRouter(t *testing.T) {
require.Equal(t, d.defaultTopic, actual)

replicaConfig = newReplicaConfig4DispatcherTest()
d, err = NewEventRouter(replicaConfig, "")
d, err = NewEventRouter(replicaConfig, "", config.ProtocolCanalJSON)
require.NoError(t, err)

// no matched, use the default
Expand Down Expand Up @@ -119,7 +168,7 @@ func TestGetActiveTopics(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)
names := []model.TableName{
{Schema: "test_default1", Table: "table"},
Expand Down Expand Up @@ -169,7 +218,7 @@ func TestGetTopicForRowChange(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

topicName := d.GetTopicForRowChange(&model.RowChangedEvent{
Expand Down Expand Up @@ -230,7 +279,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

p := d.GetPartitionForRowChange(&model.RowChangedEvent{
Expand Down Expand Up @@ -298,7 +347,7 @@ func TestGetDLLDispatchRuleByProtocol(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

tests := []struct {
Expand Down Expand Up @@ -359,7 +408,7 @@ func TestGetTopicForDDL(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newMqSink(
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, encoderConfig.Protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewKafkaDDLSink(
return nil, errors.Trace(err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewKafkaDMLSink(
return nil, errors.Trace(err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
// rule, make sure decoded `RowChangedEvent` contains information
// identical to the CDC side.
if replicaConfig != nil {
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, kafkaTopic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, kafkaTopic, protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading