From a940814771eabfbb42ce323a9a1d51234420ad76 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 1 Dec 2023 17:50:50 +0800 Subject: [PATCH] puller(ticdc): detect resolved ts stuck in puller (#10182) (#10220) close pingcap/tiflow#10181 --- cdc/puller/puller.go | 34 ++++++++++++++++++++++++++++++++++ pkg/cmd/server/server_test.go | 16 ++++++++++++++++ pkg/config/config_test_data.go | 6 +++++- pkg/config/debug.go | 11 +++++++++++ pkg/config/server_config.go | 4 ++++ 5 files changed, 70 insertions(+), 1 deletion(-) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 30cdbda9769..10fbbfd3d7c 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -73,6 +73,10 @@ type pullerImpl struct { changefeed model.ChangeFeedID tableID model.TableID tableName string + + cfg *config.ServerConfig + lastForwardTime time.Time + lastForwardResolvedTs uint64 } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -121,6 +125,7 @@ func New(ctx context.Context, changefeed: changefeed, tableID: tableID, tableName: tableName, + cfg: cfg, } return p } @@ -165,6 +170,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { g.Go(func() error { metricsTicker := time.NewTicker(15 * time.Second) defer metricsTicker.Stop() + stuckDetectorTicker := time.NewTicker(1 * time.Minute) + defer stuckDetectorTicker.Stop() output := func(raw *model.RawKVEntry) error { // even after https://github.com/pingcap/tiflow/pull/2038, kv client // could still miss region change notification, which leads to resolved @@ -205,6 +212,10 @@ func (p *pullerImpl) Run(ctx context.Context) error { metricEventChanSize.Observe(float64(len(eventCh))) metricOutputChanSize.Observe(float64(len(p.outputCh))) metricPullerResolvedTs.Set(float64(oracle.ExtractPhysical(atomic.LoadUint64(&p.resolvedTs)))) + case <-stuckDetectorTicker.C: + if err := p.detectResolvedTsStuck(initialized); err != nil { + return errors.Trace(err) + } continue case e = <-eventCh: } @@ -269,6 +280,29 @@ func (p *pullerImpl) GetResolvedTs() uint64 { return atomic.LoadUint64(&p.resolvedTs) } +func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error { + if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized { + resolvedTs := p.tsTracker.Frontier() + if resolvedTs == p.lastForwardResolvedTs { + log.Warn("ResolvedTs stuck detected in puller", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs), + zap.Uint64("resolvedTs", resolvedTs)) + if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) { + // throw an error to cause changefeed restart + return errors.New("resolved ts stuck") + } + } else { + p.lastForwardTime = time.Now() + p.lastForwardResolvedTs = resolvedTs + } + } + return nil +} + func (p *pullerImpl) Output() <-chan *model.RawKVEntry { return p.outputCh } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 3eaa97aaf0b..4b3feaf2e15 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -214,6 +214,10 @@ func TestParseCfg(t *testing.T) { AddTableBatchSize: 50, }, EnableNewSink: true, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -374,6 +378,10 @@ check-balance-interval = "10s" AddTableBatchSize: 50, }, EnableNewSink: true, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -520,6 +528,10 @@ cert-allowed-cn = ["dd","ee"] AddTableBatchSize: 50, }, EnableNewSink: true, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -590,5 +602,9 @@ unknown3 = 3 AddTableBatchSize: 50, }, EnableNewSink: true, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, o.serverConfig.Debug) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index a4fc933eb3a..2d388c5d5e2 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -153,7 +153,11 @@ const ( "add-table-batch-size": 50 }, "enable-new-sink": true, - "enable-kv-connect-backoff": false + "enable-kv-connect-backoff": false, + "puller": { + "enable-resolved-ts-stuck-detection": false, + "resolved-ts-stuck-interval": 300000000000 + } }, "cluster-id": "default", "max-memory-percentage": 0, diff --git a/pkg/config/debug.go b/pkg/config/debug.go index e1a76f18230..d98d0e9be61 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -51,6 +51,9 @@ type DebugConfig struct { // EnableKVConnectBackOff enables the backoff for kv connect. EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"` + + // Puller is the configuration of the puller. + Puller *PullerConfig `toml:"puller" json:"puller"` } // ValidateAndAdjust validates and adjusts the debug configuration @@ -83,3 +86,11 @@ func (c *DebugConfig) ValidateAndAdjust() error { func (c *DebugConfig) IsPullBasedSinkEnabled() bool { return c.EnablePullBasedSink && c.EnableDBSorter && c.EnableNewSink } + +// PullerConfig represents config for puller +type PullerConfig struct { + // EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection. + EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"` + // ResolvedTsStuckInterval is the interval of checking resolved ts stuck. + ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"` +} diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 7c3c6ff7247..9ae645bf1c0 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -159,6 +159,10 @@ var defaultServerConfig = &ServerConfig{ EnableNewSink: true, EnablePullBasedSink: true, EnableKVConnectBackOff: false, + Puller: &PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", GcTunerMemoryThreshold: DisableMemoryLimit,