ddl_manager (ticdc): Fix a panic in ddlManager. (#8716)
close #8714
asddongmen authored Apr 7, 2023
1 parent 3554e47 commit 0d82a60
Showing 6 changed files with 222 additions and 14 deletions.
9 changes: 9 additions & 0 deletions cdc/owner/changefeed.go
@@ -312,7 +312,16 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
// Note: There may be some tableBarrierTs larger than otherBarrierTs,
// but we can ignore them because they will be handled in the processor.
if barrier.GlobalBarrierTs > otherBarrierTs {
log.Debug("There are other barriers less than ddl barrier, wait for them",
zap.Uint64("otherBarrierTs", otherBarrierTs),
zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs))
barrier.GlobalBarrierTs = otherBarrierTs
}

if minTableBarrierTs > otherBarrierTs {
log.Debug("There are other barriers less than min table barrier, wait for them",
zap.Uint64("otherBarrierTs", otherBarrierTs),
zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs))
minTableBarrierTs = otherBarrierTs
}

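For intuition, the clamping added above only ever pulls the DDL-derived barriers backwards, so that neither of them can run ahead of the smallest non-DDL barrier. Below is a minimal standalone sketch of the same logic with made-up timestamps; the concrete values and the plain uint64 variables are illustrative only, not taken from the commit.

package main

import "fmt"

func main() {
	// Hypothetical values: some non-DDL barrier (redo, sync point, etc.) lags behind.
	otherBarrierTs := uint64(100)
	globalBarrierTs := uint64(105)   // DDL global barrier
	minTableBarrierTs := uint64(103) // smallest per-table DDL barrier

	// Mirrors the two clamps in changefeed.tick: wait for the slower barrier.
	if globalBarrierTs > otherBarrierTs {
		globalBarrierTs = otherBarrierTs
	}
	if minTableBarrierTs > otherBarrierTs {
		minTableBarrierTs = otherBarrierTs
	}
	fmt.Println(globalBarrierTs, minTableBarrierTs) // 100 100
}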
7 changes: 4 additions & 3 deletions cdc/owner/changefeed_test.go
@@ -435,8 +435,8 @@ func TestEmitCheckpointTs(t *testing.T) {
require.Equal(t, cf.state.Status.CheckpointTs, mockDDLPuller.resolvedTs)
tables, err = cf.ddlManager.allTables(ctx)
require.Nil(t, err)
// The ephemeral table should have left no trace in the schema cache
require.Len(t, tables, 0)
// The ephemeral table should only be deleted after the ddl is executed.
require.Len(t, tables, 1)
// We can't use the new schema because the ddl hasn't been executed yet.
ts, names = mockDDLSink.getCheckpointTsAndTableNames()
require.Equal(t, ts, mockDDLPuller.resolvedTs)
@@ -504,7 +504,8 @@ func TestFinished(t *testing.T) {
cf.Tick(ctx, captures)
tester.MustApplyPatches()
}

fmt.Println("checkpoint ts", cf.state.Status.CheckpointTs)
fmt.Println("target ts", cf.state.Info.TargetTs)
require.Equal(t, cf.state.Status.CheckpointTs, cf.state.Info.TargetTs)
require.Equal(t, cf.state.Info.State, model.StateFinished)
}
34 changes: 23 additions & 11 deletions cdc/owner/ddl_manager.go
@@ -15,6 +15,7 @@ package owner

import (
"context"
"math/rand"
"sort"
"time"

@@ -146,7 +147,7 @@ func newDDLManager(
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
// use the passed sinkType after we support get resolvedTs from sink
sinkType: model.DB,
@@ -201,13 +202,15 @@ func (m *ddlManager) tick(
log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
zap.Any("ddlJob", job))
zap.Int64("tableID", job.TableID),
zap.Int64("jobID", job.ID),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
)
events, err := m.schema.BuildDDLEvents(ctx, job)
if err != nil {
return nil, minTableBarrierTs, barrier, err
}
// Clear the table cache after the schema is updated.
m.cleanCache()

for _, event := range events {
// If changefeed is in BDRMode, skip ddl.
@@ -281,7 +284,6 @@ func (m *ddlManager) tick(

if m.executingDDL == nil {
m.executingDDL = nextDDL
m.cleanCache()
}

err := m.executeDDL(ctx)
@@ -342,6 +344,13 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
failpoint.Return(nil)
}
})

failpoint.Inject("ExecuteDDLSlowly", func() {
lag := time.Duration(rand.Intn(5000)) * time.Millisecond
log.Warn("execute ddl slowly", zap.Duration("lag", lag))
time.Sleep(lag)
})

done, err := m.ddlSink.emitDDLEvent(ctx, m.executingDDL)
if err != nil {
return err
@@ -359,6 +368,7 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
m.schema.DoGC(m.executingDDL.CommitTs - 1)
m.justSentDDL = m.executingDDL
m.executingDDL = nil
m.cleanCache()
}
return nil
}
@@ -491,7 +501,8 @@ func (m *ddlManager) allTables(ctx context.Context) ([]*model.TableInfo, error)
log.Debug("changefeed current tables updated",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.Uint64("checkpointTs", ts),
zap.Uint64("checkpointTs", m.checkpointTs),
zap.Uint64("snapshotTs", ts),
zap.Any("tables", m.tableInfoCache),
)
return m.tableInfoCache, nil
@@ -531,16 +542,17 @@ func (m *ddlManager) allPhysicalTables(ctx context.Context) ([]model.TableID, er
func (m *ddlManager) getSnapshotTs() (ts uint64) {
ts = m.checkpointTs

if m.checkpointTs == m.startTs+1 && m.executingDDL == nil {
// If checkpointTs is equal to startTs+1, and executingDDL is nil
// it means that the changefeed is just started, and the physicalTablesCache
// is empty. So we need to get all tables from the snapshot at the startTs.
if m.ddlResolvedTs == m.startTs {
// If ddlResolvedTs is equal to startTs, it means that the changefeed has just started,
// so we need to get all tables from the snapshot at startTs.
ts = m.startTs
log.Debug("changefeed is just started, use startTs to get snapshot",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.Uint64("startTs", m.startTs),
zap.Uint64("checkpointTs", m.checkpointTs))
zap.Uint64("checkpointTs", m.checkpointTs),
zap.Uint64("ddlResolvedTs", m.ddlResolvedTs),
)
return
}

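The ExecuteDDLSlowly failpoint introduced above is activated in the new integration test below through the GO_FAILPOINTS environment variable. As a rough sketch of the same switch done programmatically (assuming the standard github.com/pingcap/failpoint API and a failpoint-enabled build; the test name and surrounding setup are hypothetical and not part of this commit):

package owner

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// TestExecuteDDLSlowlyFailpoint is an illustrative name only. It shows how the
// failpoint added to executeDDL could be toggled around a ddlManager tick.
func TestExecuteDDLSlowlyFailpoint(t *testing.T) {
	fp := "github.com/pingcap/tiflow/cdc/owner/ExecuteDDLSlowly"
	// "return(true)" matches the term used by GO_FAILPOINTS in run.sh below.
	require.NoError(t, failpoint.Enable(fp, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	// ... drive ddlManager.tick / executeDDL here; with the failpoint active,
	// each emitDDLEvent call is preceded by a random sleep of up to 5 seconds.
}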
28 changes: 28 additions & 0 deletions tests/integration_tests/ddl_manager/conf/diff_config.toml
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ddl_manager/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["ddl_manager.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
105 changes: 105 additions & 0 deletions tests/integration_tests/ddl_manager/data/prepare.sql
@@ -0,0 +1,105 @@
drop database if exists `ddl_manager`;
create database `ddl_manager`;

drop database if exists `ddl_manager_test1`;
create database `ddl_manager_test1`;

drop database if exists `ddl_manager_test2`;
create database `ddl_manager_test2`;

drop database if exists `ddl_manager_test3`;
create database `ddl_manager_test3`;

use `ddl_manager`;

create table t1 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
INSERT INTO t1 (val, col0) VALUES (1, 1);
INSERT INTO t1 (val, col0) VALUES (2, 2);
INSERT INTO t1 (val, col0) VALUES (3, 3);
INSERT INTO t1 (val, col0) VALUES (4, 4);
INSERT INTO t1 (val, col0) VALUES (5, 5);


CREATE TABLE t2 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
INSERT INTO t2 (val, col0) VALUES (1, 1);
INSERT INTO t2 (val, col0) VALUES (2, 2);
INSERT INTO t2 (val, col0) VALUES (3, 3);
INSERT INTO t2 (val, col0) VALUES (4, 4);
INSERT INTO t2 (val, col0) VALUES (5, 5);

drop table t2;

create table t3 (
a int, primary key (a)
) partition by hash(a) partitions 5;
insert into t3 values (1),(2),(3),(4),(5),(6);
insert into t3 values (7),(8),(9);
alter table t3 truncate partition p3;

create table t4 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
insert into t4 values (1),(2),(3),(4),(5),(6);
insert into t4 values (7),(8),(9);
insert into t4 values (11),(12),(20);
alter table t4 add partition (partition p3 values less than (30), partition p4 values less than (40));

CREATE TABLE t5 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

CREATE TABLE t6 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

DROP TABLE t5;

drop database if exists `ddl_manager_test2`;

CREATE TABLE t7 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

DROP TABLE t6;

CREATE TABLE t8 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

DROP TABLE t7;

CREATE TABLE t9 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

DROP TABLE t8;

CREATE TABLE t10 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);

DROP TABLE t9;

CREATE TABLE finish_mark (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
53 changes: 53 additions & 0 deletions tests/integration_tests/ddl_manager/run.sh
@@ -0,0 +1,53 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ExecuteDDLSlowly=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

# this test contains `recover table`, which requires super privilege, so we
# can't use the normal user
TOPIC_NAME="ticdc-ddl-mamager-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://[email protected]:3306/" ;;
esac
changefeed_id="ddl-manager"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}

if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# kill owner to make sure ddl manager is working right when owner is down and up
kill_cdc_pid $owner_pid
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
check_table_exists ddl_manager.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
# make sure all tables are equal in upstream and downstream
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
