Skip to content

Commit

Permalink
This is an automated cherry-pick of #9755
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Dec 24, 2023
1 parent ef65442 commit 278b7d2
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 13 deletions.
25 changes: 25 additions & 0 deletions cdc/sink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,12 @@ func getTopicDispatcher(
return topic.NewStaticTopicDispatcher(defaultTopic), nil
}

if topic.IsHardCode(rule) {
return topic.NewStaticTopicDispatcher(rule), nil
}

// check if this rule is a valid topic expression
<<<<<<< HEAD:cdc/sink/mq/dispatcher/event_router.go
topicExpr := topic.Expression(ruleConfig.TopicRule)

if protocol != "" {
Expand All @@ -277,6 +282,26 @@ func getTopicDispatcher(
return nil, err
}
}
=======
topicExpr := topic.Expression(rule)
err := validateTopicExpression(topicExpr, schema, protocol)
if err != nil {
return nil, err
>>>>>>> ef7a972df8 (kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755)):cdc/sink/dmlsink/mq/dispatcher/event_router.go
}
return topic.NewDynamicTopicDispatcher(topicExpr), nil
}

func validateTopicExpression(expr topic.Expression, scheme string, protocol config.Protocol) error {
if sink.IsPulsarScheme(scheme) {
return expr.PulsarValidate()
}

switch protocol {
case config.ProtocolAvro:
return expr.ValidateForAvro()
default:
}

return expr.Validate()
}
64 changes: 64 additions & 0 deletions cdc/sink/mq/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,67 @@ func TestEventRouter(t *testing.T) {
PartitionRule: "ts",
TopicRule: "{schema}_{table}",
},
// rule-6: hard code the topic
{
Matcher: []string{"hard_code_schema.*"},
PartitionRule: "default",
TopicRule: "hard_code_topic",
},
},
},
<<<<<<< HEAD:cdc/sink/mq/dispatcher/event_router_test.go
}, "")
require.Nil(t, err)
=======
}
}

func TestEventRouter(t *testing.T) {
t.Parallel()

replicaConfig := config.GetDefaultReplicaConfig()
d, err := NewEventRouter(replicaConfig, config.ProtocolCanalJSON, "test", sink.KafkaScheme)
require.NoError(t, err)
require.Equal(t, "test", d.GetDefaultTopic())

topicDispatcher, partitionDispatcher := d.matchDispatcher("test", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

actual := topicDispatcher.Substitute("test", "test")
require.Equal(t, d.defaultTopic, actual)

replicaConfig = newReplicaConfig4DispatcherTest()
d, err = NewEventRouter(replicaConfig, config.ProtocolCanalJSON, "", sink.KafkaScheme)
require.NoError(t, err)

// no matched, use the default
topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

// match rule-0
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default1", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

// match rule-1
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_default2", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)

// match rule-2
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_table", "test")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.TableDispatcher{}, partitionDispatcher)

// match rule-4
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_index_value", "test")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher)

// match rule-4
>>>>>>> ef7a972df8 (kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755)):cdc/sink/dmlsink/mq/dispatcher/event_router_test.go
topicDispatcher, partitionDispatcher = d.matchDispatcher("test", "table1")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher)
Expand All @@ -77,6 +134,7 @@ func TestEventRouter(t *testing.T) {
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.TsDispatcher{}, partitionDispatcher)

<<<<<<< HEAD:cdc/sink/mq/dispatcher/event_router_test.go
topicDispatcher, partitionDispatcher = d.matchDispatcher("sbs", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)
Expand All @@ -96,6 +154,12 @@ func TestEventRouter(t *testing.T) {
topicDispatcher, partitionDispatcher = d.matchDispatcher("test_index_value", "test")
require.IsType(t, &topic.DynamicTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.IndexValueDispatcher{}, partitionDispatcher)
=======
// match rule-6
topicDispatcher, partitionDispatcher = d.matchDispatcher("hard_code_schema", "test")
require.IsType(t, &topic.StaticTopicDispatcher{}, topicDispatcher)
require.IsType(t, &partition.DefaultDispatcher{}, partitionDispatcher)
>>>>>>> ef7a972df8 (kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755)):cdc/sink/dmlsink/mq/dispatcher/event_router_test.go
}

