From b7d47ecbd14088448582757b4d7bcb177663e6e4 Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Mon, 16 Oct 2023 18:11:47 +0800
Subject: [PATCH] fix integration test.

---
 cmd/kafka-consumer/main.go                    | 22 ++++------------------
 .../mq_sink_dispatcher/run.sh                 |  4 ++--
 2 files changed, 6 insertions(+), 20 deletions(-)

diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go
index c2b2ae20df3..b160d423b54 100644
--- a/cmd/kafka-consumer/main.go
+++ b/cmd/kafka-consumer/main.go
@@ -486,25 +486,11 @@ func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) {
 	}
 
 	if o.replicaConfig != nil {
-		// table info is not synced to the downstream,
-		// cannot check the dispatch rule, so keep the event router as nil.
-		needTableInfo := false
-		if o.replicaConfig.Sink.DispatchRules != nil {
-			for _, rule := range o.replicaConfig.Sink.DispatchRules {
-				if rule.IndexName != "" || len(rule.Columns) != 0 {
-					needTableInfo = true
-					break
-				}
-			}
-		}
-
-		if !needTableInfo {
-			eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, "kafka")
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-			c.eventRouter = eventRouter
+		eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, "kafka")
+		if err != nil {
+			return nil, errors.Trace(err)
 		}
+		c.eventRouter = eventRouter
 	}
 
 	c.sinks = make([]*partitionSinks, o.partitionNum)
diff --git a/tests/integration_tests/mq_sink_dispatcher/run.sh b/tests/integration_tests/mq_sink_dispatcher/run.sh
index 94897792711..9212ac0b94b 100755
--- a/tests/integration_tests/mq_sink_dispatcher/run.sh
+++ b/tests/integration_tests/mq_sink_dispatcher/run.sh
@@ -33,8 +33,6 @@ function run() {
 
 	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "normal" "null" ""
 
-	cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --config="$CUR/conf/changefeed.toml" 2>&1 &
-
 	run_sql "DROP DATABASE if exists dispatcher;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql "CREATE DATABASE dispatcher;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql "CREATE TABLE dispatcher.index (a int primary key, b int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -49,6 +47,8 @@ function run() {
 
 	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "normal" "null" ""
 
+	cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --config="$CUR/conf/new_changefeed.toml" 2>&1 &
+
 	run_sql "INSERT INTO dispatcher.index values (2, 3);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql "INSERT INTO dispatcher.index values (3, 4);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql "INSERT INTO dispatcher.index values (4, 5);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}