diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 10fbbfd3d7c..c740b2ef952 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -77,6 +77,8 @@ type pullerImpl struct { cfg *config.ServerConfig lastForwardTime time.Time lastForwardResolvedTs uint64 + // startResolvedTs is the resolvedTs when puller is initialized + startResolvedTs uint64 } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -126,6 +128,8 @@ func New(ctx context.Context, tableID: tableID, tableName: tableName, cfg: cfg, + + startResolvedTs: checkpointTs, } return p } @@ -213,7 +217,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { metricOutputChanSize.Observe(float64(len(p.outputCh))) metricPullerResolvedTs.Set(float64(oracle.ExtractPhysical(atomic.LoadUint64(&p.resolvedTs)))) case <-stuckDetectorTicker.C: - if err := p.detectResolvedTsStuck(initialized); err != nil { + if err := p.detectResolvedTsStuck(); err != nil { return errors.Trace(err) } continue @@ -280,9 +284,15 @@ func (p *pullerImpl) GetResolvedTs() uint64 { return atomic.LoadUint64(&p.resolvedTs) } -func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error { - if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized { +func (p *pullerImpl) detectResolvedTsStuck() error { + if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection { resolvedTs := p.tsTracker.Frontier() + // check if the resolvedTs is advancing, + // If the resolvedTs in Frontier is less than startResolvedTs, it means that the incremental scan has + // not complete yet. We need to make no decision in this scenario. + if resolvedTs <= p.startResolvedTs { + return nil + } if resolvedTs == p.lastForwardResolvedTs { log.Warn("ResolvedTs stuck detected in puller", zap.String("namespace", p.changefeed.Namespace),