From 6b74d6e830f81e21ca01553f4989689fc721f930 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 21 Aug 2023 12:08:02 +0800 Subject: [PATCH] dm: fix lag keeps growing when failed ddl is skipped and no following ddl (#9607) close pingcap/tiflow#9605 --- dm/syncer/syncer.go | 26 ++++++++++++++++++++++---- dm/syncer/syncer_test.go | 4 ++++ dm/worker/subtask.go | 2 +- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index e2848925cc1..634080e2fe0 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -222,9 +222,23 @@ type Syncer struct { // `lower_case_table_names` setting of upstream db SourceTableNamesFlavor conn.LowerCaseTableNamesFlavor - tsOffset atomic.Int64 // time offset between upstream and syncer, DM's timestamp - MySQL's timestamp - secondsBehindMaster atomic.Int64 // current task delay second behind upstream - workerJobTSArray []*atomic.Int64 // worker's sync job TS array, note that idx=0 is skip idx and idx=1 is ddl idx,sql worker job idx=(queue id + 2) + // time difference between upstream and DM nodes: time of DM - time of upstream. + // we use this to calculate replication lag more accurately when clock is not synced + // on either upstream or DM nodes. + tsOffset atomic.Int64 + // this field measures the time difference in seconds between current time of DM and + // the minimal timestamp of currently processing binlog events. + // this lag will consider time difference between upstream and DM nodes + secondsBehindMaster atomic.Int64 + // stores the last job TS(binlog event timestamp) of each worker, + // if there's no active job, the corresponding worker's TS is reset to 0. + // since DML worker runs jobs in batch, the TS is the TS of the first job in the batch. + // We account for skipped events too, if the distance between up and downstream is long, + // and there's no interested binlog event in between, we can have a decreasing lag. + // - 0 is for skip jobs + // - 1 is ddl worker + // - 2+ is for DML worker job idx=(queue id + 2) + workerJobTSArray []*atomic.Int64 lastCheckpointFlushedTime time.Time firstMeetBinlogTS *int64 @@ -607,6 +621,10 @@ func (s *Syncer) reset() { if s.streamerController != nil { s.streamerController.Close() } + s.secondsBehindMaster.Store(0) + for _, jobTS := range s.workerJobTSArray { + jobTS.Store(0) + } // create new job chans s.newJobChans() s.checkpoint.DiscardPendingSnapshots() @@ -880,7 +898,7 @@ func (s *Syncer) calcReplicationLag(headerTS int64) int64 { // updateReplicationJobTS store job TS, it is called after every batch dml job / one skip job / one ddl job is added and committed. func (s *Syncer) updateReplicationJobTS(job *job, jobIdx int) { - // when job is nil mean no job in this bucket, need do reset this bucket job ts to 0 + // when job is nil mean no job in this bucket, need to reset this bucket job ts to 0 if job == nil { s.workerJobTSArray[jobIdx].Store(0) } else { diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index c8cb0002147..6ad694b5346 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -964,7 +964,11 @@ func (s *testSyncerSuite) TestRun(c *C) { ctx, cancel = context.WithCancel(context.Background()) resultCh = make(chan pb.ProcessResult) // simulate `syncer.Resume` here, but doesn't reset database conns + syncer.secondsBehindMaster.Store(100) + syncer.workerJobTSArray[ddlJobIdx].Store(100) syncer.reset() + c.Assert(syncer.secondsBehindMaster.Load(), Equals, int64(0)) + c.Assert(syncer.workerJobTSArray[ddlJobIdx].Load(), Equals, int64(0)) mockStreamerProducer = &MockStreamProducer{s.generateEvents(events2, c)} mockStreamer, err = mockStreamerProducer.GenerateStreamFrom(binlog.MustZeroLocation(mysql.MySQLFlavor)) c.Assert(err, IsNil) diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index c9ac683fb14..374f3276632 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -575,7 +575,7 @@ func (st *SubTask) Kill() { st.validator = nil } -// Pause pauses a running sub task or a sub task paused by error. +// Pause pauses a running subtask or a subtask paused by error. func (st *SubTask) Pause() error { if st.markResultCanceled() { return nil