From 0d82a607c5511bc7a607c1494a6012868d50339f Mon Sep 17 00:00:00 2001
From: dongmen <20351731+asddongmen@users.noreply.github.com>
Date: Fri, 7 Apr 2023 12:50:58 +0800
Subject: [PATCH] ddl_manager (ticdc): Fix a panic in ddlManager. (#8716)

close pingcap/tiflow#8714
---
 cdc/owner/changefeed.go                    |   9 ++
 cdc/owner/changefeed_test.go               |   7 +-
 cdc/owner/ddl_manager.go                   |  34 ++++--
 .../ddl_manager/conf/diff_config.toml      |  28 +++++
 .../ddl_manager/data/prepare.sql           | 105 ++++++++++++++++++
 tests/integration_tests/ddl_manager/run.sh |  53 +++++++++
 6 files changed, 222 insertions(+), 14 deletions(-)
 create mode 100644 tests/integration_tests/ddl_manager/conf/diff_config.toml
 create mode 100644 tests/integration_tests/ddl_manager/data/prepare.sql
 create mode 100755 tests/integration_tests/ddl_manager/run.sh

diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 0951be2a88d..9de43d2abcd 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -312,7 +312,16 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
     // Note: There may be some tableBarrierTs larger than otherBarrierTs,
     // but we can ignore them because they will be handled in the processor.
     if barrier.GlobalBarrierTs > otherBarrierTs {
+        log.Debug("There are other barriers less than ddl barrier, wait for them",
+            zap.Uint64("otherBarrierTs", otherBarrierTs),
+            zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs))
         barrier.GlobalBarrierTs = otherBarrierTs
+    }
+
+    if minTableBarrierTs > otherBarrierTs {
+        log.Debug("There are other barriers less than min table barrier, wait for them",
+            zap.Uint64("otherBarrierTs", otherBarrierTs),
+            zap.Uint64("minTableBarrierTs", minTableBarrierTs))
         minTableBarrierTs = otherBarrierTs
     }
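
Note on the hunk above: before the patch, both values were clamped under a
single check on GlobalBarrierTs, so minTableBarrierTs could exceed
otherBarrierTs whenever the global barrier did not. The patch clamps each
independently. A minimal runnable sketch of the invariant, with illustrative
names rather than the actual tiflow types:

    package main

    import "fmt"

    // clampBarriers mirrors the patched logic in changefeed.tick: neither the
    // global DDL barrier nor the minimum table barrier may advance past the
    // smallest of the other barriers (redo, sync point, and so on).
    func clampBarriers(globalBarrierTs, minTableBarrierTs, otherBarrierTs uint64) (uint64, uint64) {
        if globalBarrierTs > otherBarrierTs {
            globalBarrierTs = otherBarrierTs
        }
        if minTableBarrierTs > otherBarrierTs {
            minTableBarrierTs = otherBarrierTs
        }
        return globalBarrierTs, minTableBarrierTs
    }

    func main() {
        // With global=90 the old single-condition code left minTable=103
        // unclamped above other=100; the patched logic pulls it back.
        g, m := clampBarriers(90, 103, 100)
        fmt.Println(g, m) // prints: 90 100
    }
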
diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go
index be77a7aab06..067adc06413 100644
--- a/cdc/owner/changefeed_test.go
+++ b/cdc/owner/changefeed_test.go
@@ -435,8 +435,8 @@ func TestEmitCheckpointTs(t *testing.T) {
     require.Equal(t, cf.state.Status.CheckpointTs, mockDDLPuller.resolvedTs)
     tables, err = cf.ddlManager.allTables(ctx)
     require.Nil(t, err)
-    // The ephemeral table should have left no trace in the schema cache
-    require.Len(t, tables, 0)
+    // The ephemeral table should only be deleted after the ddl is executed.
+    require.Len(t, tables, 1)
     // We can't use the new schema because the ddl hasn't been executed yet.
     ts, names = mockDDLSink.getCheckpointTsAndTableNames()
     require.Equal(t, ts, mockDDLPuller.resolvedTs)
@@ -504,7 +504,8 @@ func TestFinished(t *testing.T) {
         cf.Tick(ctx, captures)
         tester.MustApplyPatches()
     }
-
+    fmt.Println("checkpoint ts", cf.state.Status.CheckpointTs)
+    fmt.Println("target ts", cf.state.Info.TargetTs)
     require.Equal(t, cf.state.Status.CheckpointTs, cf.state.Info.TargetTs)
     require.Equal(t, cf.state.Info.State, model.StateFinished)
 }

diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go
index d0d01f70f73..6a7daf04d1f 100644
--- a/cdc/owner/ddl_manager.go
+++ b/cdc/owner/ddl_manager.go
@@ -15,6 +15,7 @@ package owner

 import (
     "context"
+    "math/rand"
     "sort"
     "time"

@@ -146,7 +147,7 @@ func newDDLManager(
         redoMetaManager: redoMetaManager,
         startTs:         startTs,
         checkpointTs:    checkpointTs,
-        ddlResolvedTs:   checkpointTs,
+        ddlResolvedTs:   startTs,
         BDRMode:         bdrMode,
         // use the passed sinkType after we support get resolvedTs from sink
         sinkType: model.DB,
@@ -201,13 +202,15 @@ func (m *ddlManager) tick(
             log.Info("handle a ddl job",
                 zap.String("namespace", m.changfeedID.Namespace),
                 zap.String("ID", m.changfeedID.ID),
-                zap.Any("ddlJob", job))
+                zap.Int64("tableID", job.TableID),
+                zap.Int64("jobID", job.ID),
+                zap.String("query", job.Query),
+                zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
+            )
             events, err := m.schema.BuildDDLEvents(ctx, job)
             if err != nil {
                 return nil, minTableBarrierTs, barrier, err
             }
-            // Clear the table cache after the schema is updated.
-            m.cleanCache()

             for _, event := range events {
                 // If changefeed is in BDRMode, skip ddl.
@@ -281,7 +284,6 @@ func (m *ddlManager) tick(

         if m.executingDDL == nil {
             m.executingDDL = nextDDL
-            m.cleanCache()
         }

         err := m.executeDDL(ctx)
@@ -342,6 +344,13 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
             failpoint.Return(nil)
         }
     })
+
+    failpoint.Inject("ExecuteDDLSlowly", func() {
+        lag := time.Duration(rand.Intn(5000)) * time.Millisecond
+        log.Warn("execute ddl slowly", zap.Duration("lag", lag))
+        time.Sleep(lag)
+    })
+
     done, err := m.ddlSink.emitDDLEvent(ctx, m.executingDDL)
     if err != nil {
         return err
@@ -359,6 +368,7 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
         m.schema.DoGC(m.executingDDL.CommitTs - 1)
         m.justSentDDL = m.executingDDL
         m.executingDDL = nil
+        m.cleanCache()
     }
     return nil
 }
@@ -491,7 +501,8 @@ func (m *ddlManager) allTables(ctx context.Context) ([]*model.TableInfo, error)
     log.Debug("changefeed current tables updated",
         zap.String("namespace", m.changfeedID.Namespace),
         zap.String("changefeed", m.changfeedID.ID),
-        zap.Uint64("checkpointTs", ts),
+        zap.Uint64("checkpointTs", m.checkpointTs),
+        zap.Uint64("snapshotTs", ts),
         zap.Any("tables", m.tableInfoCache),
     )
     return m.tableInfoCache, nil
@@ -531,16 +542,17 @@ func (m *ddlManager) allPhysicalTables(ctx context.Context) ([]model.TableID, er

 func (m *ddlManager) getSnapshotTs() (ts uint64) {
     ts = m.checkpointTs
-    if m.checkpointTs == m.startTs+1 && m.executingDDL == nil {
-        // If checkpointTs is equal to startTs+1, and executingDDL is nil,
-        // it means that the changefeed is just started, and the physicalTablesCache
-        // is empty. So we need to get all tables from the snapshot at the startTs.
+    if m.ddlResolvedTs == m.startTs {
+        // If ddlResolvedTs is equal to startTs, the changefeed has just
+        // started, so we need to get all tables from the snapshot at startTs.
         ts = m.startTs
         log.Debug("changefeed is just started, use startTs to get snapshot",
             zap.String("namespace", m.changfeedID.Namespace),
             zap.String("changefeed", m.changfeedID.ID),
             zap.Uint64("startTs", m.startTs),
-            zap.Uint64("checkpointTs", m.checkpointTs))
+            zap.Uint64("checkpointTs", m.checkpointTs),
+            zap.Uint64("ddlResolvedTs", m.ddlResolvedTs),
+        )
         return
     }

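Note on the ddl_manager.go changes: the fix replaces the fragile
"checkpointTs == startTs+1 && executingDDL == nil" heuristic for detecting a
freshly started changefeed. newDDLManager now seeds ddlResolvedTs with
startTs, getSnapshotTs keys off that, and the table cache is cleared only
after a DDL has actually been executed (which is why the test above now
expects the ephemeral table to still be visible). A minimal runnable sketch
of the new snapshot decision, with illustrative names rather than the actual
tiflow fields:

    package main

    import "fmt"

    // snapshotTs sketches the patched getSnapshotTs decision: schema
    // snapshots are taken at startTs until the first DDL has been resolved,
    // and at checkpointTs afterwards.
    func snapshotTs(startTs, checkpointTs, ddlResolvedTs uint64) uint64 {
        if ddlResolvedTs == startTs {
            // The changefeed has just started and the table cache is empty:
            // read the schema snapshot at startTs.
            return startTs
        }
        return checkpointTs
    }

    func main() {
        fmt.Println(snapshotTs(100, 100, 100)) // just started: 100 (startTs)
        fmt.Println(snapshotTs(100, 150, 120)) // running: 150 (checkpointTs)
    }
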
diff --git a/tests/integration_tests/ddl_manager/conf/diff_config.toml b/tests/integration_tests/ddl_manager/conf/diff_config.toml
new file mode 100644
index 00000000000..8f488a40ead
--- /dev/null
+++ b/tests/integration_tests/ddl_manager/conf/diff_config.toml
@@ -0,0 +1,28 @@
+# diff Configuration.
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+output-dir = "/tmp/tidb_cdc_test/ddl_manager/sync_diff/output"
+
+source-instances = ["mysql1"]
+
+target-instance = "tidb0"
+
+target-check-tables = ["ddl_manager.*"]
+
+[data-sources]
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 4000
+user = "root"
+password = ""
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = ""
diff --git a/tests/integration_tests/ddl_manager/data/prepare.sql b/tests/integration_tests/ddl_manager/data/prepare.sql
new file mode 100644
index 00000000000..80ef9efe808
--- /dev/null
+++ b/tests/integration_tests/ddl_manager/data/prepare.sql
@@ -0,0 +1,105 @@
+drop database if exists `ddl_manager`;
+create database `ddl_manager`;
+
+drop database if exists `ddl_manager_test1`;
+create database `ddl_manager_test1`;
+
+drop database if exists `ddl_manager_test2`;
+create database `ddl_manager_test2`;
+
+drop database if exists `ddl_manager_test3`;
+create database `ddl_manager_test3`;
+
+use `ddl_manager`;
+
+create table t1 (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    val INT DEFAULT 0,
+    col0 INT NOT NULL
+);
+INSERT INTO t1 (val, col0) VALUES (1, 1);
+INSERT INTO t1 (val, col0) VALUES (2, 2);
+INSERT INTO t1 (val, col0) VALUES (3, 3);
+INSERT INTO t1 (val, col0) VALUES (4, 4);
+INSERT INTO t1 (val, col0) VALUES (5, 5);
+
+
+CREATE TABLE t2 (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    val INT DEFAULT 0,
+    col0 INT NOT NULL
+);
+INSERT INTO t2 (val, col0) VALUES (1, 1);
+INSERT INTO t2 (val, col0) VALUES (2, 2);
+INSERT INTO t2 (val, col0) VALUES (3, 3);
+INSERT INTO t2 (val, col0) VALUES (4, 4);
+INSERT INTO t2 (val, col0) VALUES (5, 5);
+
+drop table t2;
+
+create table t3 (
+    a int, primary key (a)
+) partition by hash(a) partitions 5;
+insert into t3 values (1),(2),(3),(4),(5),(6);
+insert into t3 values (7),(8),(9);
+alter table t3 truncate partition p3;
+
+create table t4 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
+insert into t4 values (1),(2),(3),(4),(5),(6);
+insert into t4 values (7),(8),(9);
+insert into t4 values (11),(12),(20);
+alter table t4 add partition (partition p3 values less than (30), partition p4 values less than (40));
+
+CREATE TABLE t5 (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    val INT DEFAULT 0,
+    col0 INT NOT NULL
+);
+
+CREATE TABLE t6 (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    val INT DEFAULT 0,
+    col0 INT NOT NULL
+);
+
+DROP TABLE t5;
+
+drop database if exists `ddl_manager_test2`;
+
+CREATE TABLE t7 (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    val INT DEFAULT 0,
+    col0 INT NOT NULL
+);
+
+DROP TABLE t6;
+
+CREATE TABLE t8 (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    val INT DEFAULT 0,
+    col0 INT NOT NULL
+);
+
+DROP TABLE t7;
+
+CREATE TABLE t9 (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    val INT DEFAULT 0,
+    col0 INT NOT NULL
+);
+
+DROP TABLE t8;
+
+CREATE TABLE t10 (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    val INT DEFAULT 0,
+    col0 INT NOT NULL
+);
+
+DROP TABLE t9;
+
+CREATE TABLE finish_mark (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    val INT DEFAULT 0,
+    col0 INT NOT NULL
+);
diff --git a/tests/integration_tests/ddl_manager/run.sh b/tests/integration_tests/ddl_manager/run.sh
new file mode 100755
index 00000000000..29dbb7452b1
--- /dev/null
+++ b/tests/integration_tests/ddl_manager/run.sh
@@ -0,0 +1,53 @@
+#!/bin/bash
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+function run() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ExecuteDDLSlowly=return(true)'
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+	owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
+
+	# this test runs many DDLs that may require elevated privileges
+	# downstream, so use the root user rather than the normal user
+	TOPIC_NAME="ticdc-ddl-manager-test-$RANDOM"
+	case $SINK_TYPE in
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
+	esac
+	changefeed_id="ddl-manager"
+	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}
+
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+	fi
+
+	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# kill the owner to make sure the ddl manager works correctly after the
+	# owner goes down and a new one takes over
+	kill_cdc_pid $owner_pid
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+	owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+	check_table_exists ddl_manager.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
+	# make sure all tables are equal in upstream and downstream
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
+	cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
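
Note on the failpoint wiring: run.sh enables the ExecuteDDLSlowly failpoint
added in ddl_manager.go through the GO_FAILPOINTS environment variable; the
path github.com/pingcap/tiflow/cdc/owner/ExecuteDDLSlowly comes straight from
the script. For readers unfamiliar with pingcap/failpoint, below is a
self-contained sketch of the same pattern; the main-package path and names
are illustrative, and the marker must be rewritten with the failpoint-ctl
tool before building for the env var to take effect:

    package main

    import (
        "log"
        "math/rand"
        "time"

        "github.com/pingcap/failpoint"
    )

    func executeDDL() {
        // When the failpoint is enabled, sleep a random amount before
        // "executing" the DDL, widening the race window between the ddl
        // manager and an owner restart -- what the integration test relies on.
        failpoint.Inject("ExecuteDDLSlowly", func() {
            lag := time.Duration(rand.Intn(5000)) * time.Millisecond
            log.Printf("execute ddl slowly, lag=%v", lag)
            time.Sleep(lag)
        })
        log.Println("ddl executed")
    }

    func main() {
        // After `failpoint-ctl enable`, run with e.g.:
        //   GO_FAILPOINTS='main/ExecuteDDLSlowly=return(true)' ./prog
        executeDDL()
    }
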