sink(cdc): fix the bug that mq sink can lost callbacks (#9852) (#9858)
Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: qupeng <[email protected]>
ti-chi-bot authored Oct 18, 2023
1 parent 8ad2fba commit c27eddf
Showing 4 changed files with 188 additions and 3 deletions.
18 changes: 15 additions & 3 deletions cdc/sinkv2/eventsink/mq/mq_dml_sink.go
@@ -20,6 +20,8 @@ import (
 	"sync"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/contextutil"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sink/codec/builder"
@@ -35,6 +37,7 @@ import (
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/sink"
 	"go.uber.org/atomic"
+	"go.uber.org/zap"
 )
 
 // Assert EventSink[E event.TableEvent] implementation
@@ -60,8 +63,9 @@ type dmlSink struct {
 		isDead bool
 	}
 
-	ctx    context.Context
-	cancel context.CancelFunc
+	ctx       context.Context
+	cancel    context.CancelFunc
+	cancelErr error
 
 	wg   sync.WaitGroup
 	dead chan struct{}
@@ -77,13 +81,14 @@ func newSink(ctx context.Context,
 	errCh chan error,
 ) (*dmlSink, error) {
 	changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
+	ctx, cancel := context.WithCancel(ctx)
 
 	encoderBuilder, err := builder.NewEventBatchEncoderBuilder(ctx, encoderConfig)
 	if err != nil {
+		cancel()
 		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 	}
 
-	ctx, cancel := context.WithCancel(ctx)
 	statistics := metrics.NewStatistics(ctx, sink.TxnSink)
 	worker := newWorker(changefeedID, encoderConfig.Protocol,
 		encoderBuilder, encoderConcurrency, producer, statistics)
@@ -149,10 +154,17 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single
 			continue
 		}
 		callback := mergedCallback(txn.Callback, uint64(len(txn.Event.Rows)))
+
 		for _, row := range txn.Event.Rows {
 			topic := s.alive.eventRouter.GetTopicForRowChange(row)
 			partitionNum, err := s.alive.topicManager.GetPartitionNum(topic)
+			failpoint.Inject("MQSinkGetPartitionError", func() {
+				log.Info("failpoint MQSinkGetPartitionError injected", zap.String("changefeedID", s.id.ID))
+				err = errors.New("MQSinkGetPartitionError")
+			})
 			if err != nil {
+				s.cancelErr = err
+				s.cancel()
 				return errors.Trace(err)
 			}
 			partition := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
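The WriteEvents hunk above is the core of the fix. mergedCallback (defined elsewhere in this package) presumably wraps txn.Callback so that it only fires after every row of the transaction has been acknowledged by the producer. If GetPartitionNum fails partway through the row loop, the rows already handed off keep counting down against that callback, but the remaining rows are never produced, so it can never fire and progress for the table stops being reported: the "lost callback" of the commit title. Recording the error in cancelErr and calling s.cancel() tears the whole sink down so it can be rebuilt, instead of leaving a stranded callback behind. The newSink hunk is a companion change: context.WithCancel is now taken before the encoder builder is constructed, so the early error path has a valid cancel to call. Below is a minimal sketch of the counting-callback pattern; the names and shape are illustrative assumptions, not the exact tiflow implementation.

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// mergedCallback is a sketch of a callback that runs the wrapped callback only
// after it has been invoked once per row of a transaction (assumed behaviour,
// not the exact tiflow helper).
func mergedCallback(callback func(), count uint64) func() {
	remaining := atomic.NewUint64(count)
	return func() {
		// Dec returns the new value, so the wrapped callback runs exactly once,
		// when the last pending row has been acknowledged.
		if remaining.Dec() == 0 {
			callback()
		}
	}
}

func main() {
	done := func() { fmt.Println("transaction fully acknowledged") }
	cb := mergedCallback(done, 3)

	cb() // row 1 acknowledged
	cb() // row 2 acknowledged
	// If the third row were never produced (for example, GetPartitionNum failed
	// and WriteEvents returned early), done() would never run and progress for
	// this table could not advance. That is the lost callback.
	cb() // row 3 acknowledged; only now does done() fire
}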
29 changes: 29 additions & 0 deletions tests/integration_tests/mq_sink_lost_callback/conf/diff_config.toml
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/mq_sink_lost_callback/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["mq_sink_lost_callback.t"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
57 changes: 57 additions & 0 deletions tests/integration_tests/mq_sink_lost_callback/run.sh
@@ -0,0 +1,57 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

CDC_COUNT=3
DB_COUNT=4

function test_mq_sink_lost_callback() {
	rm -rf $WORK_DIR && mkdir -p $WORK_DIR

	start_tidb_cluster --workdir $WORK_DIR

	run_sql "DROP DATABASE if exists mq_sink_lost_callback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "CREATE DATABASE mq_sink_lost_callback;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "CREATE TABLE mq_sink_lost_callback.t (a int not null primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

	cd $WORK_DIR
	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sinkv2/eventsink/mq/MQSinkGetPartitionError=2*return(true)'
	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"

	TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
	run_cdc_cli changefeed create --sink-uri="$SINK_URI"

	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (1);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (2);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (3);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (4);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "INSERT INTO mq_sink_lost_callback.t (a) values (5);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

	if [ "$SINK_TYPE" == "kafka" ]; then
		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}"
	fi
	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

	cleanup_process $CDC_BINARY
	stop_tidb_cluster
}

function run() {
	# test kafka sink only in this case
	if [ "$SINK_TYPE" != "kafka" ]; then
		return
	fi

	test_mq_sink_lost_callback $*
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
87 changes: 87 additions & 0 deletions tests/integration_tests/run_group.sh
@@ -0,0 +1,87 @@
#!/bin/bash

set -eo pipefail

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

sink_type=$1
group=$2

# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
# multi_cdc_cluster capture_suicide_while_balance_table
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide server_config_compatibility"
mysql_only_http="http_api http_api_tls api_v2"
mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"

kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback"
kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics avro_basic canal_json_handle_key_only open_protocol_handle_key_only canal_json_claim_check open_protocol_claim_check"
kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2"

storage_only="lossy_ddl storage_csv_update"
storage_only_csv="csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table"
storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table"

# Define groups
# Note: If new group is added, the group name must also be added to CI
# * https://github.com/PingCAP-QE/ci/blob/main/pipelines/pingcap/tiflow/latest/pull_cdc_integration_kafka_test.groovy
# * https://github.com/PingCAP-QE/ci/blob/main/pipelines/pingcap/tiflow/latest/pull_cdc_integration_test.groovy
# Each group of tests consumes as much time as possible, thus reducing CI waiting time.
# Putting multiple light tests together and heavy tests in a separate group.
declare -A groups
groups=(
	# Note: only the tests in the first three groups are running in storage sink pipeline.
	["G00"]="$mysql_only $kafka_only $storage_only"
	["G01"]="$mysql_only_http $kafka_only_protocol $storage_only_canal_json multi_tables_ddl"
	["G02"]="$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
	["G03"]='row_format drop_many_tables processor_stop_delay partition_table'
	["G04"]='foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop'
	["G05"]='charset_gbk ddl_manager multi_source'
	["G06"]='sink_retry changefeed_error ddl_sequence resourcecontrol'
	["G07"]='kv_client_stream_reconnect cdc split_region'
	["G08"]='processor_err_chan changefeed_reconstruct multi_capture'
	["G09"]='gc_safepoint changefeed_pause_resume cli savepoint'
	["G10"]='default_value simple cdc_server_tips event_filter'
	["G11"]='resolve_lock move_table autorandom generate_column'
	["G12"]='many_pk_or_uk capture_session_done_during_task ddl_attributes'
	["G13"]='tiflash region_merge common_1'
	["G14"]='big_txn changefeed_finish force_replicate_table'
	["G15"]='new_ci_collation batch_add_table multi_rocks'
	# currently G16 is not running in kafka pipeline
	["G16"]='owner_resign processor_etcd_worker_delay sink_hang'
	["G17"]='clustered_index processor_resolved_ts_fallback'
	# only run the following tests in mysql pipeline
	["G18"]='availability http_proxies sequence'
	["G19"]='changefeed_fast_fail batch_update_to_no_batch changefeed_resume_with_checkpoint_ts'
	["G20"]='tidb_mysql_test ddl_reentrant multi_cdc_cluster'
	["G21"]='bank kill_owner_with_ddl owner_remove_table_error'
)

# Get other cases not in groups, to avoid missing any case
others=()
for script in "$CUR"/*/run.sh; do
test_name="$(basename "$(dirname "$script")")"
# shellcheck disable=SC2076
if [[ ! " ${groups[*]} " =~ " ${test_name} " ]]; then
others=("${others[@]} ${test_name}")
fi
done

if [[ "$group" == "others" ]]; then
	if [[ -z $others ]]; then
		echo "All CDC integration test cases are added to groups"
		exit 0
	fi
	echo "Error: "$others" is not added to any group in tests/integration_tests/run_group.sh"
	exit 1
elif [[ " ${!groups[*]} " =~ " ${group} " ]]; then
	test_names="${groups[${group}]}"
	# Run test cases
	if [[ -n $test_names ]]; then
		echo "Run cases: ${test_names}"
		"${CUR}"/run.sh "${sink_type}" "${test_names}"
	fi
else
	echo "Error: invalid group name: ${group}"
	exit 1
fi
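run_group.sh is the entry point the CI pipelines call with a sink type and a group name; for example, the Kafka pipeline would pick up the new case through an invocation along the lines of ./tests/integration_tests/run_group.sh kafka G00, since mq_sink_lost_callback is listed in $kafka_only and therefore in group G00. The "others" pseudo-group exists only to fail the build if a test directory is added without being assigned to any group.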
