From 1fde95e7947579577030e26307af2c00e33fb933 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 4 Dec 2023 22:47:49 +0800 Subject: [PATCH] puller(ticdc): detect resolved ts stuck in puller (#10182) (#10219) close pingcap/tiflow#10181 --- cdc/puller/puller.go | 37 ++++++++++++++++++++++++++++++++-- pkg/cmd/server/server_test.go | 16 +++++++++++++++ pkg/config/config_test_data.go | 6 +++++- pkg/config/debug.go | 11 ++++++++++ pkg/config/server_config.go | 4 ++++ 5 files changed, 71 insertions(+), 3 deletions(-) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 7853275a287..313daf06b82 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -72,6 +72,10 @@ type pullerImpl struct { changefeed model.ChangeFeedID tableID model.TableID tableName string + + cfg *config.ServerConfig + lastForwardTime time.Time + lastForwardResolvedTs uint64 } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -112,6 +116,7 @@ func New(ctx context.Context, changefeed: changefeed, tableID: tableID, tableName: tableName, + cfg: cfg, } return p } @@ -140,8 +145,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { lastResolvedTs := p.checkpointTs g.Go(func() error { - metricsTicker := time.NewTicker(15 * time.Second) - defer metricsTicker.Stop() + stuckDetectorTicker := time.NewTicker(1 * time.Minute) + defer stuckDetectorTicker.Stop() output := func(raw *model.RawKVEntry) error { // even after https://github.com/pingcap/tiflow/pull/2038, kv client // could still miss region change notification, which leads to resolved @@ -178,6 +183,11 @@ func (p *pullerImpl) Run(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) + case <-stuckDetectorTicker.C: + if err := p.detectResolvedTsStuck(initialized); err != nil { + return errors.Trace(err) + } + continue case e = <-eventCh: } @@ -237,6 +247,29 @@ func (p *pullerImpl) Run(ctx context.Context) error { return g.Wait() } +func (p *pullerImpl) 
detectResolvedTsStuck(initialized bool) error { + if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized { + resolvedTs := p.tsTracker.Frontier() + if resolvedTs == p.lastForwardResolvedTs { + log.Warn("ResolvedTs stuck detected in puller", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs), + zap.Uint64("resolvedTs", resolvedTs)) + if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) { + // throw an error to cause changefeed restart + return errors.New("resolved ts stuck") + } + } else { + p.lastForwardTime = time.Now() + p.lastForwardResolvedTs = resolvedTs + } + } + return nil +} + func (p *pullerImpl) Output() <-chan *model.RawKVEntry { return p.outputCh } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 843825a4344..6cfee19b5a9 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -201,6 +201,10 @@ func TestParseCfg(t *testing.T) { CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -348,6 +352,10 @@ check-balance-interval = "10s" CheckBalanceInterval: config.TomlDuration(10 * time.Second), AddTableBatchSize: 50, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -483,6 +491,10 @@ cert-allowed-cn = ["dd","ee"] CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: 
config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -546,5 +558,9 @@ unknown3 = 3 CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, o.serverConfig.Debug) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 5a44aac5aac..8e13ef911c1 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -149,7 +149,11 @@ const ( "check-balance-interval": 60000000000, "add-table-batch-size": 50 }, - "enable-kv-connect-backoff": false + "enable-kv-connect-backoff": false, + "puller": { + "enable-resolved-ts-stuck-detection": false, + "resolved-ts-stuck-interval": 300000000000 + } }, "cluster-id": "default", "max-memory-percentage": 0, diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 3a7815090bc..59528b886a1 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -28,6 +28,9 @@ type DebugConfig struct { // EnableKVConnectBackOff enables the backoff for kv connect. EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"` + + // Puller is the configuration of the puller. + Puller *PullerConfig `toml:"puller" json:"puller"` } // ValidateAndAdjust validates and adjusts the debug configuration @@ -44,3 +47,11 @@ func (c *DebugConfig) ValidateAndAdjust() error { return nil } + +// PullerConfig represents config for puller +type PullerConfig struct { + // EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection. + EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"` + // ResolvedTsStuckInterval is how long the resolved ts may stay unchanged before the puller treats it as stuck.
+ ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"` +} diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 5df56c4a1f1..27f40104544 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -140,6 +140,10 @@ var defaultServerConfig = &ServerConfig{ Scheduler: NewDefaultSchedulerConfig(), EnableKVConnectBackOff: false, + Puller: &PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", GcTunerMemoryThreshold: DisableMemoryLimit,