Commit

fix integration test.
3AceShowHand committed Oct 16, 2023
1 parent 9d309e1 commit b7d47ec
Showing 2 changed files with 6 additions and 20 deletions.
22 changes: 4 additions & 18 deletions cmd/kafka-consumer/main.go
@@ -486,25 +486,11 @@ func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) {
     }
 
     if o.replicaConfig != nil {
-        // table info is not synced to the downstream,
-        // cannot check the dispatch rule, so keep the event router as nil.
-        needTableInfo := false
-        if o.replicaConfig.Sink.DispatchRules != nil {
-            for _, rule := range o.replicaConfig.Sink.DispatchRules {
-                if rule.IndexName != "" || len(rule.Columns) != 0 {
-                    needTableInfo = true
-                    break
-                }
-            }
-        }
-
-        if !needTableInfo {
-            eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, "kafka")
-            if err != nil {
-                return nil, errors.Trace(err)
-            }
-            c.eventRouter = eventRouter
+        eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, "kafka")
+        if err != nil {
+            return nil, errors.Trace(err)
         }
+        c.eventRouter = eventRouter
     }
 
     c.sinks = make([]*partitionSinks, o.partitionNum)
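In effect, the Go change makes the Kafka consumer always construct an event router whenever a replica config is supplied, instead of leaving it nil when any dispatch rule references an index or columns. The sketch below is illustrative only and is not part of this commit: it assumes the config.GetDefaultReplicaConfig helper, the Matcher field of config.DispatchRule from pkg/config, and a hypothetical wrapper name; only IndexName, DispatchRules, replicaConfig, and NewConsumer come from this diff.

// Illustrative sketch (assumed to live in the cmd/kafka-consumer package, alongside
// consumerOption and NewConsumer). It feeds a replica config with an index-based
// dispatch rule into NewConsumer; before this commit such a config left
// c.eventRouter nil, after it dispatcher.NewEventRouter is always called.
// GetDefaultReplicaConfig and Matcher are assumed helpers/fields from pkg/config.
func newConsumerWithIndexDispatcher(ctx context.Context, o *consumerOption) (*Consumer, error) {
    cfg := config.GetDefaultReplicaConfig()
    cfg.Sink.DispatchRules = []*config.DispatchRule{
        {Matcher: []string{"dispatcher.index"}, IndexName: "primary"}, // illustrative rule
    }
    o.replicaConfig = cfg
    return NewConsumer(ctx, o)
}

Consistent with this, the run.sh change below starts cdc_kafka_consumer only after the second check_changefeed_state, pointing it at new_changefeed.toml instead of changefeed.toml.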
4 changes: 2 additions & 2 deletions tests/integration_tests/mq_sink_dispatcher/run.sh
@@ -33,8 +33,6 @@ function run() {
 
 ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "normal" "null" ""
 
-cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://[email protected]:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --config="$CUR/conf/changefeed.toml" 2>&1 &
-
 run_sql "DROP DATABASE if exists dispatcher;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 run_sql "CREATE DATABASE dispatcher;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 run_sql "CREATE TABLE dispatcher.index (a int primary key, b int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -49,6 +47,8 @@ function run() {
 
 ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "normal" "null" ""
 
+cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://[email protected]:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --config="$CUR/conf/new_changefeed.toml" 2>&1 &
+
 run_sql "INSERT INTO dispatcher.index values (2, 3);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 run_sql "INSERT INTO dispatcher.index values (3, 4);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 run_sql "INSERT INTO dispatcher.index values (4, 5);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}