From 278b7d27717ecfc1899853c78db1b03455b3f329 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Fri, 22 Sep 2023 10:55:44 +0800 Subject: [PATCH] This is an automated cherry-pick of #9755 Signed-off-by: ti-chi-bot --- cdc/sink/mq/dispatcher/event_router.go | 25 ++++++++ cdc/sink/mq/dispatcher/event_router_test.go | 64 +++++++++++++++++++ cdc/sink/mq/dispatcher/topic/dispatcher.go | 12 ++-- .../mq/dispatcher/topic/dispatcher_test.go | 6 +- cdc/sink/mq/dispatcher/topic/expression.go | 46 +++++++++++-- .../mq/dispatcher/topic/expression_test.go | 24 +++++++ .../multi_topics/conf/changefeed.toml | 1 + .../multi_topics/data/step1.sql | 48 +++++++++++++- tests/integration_tests/multi_topics/run.sh | 2 + 9 files changed, 215 insertions(+), 13 deletions(-) diff --git a/cdc/sink/mq/dispatcher/event_router.go b/cdc/sink/mq/dispatcher/event_router.go index 512a12ac65b..9a4cf046b2f 100644 --- a/cdc/sink/mq/dispatcher/event_router.go +++ b/cdc/sink/mq/dispatcher/event_router.go @@ -257,7 +257,12 @@ func getTopicDispatcher( return topic.NewStaticTopicDispatcher(defaultTopic), nil } + if topic.IsHardCode(rule) { + return topic.NewStaticTopicDispatcher(rule), nil + } + // check if this rule is a valid topic expression +<<<<<<< HEAD:cdc/sink/mq/dispatcher/event_router.go topicExpr := topic.Expression(ruleConfig.TopicRule) if protocol != "" { @@ -277,6 +282,26 @@ func getTopicDispatcher( return nil, err } } +======= + topicExpr := topic.Expression(rule) + err := validateTopicExpression(topicExpr, schema, protocol) + if err != nil { + return nil, err +>>>>>>> ef7a972df8 (kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755)):cdc/sink/dmlsink/mq/dispatcher/event_router.go } return topic.NewDynamicTopicDispatcher(topicExpr), nil } + +func validateTopicExpression(expr topic.Expression, scheme string, protocol config.Protocol) error { + if sink.IsPulsarScheme(scheme) { + return expr.PulsarValidate() + } + + switch protocol { + case config.ProtocolAvro: + return expr.ValidateForAvro() + default: + } + + return expr.Validate() +} diff --git a/cdc/sink/mq/dispatcher/event_router_test.go b/cdc/sink/mq/dispatcher/event_router_test.go index d022fe76a2d..5aa4746697d 100644 --- a/cdc/sink/mq/dispatcher/event_router_test.go +++ b/cdc/sink/mq/dispatcher/event_router_test.go @@ -65,10 +65,67 @@ func TestEventRouter(t *testing.T) { PartitionRule: "ts", TopicRule: "{schema}_{table}", }, + // rule-6: hard code the topic + { + Matcher: []string{"hard_code_schema.*"}, + PartitionRule: "default", + TopicRule: "hard_code_topic", + }, }, }, +<<<<<<< HEAD:cdc/sink/mq/dispatcher/event_router_test.go }, "") require.Nil(t, err) +======= + } +} + +func TestEventRouter(t *testing.T) { + t.Parallel() + + replicaConfig := config.GetDefaultReplicaConfig() + d, err := NewEventRouter(replicaConfig, config.ProtocolCanalJSON, "test", sink.KafkaScheme) + require.NoError(t, err) + require.Equal(t, "test", d.GetDefaultTopic()) + + topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test") + require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) + + actual := topicDispatcher.Substitute("test", "test") + require.Equal(t, d.defaultTopic, actual) + + replicaConfig = newReplicaConfig4DispatcherTest() + d, err = NewEventRouter(replicaConfig, config.ProtocolCanalJSON, "", sink.KafkaScheme) + require.NoError(t, err) + + // no matched, use the default + 
topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "test") + require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) + + // match rule-0 + topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default1", "test") + require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) + + // match rule-1 + topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default2", "test") + require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) + + // match rule-2 + topicDispatcher, partitionDispatcher = d.matchDispatcher("test_table", "test") + require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.TableDispatcher{}, partitionDispatcher) + + // match rule-4 + topicDispatcher, partitionDispatcher = d.matchDispatcher("test_index_value", "test") + require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher) + + // match rule-4 +>>>>>>> ef7a972df8 (kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755)):cdc/sink/dmlsink/mq/dispatcher/event_router_test.go topicDispatcher, partitionDispatcher = d.matchDispatcher("test", "table1") require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher) @@ -77,6 +134,7 @@ func TestEventRouter(t *testing.T) { require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher) +<<<<<<< HEAD:cdc/sink/mq/dispatcher/event_router_test.go topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "test") require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) @@ -96,6 +154,12 @@ func TestEventRouter(t *testing.T) { topicDispatcher, partitionDispatcher = d.matchDispatcher("test_index_value", "test") require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher) require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher) +======= + // match rule-6 + topicDispatcher, partitionDispatcher = d.matchDispatcher("hard_code_schema", "test") + require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher) + require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher) +>>>>>>> ef7a972df8 (kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755)):cdc/sink/dmlsink/mq/dispatcher/event_router_test.go } func TestGetActiveTopics(t *testing.T) { diff --git a/cdc/sink/mq/dispatcher/topic/dispatcher.go b/cdc/sink/mq/dispatcher/topic/dispatcher.go index 90a543b2b12..4053222d517 100644 --- a/cdc/sink/mq/dispatcher/topic/dispatcher.go +++ b/cdc/sink/mq/dispatcher/topic/dispatcher.go @@ -23,28 +23,28 @@ type Dispatcher interface { Substitute(schema, table string) string } -// StaticTopicDispatcher is a topic dispatcher which dispatches rows and ddls to the default topic. +// StaticTopicDispatcher is a topic dispatcher which dispatches rows and DDL to the specific topic. type StaticTopicDispatcher struct { - defaultTopic string + topic string } // NewStaticTopicDispatcher returns a StaticTopicDispatcher. 
func NewStaticTopicDispatcher(defaultTopic string) *StaticTopicDispatcher { return &StaticTopicDispatcher{ - defaultTopic: defaultTopic, + topic: defaultTopic, } } // Substitute converts schema/table name in a topic expression to kafka topic name. func (s *StaticTopicDispatcher) Substitute(schema, table string) string { - return s.defaultTopic + return s.topic } func (s *StaticTopicDispatcher) String() string { - return s.defaultTopic + return s.topic } -// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and ddls +// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and DDLs // dynamically to the target topics. type DynamicTopicDispatcher struct { expression Expression diff --git a/cdc/sink/mq/dispatcher/topic/dispatcher_test.go b/cdc/sink/mq/dispatcher/topic/dispatcher_test.go index 73974317f73..c387ac0e85a 100644 --- a/cdc/sink/mq/dispatcher/topic/dispatcher_test.go +++ b/cdc/sink/mq/dispatcher/topic/dispatcher_test.go @@ -29,7 +29,8 @@ func TestDynamicTopicDispatcherForSchema(t *testing.T) { topicExpr := Expression("hello_{schema}_world") err := topicExpr.Validate() - require.Nil(t, err) + require.NoError(t, err) + testCase := []struct { schema string table string @@ -68,7 +69,8 @@ func TestDynamicTopicDispatcherForTable(t *testing.T) { topicExpr := Expression("{schema}_{table}") err := topicExpr.Validate() - require.Nil(t, err) + require.NoError(t, err) + testCases := []struct { schema string table string diff --git a/cdc/sink/mq/dispatcher/topic/expression.go b/cdc/sink/mq/dispatcher/topic/expression.go index a6dde858a8d..092941cba64 100644 --- a/cdc/sink/mq/dispatcher/topic/expression.go +++ b/cdc/sink/mq/dispatcher/topic/expression.go @@ -20,9 +20,12 @@ import ( ) var ( + // hardCodeTopicNameRe is used to match a topic name which is hard code in the config + hardCodeTopicNameRe = regexp.MustCompile(`^([A-Za-z0-9\._\-]+)$`) + // topicNameRE is used to match a valid topic expression topicNameRE = regexp.MustCompile( - `^[A-Za-z0-9\._\-]*\{schema\}([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`, + `^[A-Za-z0-9\._\-]*(\{schema\})?([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`, ) // kafkaForbidRE is used to reject the characters which are forbidden in kafka topic name kafkaForbidRE = regexp.MustCompile(`[^a-zA-Z0-9\._\-]`) @@ -48,13 +51,14 @@ const kafkaTopicNameMaxLength = 249 type Expression string // Validate checks whether a kafka topic name is valid or not. +// return true if the expression is hard coded. func (e Expression) Validate() error { // validate the topic expression - if ok := topicNameRE.MatchString(string(e)); !ok { - return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs() + if ok := topicNameRE.MatchString(string(e)); ok { + return nil } - return nil + return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs() } // ValidateForAvro checks whether topic pattern is {schema}_{table}, the only allowed @@ -94,3 +98,37 @@ func (e Expression) Substitute(schema, table string) string { return topicName } } +<<<<<<< HEAD:cdc/sink/mq/dispatcher/topic/expression.go +======= + +// PulsarValidate checks whether a pulsar topic name is valid or not. 
+func (e Expression) PulsarValidate() error { + // validate the topic expression + topicName := string(e) + + if len(topicName) == 0 { + return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs( + "topic name is empty") + } + + // if not full name, must be simple name + if !pulsarTopicNameREFull.MatchString(topicName) { + if strings.Contains(topicName, "/") { + return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs( + "it should be in the format of a and topic name must contain '{schema}'" + + "and simple topic name must not contain '/'") + } + } else if !pulsarTopicNameRE.MatchString(topicName) { + return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs( + "it should be in the format of // or " + + "and topic name must contain '{schema}'") + } + + return nil +} + +// IsHardCode checks whether a topic name is hard code or not. +func IsHardCode(topicName string) bool { + return hardCodeTopicNameRe.MatchString(topicName) +} +>>>>>>> ef7a972df8 (kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755)):cdc/sink/dmlsink/mq/dispatcher/topic/expression.go diff --git a/cdc/sink/mq/dispatcher/topic/expression_test.go b/cdc/sink/mq/dispatcher/topic/expression_test.go index c02bdc7ffdf..070ae718d49 100644 --- a/cdc/sink/mq/dispatcher/topic/expression_test.go +++ b/cdc/sink/mq/dispatcher/topic/expression_test.go @@ -227,6 +227,30 @@ func TestSubstituteTopicExpression(t *testing.T) { } } +func TestSchemaOptional(t *testing.T) { + expression := "prefix_{table}" + topicExpr := Expression(expression) + err := topicExpr.Validate() + require.NoError(t, err) + + schemaName := "test" + tableName := "table1" + topicName := topicExpr.Substitute(schemaName, tableName) + require.Equal(t, topicName, "prefix_table1") +} + +func TestTableOptional(t *testing.T) { + expression := "prefix_{schema}" + topicExpr := Expression(expression) + err := topicExpr.Validate() + require.NoError(t, err) + + schemaName := "test" + tableName := "abc" + topicName := topicExpr.Substitute(schemaName, tableName) + require.Equal(t, topicName, "prefix_test") +} + // cmd: go test -run='^$' -bench '^(BenchmarkSubstitute)$' github.com/pingcap/tiflow/cdc/sink/dispatcher/topic // goos: linux // goarch: amd64 diff --git a/tests/integration_tests/multi_topics/conf/changefeed.toml b/tests/integration_tests/multi_topics/conf/changefeed.toml index 0f72c584c5f..2d5a3c83c8c 100644 --- a/tests/integration_tests/multi_topics/conf/changefeed.toml +++ b/tests/integration_tests/multi_topics/conf/changefeed.toml @@ -1,4 +1,5 @@ [sink] dispatchers = [ + { matcher = ['workload.*'], topic = "workload"}, { matcher = ['test.*'], topic = "{schema}_{table}" }, ] diff --git a/tests/integration_tests/multi_topics/data/step1.sql b/tests/integration_tests/multi_topics/data/step1.sql index fa18f2208ce..a33dbc81ccf 100644 --- a/tests/integration_tests/multi_topics/data/step1.sql +++ b/tests/integration_tests/multi_topics/data/step1.sql @@ -41,4 +41,50 @@ create table table3 ); insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint) -values (1, 2, 3, 4, 5); \ No newline at end of file +values (1, 2, 3, 4, 5); + +create database workload; +use workload; + +create table table1 +( + id int auto_increment, + c_tinyint tinyint null, + c_smallint smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + constraint pk + primary key (id) +); +insert into table1(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint) +values (1, 2, 3, 4, 5); + +create table 
table2 +( + id int auto_increment, + c_tinyint tinyint null, + c_smallint smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + constraint pk + primary key (id) +); +insert into table2(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint) +values (1, 2, 3, 4, 5); + +create table table3 +( + id int auto_increment, + c_tinyint tinyint null, + c_smallint smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + constraint pk + primary key (id) +); + +insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint) +values (1, 2, 3, 4, 5); diff --git a/tests/integration_tests/multi_topics/run.sh b/tests/integration_tests/multi_topics/run.sh index f7257f0fbcc..90f714c29d7 100644 --- a/tests/integration_tests/multi_topics/run.sh +++ b/tests/integration_tests/multi_topics/run.sh @@ -36,6 +36,8 @@ function run() { run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/test_table${i}?protocol=canal-json&version=${KAFKA_VERSION}&enable-tidb-extension=true" "" ${i} done + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/workload?protocol=canal-json&enable-tidb-extension=true" "" + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first for i in $(seq 1 3); do check_table_exists test.table${i} ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
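
For a quick sanity check of the routing behaviour introduced above, the sketch below reuses the two regular expressions added to cdc/sink/mq/dispatcher/topic/expression.go and mirrors the order in which getTopicDispatcher consults them: a rule with no placeholders is treated as a hard-coded topic (static dispatcher), a rule that matches the relaxed expression (where {schema} is now optional) becomes a dynamic topic expression, and anything else is rejected (the real router returns ErrKafkaInvalidTopicExpression). The snippet is standalone and illustrative only; the main package, the sample rules, and the printed output are not part of the patch.

package main

import (
	"fmt"
	"regexp"
)

// Both patterns are copied verbatim from the patch.
var (
	// hardCodeTopicNameRe matches a topic rule that contains no {schema}/{table}
	// placeholders; the event router now sends such rules to a StaticTopicDispatcher.
	hardCodeTopicNameRe = regexp.MustCompile(`^([A-Za-z0-9\._\-]+)$`)

	// topicNameRE matches a topic expression in which both {schema} and {table}
	// are optional, so rules such as "prefix_{table}" are now accepted.
	topicNameRE = regexp.MustCompile(
		`^[A-Za-z0-9\._\-]*(\{schema\})?([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
	)
)

func main() {
	rules := []string{
		"hard_code_topic",  // no placeholders: static, hard-coded topic
		"prefix_{table}",   // schema omitted: valid dynamic expression after this patch
		"{schema}_{table}", // classic form: still a valid dynamic expression
		"bad topic name!",  // spaces and '!' are forbidden: rejected
	}
	for _, rule := range rules {
		switch {
		case hardCodeTopicNameRe.MatchString(rule):
			fmt.Printf("%-20s -> static topic dispatcher (hard-coded)\n", rule)
		case topicNameRE.MatchString(rule):
			fmt.Printf("%-20s -> dynamic topic dispatcher\n", rule)
		default:
			fmt.Printf("%-20s -> invalid topic rule\n", rule)
		}
	}
}

The multi_topics integration test exercises the same two paths: matcher = ['workload.*'] with the hard-coded topic "workload" routes every workload.* table to a single topic, while matcher = ['test.*'] with "{schema}_{table}" keeps one topic per table.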