Skip to content

Commit

Permalink
This is an automated cherry-pick of #10258
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
sdojjy authored and ti-chi-bot committed Dec 6, 2023
1 parent 0050e31 commit 928c414
Showing 1 changed file with 55 additions and 0 deletions.
55 changes: 55 additions & 0 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ type pullerImpl struct {
changefeed model.ChangeFeedID
tableID model.TableID
tableName string
<<<<<<< HEAD

Check failure on line 74 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected field name or embedded type

Check failure on line 74 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected field name or embedded type
=======

Check failure on line 75 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ==, expected field name or embedded type

Check failure on line 75 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ==, expected field name or embedded type

cfg *config.ServerConfig
lastForwardTime time.Time
lastForwardResolvedTs uint64
// startResolvedTs is the resolvedTs when puller is initialized
startResolvedTs uint64
>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258))

Check failure on line 82 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected >>, expected field name or embedded type

Check failure on line 82 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'

Check failure on line 82 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected >>, expected field name or embedded type

Check failure on line 82 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'
}

// New create a new Puller fetch event start from checkpointTs and put into buf.
Expand Down Expand Up @@ -110,6 +119,12 @@ func New(ctx context.Context,
changefeed: changefeed,
tableID: tableID,
tableName: tableName,
<<<<<<< HEAD

Check failure on line 122 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected expression

Check failure on line 122 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected expression
=======
cfg: cfg,

startResolvedTs: checkpointTs,
>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258))

Check failure on line 127 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ) in composite literal; possibly missing comma or }

Check failure on line 127 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'

Check failure on line 127 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ) in composite literal; possibly missing comma or }

Check failure on line 127 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'
}
return p

Check failure on line 129 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body

Check failure on line 129 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body
}
Expand Down Expand Up @@ -176,6 +191,14 @@ func (p *pullerImpl) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
<<<<<<< HEAD

Check failure on line 194 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected case or default or }

Check failure on line 194 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected case or default or }
=======
case <-stuckDetectorTicker.C:

Check failure on line 196 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected case, expected :

Check failure on line 196 in cdc/puller/puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected case, expected :
if err := p.detectResolvedTsStuck(); err != nil {
return errors.Trace(err)
}
continue
>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258))
case e = <-eventCh:
}

Expand Down Expand Up @@ -235,6 +258,38 @@ func (p *pullerImpl) Run(ctx context.Context) error {
return g.Wait()
}

<<<<<<< HEAD
=======
func (p *pullerImpl) detectResolvedTsStuck() error {
if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection {
resolvedTs := p.tsTracker.Frontier()
// check if the resolvedTs is advancing,
// If the resolvedTs in Frontier is less than startResolvedTs, it means that the incremental scan has
// not complete yet. We need to make no decision in this scenario.
if resolvedTs <= p.startResolvedTs {
return nil
}
if resolvedTs == p.lastForwardResolvedTs {
log.Warn("ResolvedTs stuck detected in puller",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID),
zap.Int64("tableID", p.tableID),
zap.String("tableName", p.tableName),
zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs),
zap.Uint64("resolvedTs", resolvedTs))
if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) {
// throw an error to cause changefeed restart
return errors.New("resolved ts stuck")
}
} else {
p.lastForwardTime = time.Now()
p.lastForwardResolvedTs = resolvedTs
}
}
return nil
}

>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258))
func (p *pullerImpl) Output() <-chan *model.RawKVEntry {
return p.outputCh
}
Expand Down

0 comments on commit 928c414

Please sign in to comment.