func TestGetActiveTopics(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions cdc/sink/mq/dispatcher/topic/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@ type Dispatcher interface {
Substitute(schema, table string) string
}

// StaticTopicDispatcher is a topic dispatcher which dispatches rows and ddls to the default topic.
// StaticTopicDispatcher is a topic dispatcher which dispatches rows and DDL to the specific topic.
type StaticTopicDispatcher struct {
defaultTopic string
topic string
}

// NewStaticTopicDispatcher returns a StaticTopicDispatcher.
func NewStaticTopicDispatcher(defaultTopic string) *StaticTopicDispatcher {
return &StaticTopicDispatcher{
defaultTopic: defaultTopic,
topic: defaultTopic,
}
}

// Substitute converts schema/table name in a topic expression to kafka topic name.
func (s *StaticTopicDispatcher) Substitute(schema, table string) string {
return s.defaultTopic
return s.topic
}

func (s *StaticTopicDispatcher) String() string {
return s.defaultTopic
return s.topic
}

// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and ddls
// DynamicTopicDispatcher is a topic dispatcher which dispatches rows and DDLs
// dynamically to the target topics.
type DynamicTopicDispatcher struct {
expression Expression
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/mq/dispatcher/topic/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestDynamicTopicDispatcherForSchema(t *testing.T) {

topicExpr := Expression("hello_{schema}_world")
err := topicExpr.Validate()
require.Nil(t, err)
require.NoError(t, err)

testCase := []struct {
schema string
table string
Expand Down Expand Up @@ -68,7 +69,8 @@ func TestDynamicTopicDispatcherForTable(t *testing.T) {

topicExpr := Expression("{schema}_{table}")
err := topicExpr.Validate()
require.Nil(t, err)
require.NoError(t, err)

testCases := []struct {
schema string
table string
Expand Down
46 changes: 42 additions & 4 deletions cdc/sink/mq/dispatcher/topic/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
)

var (
// hardCodeTopicNameRe is used to match a topic name which is hard code in the config
hardCodeTopicNameRe = regexp.MustCompile(`^([A-Za-z0-9\._\-]+)$`)

// topicNameRE is used to match a valid topic expression
topicNameRE = regexp.MustCompile(
`^[A-Za-z0-9\._\-]*\{schema\}([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
`^[A-Za-z0-9\._\-]*(\{schema\})?([A-Za-z0-9\._\-]*\{table\})?[A-Za-z0-9\._\-]*$`,
)
// kafkaForbidRE is used to reject the characters which are forbidden in kafka topic name
kafkaForbidRE = regexp.MustCompile(`[^a-zA-Z0-9\._\-]`)
Expand All @@ -48,13 +51,14 @@ const kafkaTopicNameMaxLength = 249
type Expression string

// Validate checks whether a kafka topic name is valid or not.
// return true if the expression is hard coded.
func (e Expression) Validate() error {
// validate the topic expression
if ok := topicNameRE.MatchString(string(e)); !ok {
return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
if ok := topicNameRE.MatchString(string(e)); ok {
return nil
}

return nil
return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs()
}

// ValidateForAvro checks whether topic pattern is {schema}_{table}, the only allowed
Expand Down Expand Up @@ -94,3 +98,37 @@ func (e Expression) Substitute(schema, table string) string {
return topicName
}
}
<<<<<<< HEAD:cdc/sink/mq/dispatcher/topic/expression.go

Check failure on line 101 in cdc/sink/mq/dispatcher/topic/expression.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body

Check failure on line 101 in cdc/sink/mq/dispatcher/topic/expression.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body
=======

// PulsarValidate checks whether a pulsar topic name is valid or not.
func (e Expression) PulsarValidate() error {
// validate the topic expression
topicName := string(e)

if len(topicName) == 0 {
return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs(
"topic name is empty")
}

// if not full name, must be simple name
if !pulsarTopicNameREFull.MatchString(topicName) {
if strings.Contains(topicName, "/") {
return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs(
"it should be in the format of a <topic> and topic name must contain '{schema}'" +
"and simple topic name must not contain '/'")
}
} else if !pulsarTopicNameRE.MatchString(topicName) {
return errors.ErrPulsarInvalidTopicExpression.GenWithStackByArgs(
"it should be in the format of <tenant>/<namespace>/<topic> or <topic> " +
"and topic name must contain '{schema}'")
}

return nil
}

// IsHardCode checks whether a topic name is hard code or not.
func IsHardCode(topicName string) bool {
return hardCodeTopicNameRe.MatchString(topicName)
}
>>>>>>> ef7a972df8 (kafka(ticdc): event router allow hard code topics and set the schema optional in the topic expression (#9755)):cdc/sink/dmlsink/mq/dispatcher/topic/expression.go

Check failure on line 134 in cdc/sink/mq/dispatcher/topic/expression.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body

Check failure on line 134 in cdc/sink/mq/dispatcher/topic/expression.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'

Check failure on line 134 in cdc/sink/mq/dispatcher/topic/expression.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 134 in cdc/sink/mq/dispatcher/topic/expression.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'
24 changes: 24 additions & 0 deletions cdc/sink/mq/dispatcher/topic/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,30 @@ func TestSubstituteTopicExpression(t *testing.T) {
}
}

func TestSchemaOptional(t *testing.T) {
expression := "prefix_{table}"
topicExpr := Expression(expression)
err := topicExpr.Validate()
require.NoError(t, err)

schemaName := "test"
tableName := "table1"
topicName := topicExpr.Substitute(schemaName, tableName)
require.Equal(t, topicName, "prefix_table1")
}

func TestTableOptional(t *testing.T) {
expression := "prefix_{schema}"
topicExpr := Expression(expression)
err := topicExpr.Validate()
require.NoError(t, err)

schemaName := "test"
tableName := "abc"
topicName := topicExpr.Substitute(schemaName, tableName)
require.Equal(t, topicName, "prefix_test")
}

// cmd: go test -run='^$' -bench '^(BenchmarkSubstitute)$' github.com/pingcap/tiflow/cdc/sink/dispatcher/topic
// goos: linux
// goarch: amd64
Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/multi_topics/conf/changefeed.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[sink]
dispatchers = [
{ matcher = ['workload.*'], topic = "workload"},
{ matcher = ['test.*'], topic = "{schema}_{table}" },
]
48 changes: 47 additions & 1 deletion tests/integration_tests/multi_topics/data/step1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,50 @@ create table table3
);

insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (1, 2, 3, 4, 5);
values (1, 2, 3, 4, 5);

create database workload;
use workload;

create table table1
(
id int auto_increment,
c_tinyint tinyint null,
c_smallint smallint null,
c_mediumint mediumint null,
c_int int null,
c_bigint bigint null,
constraint pk
primary key (id)
);
insert into table1(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (1, 2, 3, 4, 5);

create table table2
(
id int auto_increment,
c_tinyint tinyint null,
c_smallint smallint null,
c_mediumint mediumint null,
c_int int null,
c_bigint bigint null,
constraint pk
primary key (id)
);
insert into table2(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (1, 2, 3, 4, 5);

create table table3
(
id int auto_increment,
c_tinyint tinyint null,
c_smallint smallint null,
c_mediumint mediumint null,
c_int int null,
c_bigint bigint null,
constraint pk
primary key (id)
);

insert into table3(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (1, 2, 3, 4, 5);
2 changes: 2 additions & 0 deletions tests/integration_tests/multi_topics/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ function run() {
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/test_table${i}?protocol=canal-json&version=${KAFKA_VERSION}&enable-tidb-extension=true" "" ${i}
done

run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/workload?protocol=canal-json&enable-tidb-extension=true" ""

# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
for i in $(seq 1 3); do
check_table_exists test.table${i} ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
Expand Down

0 comments on commit 278b7d2

Please sign in to comment.