diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index dddbcd323fd..5d860150ba1 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -223,9 +223,23 @@ type Syncer struct {
 	// `lower_case_table_names` setting of upstream db
 	SourceTableNamesFlavor utils.LowerCaseTableNamesFlavor
 
-	tsOffset            atomic.Int64    // time offset between upstream and syncer, DM's timestamp - MySQL's timestamp
-	secondsBehindMaster atomic.Int64    // current task delay second behind upstream
-	workerJobTSArray    []*atomic.Int64 // worker's sync job TS array, note that idx=0 is skip idx and idx=1 is ddl idx,sql worker job idx=(queue id + 2)
+	// time difference between upstream and DM nodes: time of DM - time of upstream.
+	// we use it to calculate replication lag more accurately when clocks are not synced
+	// on either the upstream or the DM nodes.
+	tsOffset atomic.Int64
+	// the time difference in seconds between the current time of DM and
+	// the minimal timestamp of the binlog events currently being processed.
+	// this lag takes the time difference between upstream and DM nodes into account.
+	secondsBehindMaster atomic.Int64
+	// stores the last job TS (binlog event timestamp) of each worker;
+	// if a worker has no active job, its TS is reset to 0.
+	// since the DML worker runs jobs in batches, the TS is that of the first job in the batch.
+	// skipped events are recorded too, so even when upstream and downstream are far apart
+	// and no binlog event of interest lies in between, the lag can still decrease.
+	// - 0 is for skip jobs
+	// - 1 is for the DDL worker
+	// - 2+ is for DML workers, job idx = (queue id + 2)
+	workerJobTSArray []*atomic.Int64
 	lastCheckpointFlushedTime time.Time
 
 	firstMeetBinlogTS *int64
@@ -615,6 +629,10 @@ func (s *Syncer) reset() {
 	if s.streamerController != nil {
 		s.streamerController.Close()
 	}
+	s.secondsBehindMaster.Store(0)
+	for _, jobTS := range s.workerJobTSArray {
+		jobTS.Store(0)
+	}
 	// create new job chans
 	s.newJobChans()
 	s.checkpoint.DiscardPendingSnapshots()
@@ -878,7 +896,7 @@ func (s *Syncer) calcReplicationLag(headerTS int64) int64 {
 
 // updateReplicationJobTS store job TS, it is called after every batch dml job / one skip job / one ddl job is added and committed.
 func (s *Syncer) updateReplicationJobTS(job *job, jobIdx int) {
-	// when job is nil mean no job in this bucket, need do reset this bucket job ts to 0
+	// when job is nil, it means there's no job in this bucket, so reset this bucket's job TS to 0
 	if job == nil {
 		s.workerJobTSArray[jobIdx].Store(0)
 	} else {
diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go
index 20e87d27170..f308cfb2002 100644
--- a/dm/syncer/syncer_test.go
+++ b/dm/syncer/syncer_test.go
@@ -978,7 +978,11 @@ func (s *testSyncerSuite) TestRun(c *C) {
 	ctx, cancel = context.WithCancel(context.Background())
 	resultCh = make(chan pb.ProcessResult)
 	// simulate `syncer.Resume` here, but doesn't reset database conns
+	syncer.secondsBehindMaster.Store(100)
+	syncer.workerJobTSArray[ddlJobIdx].Store(100)
 	syncer.reset()
+	c.Assert(syncer.secondsBehindMaster.Load(), Equals, int64(0))
+	c.Assert(syncer.workerJobTSArray[ddlJobIdx].Load(), Equals, int64(0))
 	mockStreamerProducer = &MockStreamProducer{s.generateEvents(events2, c)}
 	mockStreamer, err = mockStreamerProducer.GenerateStreamFrom(binlog.MustZeroLocation(mysql.MySQLFlavor))
 	c.Assert(err, IsNil)
diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go
index b0a178d6eda..84fcc858aa5 100644
--- a/dm/worker/subtask.go
+++ b/dm/worker/subtask.go
@@ -585,7 +585,7 @@ func (st *SubTask) Kill() {
 	st.validator = nil
 }
 
-// Pause pauses a running sub task or a sub task paused by error.
+// Pause pauses a running subtask or a subtask paused by error.
 func (st *SubTask) Pause() error {
 	if st.markResultCanceled() {
 		return nil
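For reviewers, the new field comments boil down to one formula: lag = (current DM time) - tsOffset - (binlog event timestamp), with a worker's slot contributing nothing while its TS is 0. The sketch below is not part of the patch; it is a minimal, self-contained model of that calculation (the function name `replicationLag` is illustrative, and plain `int64` values stand in for the atomic fields), written to match what the comments and `calcReplicationLag`'s signature suggest.

```go
package main

import (
	"fmt"
	"time"
)

// replicationLag models the lag calculation described in the patch comments:
// tsOffset is "time of DM - time of upstream", and jobTS is the binlog
// timestamp of the oldest event a worker is currently processing.
// Subtracting tsOffset compensates for unsynced clocks, so the lag is
// measured against the upstream clock that stamped the event.
func replicationLag(now time.Time, tsOffset, jobTS int64) int64 {
	if jobTS == 0 {
		// a zeroed slot means the worker has no active job and contributes
		// no lag; this is the state reset() now restores for every worker.
		return 0
	}
	return now.Unix() - tsOffset - jobTS
}

func main() {
	now := time.Now()
	tsOffset := int64(2)               // DM's clock runs 2 seconds ahead of upstream
	jobTS := now.Unix() - tsOffset - 5 // an event stamped 5 seconds ago by upstream's clock
	fmt.Println(replicationLag(now, tsOffset, jobTS)) // prints 5
}
```

This also suggests why reset() now zeroes secondsBehindMaster and workerJobTSArray, which the updated TestRun asserts: presumably, stale timestamps left over from before a pause would otherwise be read as a spurious lag right after resume.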