Skip to content

Commit

Permalink
fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 26, 2023
1 parent dd441cc commit 105cfbc
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 17 deletions.
12 changes: 6 additions & 6 deletions cdc/sink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type EventRouter struct {
}

// NewEventRouter creates a new EventRouter.
func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRouter, error) {
func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string, protocol config.Protocol) (*EventRouter, error) {
// If an event does not match any dispatching rules in the config file,
// it will be dispatched by the default partition dispatcher and
// static topic dispatcher because it matches *.* rule.
Expand All @@ -103,7 +103,7 @@ func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRoute
}

d := getPartitionDispatcher(ruleConfig, cfg.EnableOldValue)
t, err := getTopicDispatcher(ruleConfig, defaultTopic, cfg.Sink.Protocol)
t, err := getTopicDispatcher(ruleConfig.TopicRule, defaultTopic, protocol)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -251,9 +251,9 @@ func getPartitionDispatcher(

// getTopicDispatcher returns the topic dispatcher for a specific topic rule (aka topic expression).
func getTopicDispatcher(
ruleConfig *config.DispatchRule, defaultTopic string, protocol string,
rule string, defaultTopic string, protocol config.Protocol,
) (topic.Dispatcher, error) {
if ruleConfig.TopicRule == "" {
if rule == "" {
return topic.NewStaticTopicDispatcher(defaultTopic), nil
}

Expand All @@ -263,14 +263,14 @@ func getTopicDispatcher(

// check if this rule is a valid topic expression
topicExpr := topic.Expression(rule)
err := validateTopicExpression(topicExpr, schema, protocol)
err := validateTopicExpression(topicExpr, protocol)
if err != nil {
return nil, err
}
return topic.NewDynamicTopicDispatcher(topicExpr), nil
}

func validateTopicExpression(expr topic.Expression, scheme string, protocol config.Protocol) error {
func validateTopicExpression(expr topic.Expression, protocol config.Protocol) error {
switch protocol {
case config.ProtocolAvro:
return expr.ValidateForAvro()
Expand Down
63 changes: 56 additions & 7 deletions cdc/sink/mq/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,60 @@ import (
"github.com/stretchr/testify/require"
)

func newReplicaConfig4DispatcherTest() *config.ReplicaConfig {
return &config.ReplicaConfig{
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
// rule-0
{
Matcher: []string{"test_default1.*"},
PartitionRule: "default",
},
// rule-1
{
Matcher: []string{"test_default2.*"},
PartitionRule: "unknown-dispatcher",
},
// rule-2
{
Matcher: []string{"test_table.*"},
PartitionRule: "table",
TopicRule: "hello_{schema}_world",
},
// rule-3
{
Matcher: []string{"test_index_value.*"},
PartitionRule: "index-value",
TopicRule: "{schema}_world",
},
// rule-4
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
TopicRule: "hello_{schema}",
},
// rule-5
{
Matcher: []string{"*.*", "!*.test"},
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
// rule-6: hard code the topic
{
Matcher: []string{"hard_code_schema.*"},
PartitionRule: "default",
TopicRule: "hard_code_topic",
},
},
},
}
}

func TestEventRouter(t *testing.T) {
t.Parallel()

replicaConfig := config.GetDefaultReplicaConfig()
d, err := NewEventRouter(replicaConfig, "test")
d, err := NewEventRouter(replicaConfig, "test", config.ProtocolCanalJSON)
require.NoError(t, err)
require.Equal(t, "test", d.GetDefaultTopic())

Expand All @@ -40,7 +89,7 @@ func TestEventRouter(t *testing.T) {
require.Equal(t, d.defaultTopic, actual)

replicaConfig = newReplicaConfig4DispatcherTest()
d, err = NewEventRouter(replicaConfig, "")
d, err = NewEventRouter(replicaConfig, "", config.ProtocolCanalJSON)
require.NoError(t, err)

// no matched, use the default
Expand Down Expand Up @@ -119,7 +168,7 @@ func TestGetActiveTopics(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)
names := []model.TableName{
{Schema: "test_default1", Table: "table"},
Expand Down Expand Up @@ -169,7 +218,7 @@ func TestGetTopicForRowChange(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

topicName := d.GetTopicForRowChange(&model.RowChangedEvent{
Expand Down Expand Up @@ -230,7 +279,7 @@ func TestGetPartitionForRowChange(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

p := d.GetPartitionForRowChange(&model.RowChangedEvent{
Expand Down Expand Up @@ -298,7 +347,7 @@ func TestGetDLLDispatchRuleByProtocol(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

tests := []struct {
Expand Down Expand Up @@ -359,7 +408,7 @@ func TestGetTopicForDDL(t *testing.T) {
},
},
},
}, "test")
}, "test", config.ProtocolCanalJSON)
require.Nil(t, err)

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newMqSink(
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, encoderConfig.Protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewKafkaDDLSink(
return nil, errors.Trace(err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewKafkaDMLSink(
return nil, errors.Trace(err)
}

eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
// rule, make sure decoded `RowChangedEvent` contains information
// identical to the CDC side.
if replicaConfig != nil {
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, kafkaTopic)
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, kafkaTopic, protocol)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down

0 comments on commit 105cfbc

Please sign in to comment.