From 4d17995811868b6f1a668cfc95259e940a4a4384 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 7 Dec 2023 15:01:20 +0800 Subject: [PATCH] puller(ticdc): fix stuck detect issue (#10258) (#10261) close pingcap/tiflow#10256 --- cdc/puller/puller.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index f635ecc5577..3d53966fc42 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -75,6 +75,8 @@ type pullerImpl struct { cfg *config.ServerConfig lastForwardTime time.Time lastForwardResolvedTs uint64 + // startResolvedTs is the resolvedTs when puller is initialized + startResolvedTs uint64 } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -115,6 +117,8 @@ func New(ctx context.Context, tableID: tableID, tableName: tableName, cfg: cfg, + + startResolvedTs: checkpointTs, } return p } @@ -182,7 +186,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { case <-ctx.Done(): return errors.Trace(ctx.Err()) case <-stuckDetectorTicker.C: - if err := p.detectResolvedTsStuck(initialized); err != nil { + if err := p.detectResolvedTsStuck(); err != nil { return errors.Trace(err) } continue @@ -245,9 +249,15 @@ func (p *pullerImpl) Run(ctx context.Context) error { return g.Wait() } -func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error { - if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized { +func (p *pullerImpl) detectResolvedTsStuck() error { + if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection { resolvedTs := p.tsTracker.Frontier() + // Check whether the resolvedTs is advancing. + // If the resolvedTs in Frontier is not greater than startResolvedTs, the incremental scan has + // not completed yet, so no decision can be made in this scenario.
+ if resolvedTs <= p.startResolvedTs { + return nil + } if resolvedTs == p.lastForwardResolvedTs { log.Warn("ResolvedTs stuck detected in puller", zap.String("namespace", p.changefeed.Namespace),