From 928c414491464bc6d6bb0939613d0c18fe6df666 Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Wed, 6 Dec 2023 11:08:24 +0800 Subject: [PATCH] This is an automated cherry-pick of #10258 Signed-off-by: ti-chi-bot --- cdc/puller/puller.go | 55 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index f2d014b426a..6f824f2ab2d 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -71,6 +71,15 @@ type pullerImpl struct { changefeed model.ChangeFeedID tableID model.TableID tableName string +<<<<<<< HEAD +======= + + cfg *config.ServerConfig + lastForwardTime time.Time + lastForwardResolvedTs uint64 + // startResolvedTs is the resolvedTs when puller is initialized + startResolvedTs uint64 +>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258)) } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -110,6 +119,12 @@ func New(ctx context.Context, changefeed: changefeed, tableID: tableID, tableName: tableName, +<<<<<<< HEAD +======= + cfg: cfg, + + startResolvedTs: checkpointTs, +>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258)) } return p } @@ -176,6 +191,14 @@ func (p *pullerImpl) Run(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) +<<<<<<< HEAD +======= + case <-stuckDetectorTicker.C: + if err := p.detectResolvedTsStuck(); err != nil { + return errors.Trace(err) + } + continue +>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258)) case e = <-eventCh: } @@ -235,6 +258,38 @@ func (p *pullerImpl) Run(ctx context.Context) error { return g.Wait() } +<<<<<<< HEAD +======= +func (p *pullerImpl) detectResolvedTsStuck() error { + if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection { + resolvedTs := p.tsTracker.Frontier() + // check if the resolvedTs is advancing, + // If the resolvedTs in Frontier is less than startResolvedTs, it means that the incremental scan has + // not complete yet. We need to make no decision in this scenario. + if resolvedTs <= p.startResolvedTs { + return nil + } + if resolvedTs == p.lastForwardResolvedTs { + log.Warn("ResolvedTs stuck detected in puller", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs), + zap.Uint64("resolvedTs", resolvedTs)) + if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) { + // throw an error to cause changefeed restart + return errors.New("resolved ts stuck") + } + } else { + p.lastForwardTime = time.Now() + p.lastForwardResolvedTs = resolvedTs + } + } + return nil +} + +>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258)) func (p *pullerImpl) Output() <-chan *model.RawKVEntry { return p.outputCh }