diff --git a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go index e38b5f07dfe..853d076fd4b 100644 --- a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go @@ -86,7 +86,7 @@ func NewKafkaDDLSink( return nil, errors.Trace(err) } - eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic) + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, protocol) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router.go b/cdc/sink/dmlsink/mq/dispatcher/event_router.go index 7e6146b5e10..5366f719e4e 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router.go @@ -78,7 +78,7 @@ type EventRouter struct { } // NewEventRouter creates a new EventRouter. -func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRouter, error) { +func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string, protocol config.Protocol) (*EventRouter, error) { // If an event does not match any dispatching rules in the config file, // it will be dispatched by the default partition dispatcher and // static topic dispatcher because it matches *.* rule. @@ -103,7 +103,7 @@ func NewEventRouter(cfg *config.ReplicaConfig, defaultTopic string) (*EventRoute } d := getPartitionDispatcher(ruleConfig, cfg.EnableOldValue) - t, err := getTopicDispatcher(ruleConfig, defaultTopic, cfg.Sink.Protocol) + t, err := getTopicDispatcher(ruleConfig.TopicRule, defaultTopic, protocol) if err != nil { return nil, err } @@ -251,32 +251,31 @@ func getPartitionDispatcher( // getTopicDispatcher returns the topic dispatcher for a specific topic rule (aka topic expression). 
func getTopicDispatcher( - ruleConfig *config.DispatchRule, defaultTopic string, protocol string, + rule string, defaultTopic string, protocol config.Protocol, ) (topic.Dispatcher, error) { - if ruleConfig.TopicRule == "" { + if rule == "" { return topic.NewStaticTopicDispatcher(defaultTopic), nil } - // check if this rule is a valid topic expression - topicExpr := topic.Expression(ruleConfig.TopicRule) - - if protocol != "" { - p, err := config.ParseSinkProtocolFromString(protocol) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } + if topic.IsHardCode(rule) { + return topic.NewStaticTopicDispatcher(rule), nil + } - if p == config.ProtocolAvro { - err := topicExpr.ValidateForAvro() - if err != nil { - return nil, err - } - } else { - err := topicExpr.Validate() - if err != nil { - return nil, err - } - } + // check if this rule is a valid topic expression + topicExpr := topic.Expression(rule) + err := validateTopicExpression(topicExpr, protocol) + if err != nil { + return nil, err } return topic.NewDynamicTopicDispatcher(topicExpr), nil } + +func validateTopicExpression(expr topic.Expression, protocol config.Protocol) error { + switch protocol { + case config.ProtocolAvro: + return expr.ValidateForAvro() + default: + } + + return expr.Validate() +} diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go b/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go index f06934138bd..5a63c17caf3 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router_test.go @@ -24,78 +24,112 @@ import ( "github.com/stretchr/testify/require" ) -func TestEventRouter(t *testing.T) { - t.Parallel() - - d, err := NewEventRouter(config.GetDefaultReplicaConfig(), "test") - require.Nil(t, err) - require.Equal(t, "test", d.GetDefaultTopic()) - topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test") - require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) - 
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) - - d, err = NewEventRouter(&config.ReplicaConfig{ +func newReplicaConfig4DispatcherTest() *config.ReplicaConfig { + return &config.ReplicaConfig{ Sink: &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ + // rule-0 { Matcher: []string{"test_default1.*"}, PartitionRule: "default", }, + // rule-1 { Matcher: []string{"test_default2.*"}, PartitionRule: "unknown-dispatcher", }, + // rule-2 { Matcher: []string{"test_table.*"}, PartitionRule: "table", TopicRule: "hello_{schema}_world", }, + // rule-3 { Matcher: []string{"test_index_value.*"}, PartitionRule: "index-value", TopicRule: "{schema}_world", }, + // rule-4 { Matcher: []string{"test.*"}, PartitionRule: "rowid", TopicRule: "hello_{schema}", }, + // rule-5 { Matcher: []string{"*.*", "!*.test"}, PartitionRule: "ts", TopicRule: "{schema}_{table}", }, + // rule-6: hard code the topic + { + Matcher: []string{"hard_code_schema.*"}, + PartitionRule: "default", + TopicRule: "hard_code_topic", + }, }, }, - }, "") - require.Nil(t, err) - topicDispatcher, partitionDispatcher = d.matchDispatcher("test", "table1") - require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) - require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher) + } +} - topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "table2") - require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) - require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher) +func TestEventRouter(t *testing.T) { + t.Parallel() + + replicaConfig := config.GetDefaultReplicaConfig() + d, err := NewEventRouter(replicaConfig, "test", config.ProtocolCanalJSON) + require.NoError(t, err) + require.Equal(t, "test", d.GetDefaultTopic()) + topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test") + require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) + + 
actual := topicDispatcher.Substitute("test", "test") + require.Equal(t, d.defaultTopic, actual) + + replicaConfig = newReplicaConfig4DispatcherTest() + d, err = NewEventRouter(replicaConfig, "", config.ProtocolCanalJSON) + require.NoError(t, err) + + // no matched, use the default topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "test") require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) + // match rule-0 topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default1", "test") require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) + // match rule-1 topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default2", "test") require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) + // match rule-2 topicDispatcher, partitionDispatcher = d.matchDispatcher("test_table", "test") require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) require.IsType(t, &partition.TableDispatcher{}, partitionDispatcher) + // match rule-3 topicDispatcher, partitionDispatcher = d.matchDispatcher("test_index_value", "test") require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher) + + // match rule-4 + topicDispatcher, partitionDispatcher = d.matchDispatcher("test", "table1") + require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher) + + topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "table2") + require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher) + + // match rule-6 + topicDispatcher, partitionDispatcher = 
d.matchDispatcher("hard_code_schema", "test") + require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) } func TestGetActiveTopics(t *testing.T) { @@ -134,7 +168,7 @@ func TestGetActiveTopics(t *testing.T) { }, }, }, - }, "test") + }, "test", config.ProtocolCanalJSON) require.Nil(t, err) names := []model.TableName{ {Schema: "test_default1", Table: "table"}, @@ -184,7 +218,7 @@ func TestGetTopicForRowChange(t *testing.T) { }, }, }, - }, "test") + }, "test", config.ProtocolCanalJSON) require.Nil(t, err) topicName := d.GetTopicForRowChange(&model.RowChangedEvent{ @@ -245,7 +279,7 @@ func TestGetPartitionForRowChange(t *testing.T) { }, }, }, - }, "test") + }, "test", config.ProtocolCanalJSON) require.Nil(t, err) p := d.GetPartitionForRowChange(&model.RowChangedEvent{ @@ -313,7 +347,7 @@ func TestGetDLLDispatchRuleByProtocol(t *testing.T) { }, }, }, - }, "test") + }, "test", config.ProtocolCanalJSON) require.Nil(t, err) tests := []struct { @@ -374,7 +408,7 @@ func TestGetTopicForDDL(t *testing.T) { }, }, }, - }, "test") + }, "test", config.ProtocolCanalJSON) require.Nil(t, err) tests := []struct { diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go b/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go index 90a543b2b12..4053222d517 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher.go @@ -23,28 +23,28 @@ type Dispatcher interface { Substitute(schema, table string) string } -// StaticTopicDispatcher is a topic dispatcher which dispatches rows and ddls to the default topic. +// StaticTopicDispatcher is a topic dispatcher which dispatches rows and DDL to the specific topic. type StaticTopicDispatcher struct { - defaultTopic string + topic string } // NewStaticTopicDispatcher returns a StaticTopicDispatcher. 
func NewStaticTopicDispatcher(defaultTopic string) *StaticTopicDispatcher { return &StaticTopicDispatcher{ - defaultTopic: defaultTopic, + topic: defaultTopic, } } // Substitute converts schema/table name in a topic expression to kafka topic name. func (s *StaticTopicDispatcher) Substitute(schema, table string) string { - return s.defaultTopic + return s.topic } func (s *StaticTopicDispatcher) String() string { - return s.defaultTopic + return s.topic } -// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and ddls +// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and DDLs // dynamically to the target topics. type DynamicTopicDispatcher struct { expression Expression diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go b/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go index 73974317f73..c387ac0e85a 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/dispatcher_test.go @@ -29,7 +29,8 @@ func TestDynamicTopicDispatcherForSchema(t *testing.T) { topicExpr := Expression("hello_{schema}_world") err := topicExpr.Validate() - require.Nil(t, err) + require.NoError(t, err) + testCase := []struct { schema string table string @@ -68,7 +69,8 @@ func TestDynamicTopicDispatcherForTable(t *testing.T) { topicExpr := Expression("{schema}_{table}") err := topicExpr.Validate() - require.Nil(t, err) + require.NoError(t, err) + testCases := []struct { schema string table string diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go b/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go index a6dde858a8d..e8f3e279e22 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go @@ -20,9 +20,12 @@ import ( ) var ( + // hardCodeTopicNameRe is used to match a topic name which is hard code in the config + hardCodeTopicNameRe = regexp.MustCompile(`^([A-Za-z0-9\._\-]+)$`) + // topicNameRE is used to match a 
valid topic expression topicNameRE = regexp.MustCompile( - `^[A-Za-z0-9\._\-]*\{schema\}([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`, + `^[A-Za-z0-9\._\-]*(\{schema\})?([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`, ) // kafkaForbidRE is used to reject the characters which are forbidden in kafka topic name kafkaForbidRE = regexp.MustCompile(`[^a-zA-Z0-9\._\-]`) @@ -48,13 +51,14 @@ const kafkaTopicNameMaxLength = 249 type Expression string // Validate checks whether a kafka topic name is valid or not. +// It returns nil when the expression is valid, ErrKafkaInvalidTopicExpression otherwise. func (e Expression) Validate() error { // validate the topic expression - if ok := topicNameRE.MatchString(string(e)); !ok { - return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs() + if ok := topicNameRE.MatchString(string(e)); ok { + return nil } - return nil + return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs() } // ValidateForAvro checks whether topic pattern is {schema}_{table}, the only allowed @@ -94,3 +98,8 @@ func (e Expression) Substitute(schema, table string) string { return topicName } } + +// IsHardCode checks whether a topic name is hard code or not. 
+func IsHardCode(topicName string) bool { + return hardCodeTopicNameRe.MatchString(topicName) +} diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go b/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go index c02bdc7ffdf..070ae718d49 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go @@ -227,6 +227,30 @@ func TestSubstituteTopicExpression(t *testing.T) { } } +func TestSchemaOptional(t *testing.T) { + expression := "prefix_{table}" + topicExpr := Expression(expression) + err := topicExpr.Validate() + require.NoError(t, err) + + schemaName := "test" + tableName := "table1" + topicName := topicExpr.Substitute(schemaName, tableName) + require.Equal(t, topicName, "prefix_table1") +} + +func TestTableOptional(t *testing.T) { + expression := "prefix_{schema}" + topicExpr := Expression(expression) + err := topicExpr.Validate() + require.NoError(t, err) + + schemaName := "test" + tableName := "abc" + topicName := topicExpr.Substitute(schemaName, tableName) + require.Equal(t, topicName, "prefix_test") +} + // cmd: go test -run='^$' -bench '^(BenchmarkSubstitute)$' github.com/pingcap/tiflow/cdc/sink/dispatcher/topic // goos: linux // goarch: amd64 diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index 23aff7dfe3e..69075fe3f49 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -88,7 +88,7 @@ func NewKafkaDMLSink( return nil, errors.Trace(err) } - eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic) + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, protocol) if err != nil { return nil, errors.Trace(err) } diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 29da7cf23e7..797e4609c45 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -433,7 +433,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { // 
rule, make sure decoded `RowChangedEvent` contains information // identical to the CDC side. if replicaConfig != nil { - eventRouter, err := dispatcher.NewEventRouter(replicaConfig, kafkaTopic) + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, kafkaTopic, protocol) if err != nil { return nil, errors.Trace(err) } diff --git a/tests/integration_tests/multi_topics/conf/changefeed.toml b/tests/integration_tests/multi_topics/conf/changefeed.toml index 0f72c584c5f..2d5a3c83c8c 100644 --- a/tests/integration_tests/multi_topics/conf/changefeed.toml +++ b/tests/integration_tests/multi_topics/conf/changefeed.toml @@ -1,4 +1,5 @@ [sink] dispatchers = [ + { matcher = ['workload.*'], topic = "workload"}, { matcher = ['test.*'], topic = "{schema}_{table}" }, ] diff --git a/tests/integration_tests/multi_topics/data/step1.sql b/tests/integration_tests/multi_topics/data/step1.sql index fa18f2208ce..a33dbc81ccf 100644 --- a/tests/integration_tests/multi_topics/data/step1.sql +++ b/tests/integration_tests/multi_topics/data/step1.sql @@ -41,4 +41,50 @@ create table table3 ); insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint) -values (1, 2, 3, 4, 5); \ No newline at end of file +values (1, 2, 3, 4, 5); + +create database workload; +use workload; + +create table table1 +( + id int auto_increment, + c_tinyint tinyint null, + c_smallint smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + constraint pk + primary key (id) +); +insert into table1(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint) +values (1, 2, 3, 4, 5); + +create table table2 +( + id int auto_increment, + c_tinyint tinyint null, + c_smallint smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + constraint pk + primary key (id) +); +insert into table2(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint) +values (1, 2, 3, 4, 5); + +create table table3 +( + id int auto_increment, + c_tinyint tinyint null, + c_smallint 
smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + constraint pk + primary key (id) +); + +insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint) +values (1, 2, 3, 4, 5); diff --git a/tests/integration_tests/multi_topics/run.sh b/tests/integration_tests/multi_topics/run.sh index fa38b2f1037..99f8acf6458 100644 --- a/tests/integration_tests/multi_topics/run.sh +++ b/tests/integration_tests/multi_topics/run.sh @@ -36,6 +36,8 @@ function run() { run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/test_table${i}?protocol=canal-json&version=${KAFKA_VERSION}&enable-tidb-extension=true" "" ${i} done + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/workload?protocol=canal-json&enable-tidb-extension=true" "" + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first for i in $(seq 1 3); do check_table_exists test.table${i} ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300