Skip to content

Commit

Permalink
puller(ticdc): detect resolved ts stuck in puller (#10182) (#10220)
Browse files Browse the repository at this point in the history
close #10181
  • Loading branch information
ti-chi-bot authored Dec 1, 2023
1 parent da0e717 commit a940814
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 1 deletion.
34 changes: 34 additions & 0 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type pullerImpl struct {
changefeed model.ChangeFeedID
tableID model.TableID
tableName string

cfg *config.ServerConfig
lastForwardTime time.Time
lastForwardResolvedTs uint64
}

// New create a new Puller fetch event start from checkpointTs and put into buf.
Expand Down Expand Up @@ -121,6 +125,7 @@ func New(ctx context.Context,
changefeed: changefeed,
tableID: tableID,
tableName: tableName,
cfg: cfg,
}
return p
}
Expand Down Expand Up @@ -165,6 +170,8 @@ func (p *pullerImpl) Run(ctx context.Context) error {
g.Go(func() error {
metricsTicker := time.NewTicker(15 * time.Second)
defer metricsTicker.Stop()
stuckDetectorTicker := time.NewTicker(1 * time.Minute)
defer stuckDetectorTicker.Stop()
output := func(raw *model.RawKVEntry) error {
// even after https://github.com/pingcap/tiflow/pull/2038, kv client
// could still miss region change notification, which leads to resolved
Expand Down Expand Up @@ -205,6 +212,10 @@ func (p *pullerImpl) Run(ctx context.Context) error {
metricEventChanSize.Observe(float64(len(eventCh)))
metricOutputChanSize.Observe(float64(len(p.outputCh)))
metricPullerResolvedTs.Set(float64(oracle.ExtractPhysical(atomic.LoadUint64(&p.resolvedTs))))
case <-stuckDetectorTicker.C:
if err := p.detectResolvedTsStuck(initialized); err != nil {
return errors.Trace(err)
}
continue
case e = <-eventCh:
}
Expand Down Expand Up @@ -269,6 +280,29 @@ func (p *pullerImpl) GetResolvedTs() uint64 {
return atomic.LoadUint64(&p.resolvedTs)
}

func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error {
if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized {
resolvedTs := p.tsTracker.Frontier()
if resolvedTs == p.lastForwardResolvedTs {
log.Warn("ResolvedTs stuck detected in puller",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID),
zap.Int64("tableID", p.tableID),
zap.String("tableName", p.tableName),
zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs),
zap.Uint64("resolvedTs", resolvedTs))
if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) {
// throw an error to cause changefeed restart
return errors.New("resolved ts stuck")
}
} else {
p.lastForwardTime = time.Now()
p.lastForwardResolvedTs = resolvedTs
}
}
return nil
}

func (p *pullerImpl) Output() <-chan *model.RawKVEntry {
return p.outputCh
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ func TestParseCfg(t *testing.T) {
AddTableBatchSize: 50,
},
EnableNewSink: true,
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -374,6 +378,10 @@ check-balance-interval = "10s"
AddTableBatchSize: 50,
},
EnableNewSink: true,
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -520,6 +528,10 @@ cert-allowed-cn = ["dd","ee"]
AddTableBatchSize: 50,
},
EnableNewSink: true,
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -590,5 +602,9 @@ unknown3 = 3
AddTableBatchSize: 50,
},
EnableNewSink: true,
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
}, o.serverConfig.Debug)
}
6 changes: 5 additions & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,11 @@ const (
"add-table-batch-size": 50
},
"enable-new-sink": true,
"enable-kv-connect-backoff": false
"enable-kv-connect-backoff": false,
"puller": {
"enable-resolved-ts-stuck-detection": false,
"resolved-ts-stuck-interval": 300000000000
}
},
"cluster-id": "default",
"max-memory-percentage": 0,
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type DebugConfig struct {

// EnableKVConnectBackOff enables the backoff for kv connect.
EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"`

// Puller is the configuration of the puller.
Puller *PullerConfig `toml:"puller" json:"puller"`
}

// ValidateAndAdjust validates and adjusts the debug configuration
Expand Down Expand Up @@ -83,3 +86,11 @@ func (c *DebugConfig) ValidateAndAdjust() error {
func (c *DebugConfig) IsPullBasedSinkEnabled() bool {
return c.EnablePullBasedSink && c.EnableDBSorter && c.EnableNewSink
}

// PullerConfig represents config for puller
type PullerConfig struct {
// EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection.
EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"`
// ResolvedTsStuckInterval is the interval of checking resolved ts stuck.
ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"`
}
4 changes: 4 additions & 0 deletions pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ var defaultServerConfig = &ServerConfig{
EnableNewSink: true,
EnablePullBasedSink: true,
EnableKVConnectBackOff: false,
Puller: &PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
GcTunerMemoryThreshold: DisableMemoryLimit,
Expand Down

0 comments on commit a940814

Please sign in to comment.