Skip to content

Commit

Permalink
dm: fix lag keeps growing when failed ddl is skipped and no following…
Browse files Browse the repository at this point in the history
… ddl (#9607) (#9613)

close #9605
  • Loading branch information
ti-chi-bot authored Nov 20, 2023
1 parent e0924d4 commit 7396a87
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
26 changes: 22 additions & 4 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,23 @@ type Syncer struct {
// `lower_case_table_names` setting of upstream db
SourceTableNamesFlavor utils.LowerCaseTableNamesFlavor

tsOffset atomic.Int64 // time offset between upstream and syncer, DM's timestamp - MySQL's timestamp
secondsBehindMaster atomic.Int64 // current task delay second behind upstream
workerJobTSArray []*atomic.Int64 // worker's sync job TS array, note that idx=0 is skip idx and idx=1 is ddl idx,sql worker job idx=(queue id + 2)
// time difference between upstream and DM nodes: time of DM - time of upstream.
// we use this to calculate replication lag more accurately when clock is not synced
// on either upstream or DM nodes.
tsOffset atomic.Int64
// this field measures the time difference in seconds between current time of DM and
// the minimal timestamp of currently processing binlog events.
// this lag will consider time difference between upstream and DM nodes
secondsBehindMaster atomic.Int64
// stores the last job TS(binlog event timestamp) of each worker,
// if there's no active job, the corresponding worker's TS is reset to 0.
// since DML worker runs jobs in batch, the TS is the TS of the first job in the batch.
// We account for skipped events too, if the distance between up and downstream is long,
// and there's no interested binlog event in between, we can have a decreasing lag.
// - 0 is for skip jobs
// - 1 is ddl worker
// - 2+ is for DML worker job idx=(queue id + 2)
workerJobTSArray []*atomic.Int64
lastCheckpointFlushedTime time.Time

firstMeetBinlogTS *int64
Expand Down Expand Up @@ -615,6 +629,10 @@ func (s *Syncer) reset() {
if s.streamerController != nil {
s.streamerController.Close()
}
s.secondsBehindMaster.Store(0)
for _, jobTS := range s.workerJobTSArray {
jobTS.Store(0)
}
// create new job chans
s.newJobChans()
s.checkpoint.DiscardPendingSnapshots()
Expand Down Expand Up @@ -878,7 +896,7 @@ func (s *Syncer) calcReplicationLag(headerTS int64) int64 {

// updateReplicationJobTS store job TS, it is called after every batch dml job / one skip job / one ddl job is added and committed.
func (s *Syncer) updateReplicationJobTS(job *job, jobIdx int) {
// when job is nil mean no job in this bucket, need do reset this bucket job ts to 0
// when job is nil mean no job in this bucket, need to reset this bucket job ts to 0
if job == nil {
s.workerJobTSArray[jobIdx].Store(0)
} else {
Expand Down
4 changes: 4 additions & 0 deletions dm/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,11 @@ func (s *testSyncerSuite) TestRun(c *C) {
ctx, cancel = context.WithCancel(context.Background())
resultCh = make(chan pb.ProcessResult)
// simulate `syncer.Resume` here, but doesn't reset database conns
syncer.secondsBehindMaster.Store(100)
syncer.workerJobTSArray[ddlJobIdx].Store(100)
syncer.reset()
c.Assert(syncer.secondsBehindMaster.Load(), Equals, int64(0))
c.Assert(syncer.workerJobTSArray[ddlJobIdx].Load(), Equals, int64(0))
mockStreamerProducer = &MockStreamProducer{s.generateEvents(events2, c)}
mockStreamer, err = mockStreamerProducer.GenerateStreamFrom(binlog.MustZeroLocation(mysql.MySQLFlavor))
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func (st *SubTask) Kill() {
st.validator = nil
}

// Pause pauses a running sub task or a sub task paused by error.
// Pause pauses a running subtask or a subtask paused by error.
func (st *SubTask) Pause() error {
if st.markResultCanceled() {
return nil
Expand Down

0 comments on commit 7396a87

Please sign in to comment.