From a2f0d639e2024f0cb8802b8afbef94fac2d96cc8 Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Wed, 29 Nov 2023 16:25:37 +0800 Subject: [PATCH 1/3] detect resolved ts stuck in puller --- cdc/puller/puller.go | 37 ++++++++++++++++++++++++++++++++-- pkg/config/config_test_data.go | 4 ++++ pkg/config/debug.go | 10 +++++++++ pkg/config/server_config.go | 4 ++++ 4 files changed, 53 insertions(+), 2 deletions(-) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index f2d014b426a..f635ecc5577 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -71,6 +71,10 @@ type pullerImpl struct { changefeed model.ChangeFeedID tableID model.TableID tableName string + + cfg *config.ServerConfig + lastForwardTime time.Time + lastForwardResolvedTs uint64 } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -110,6 +114,7 @@ func New(ctx context.Context, changefeed: changefeed, tableID: tableID, tableName: tableName, + cfg: cfg, } return p } @@ -138,8 +143,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { lastResolvedTs := p.checkpointTs g.Go(func() error { - metricsTicker := time.NewTicker(15 * time.Second) - defer metricsTicker.Stop() + stuckDetectorTicker := time.NewTicker(1 * time.Minute) + defer stuckDetectorTicker.Stop() output := func(raw *model.RawKVEntry) error { // even after https://github.com/pingcap/tiflow/pull/2038, kv client // could still miss region change notification, which leads to resolved @@ -176,6 +181,11 @@ func (p *pullerImpl) Run(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) + case <-stuckDetectorTicker.C: + if err := p.detectResolvedTsStuck(initialized); err != nil { + return errors.Trace(err) + } + continue case e = <-eventCh: } @@ -235,6 +245,29 @@ func (p *pullerImpl) Run(ctx context.Context) error { return g.Wait() } +func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error { + if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized { + resolvedTs := p.tsTracker.Frontier() + if resolvedTs == p.lastForwardResolvedTs { + log.Warn("ResolvedTs stuck detected in puller", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs), + zap.Uint64("resolvedTs", resolvedTs)) + if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) { + // throw an error to cause changefeed restart + return errors.New("resolved ts stuck") + } + } else { + p.lastForwardTime = time.Now() + p.lastForwardResolvedTs = resolvedTs + } + } + return nil +} + func (p *pullerImpl) Output() <-chan *model.RawKVEntry { return p.outputCh } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 5ae46d827aa..2487fc08831 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -171,6 +171,10 @@ const ( "ssl-cert": "", "ssl-key": "" } + }, + "puller": { + "enable-resolved-ts-stuck-detection": false, + "resolved-ts-stuck-interval": 300000000000 } }, "cluster-id": "default", diff --git a/pkg/config/debug.go b/pkg/config/debug.go index d5c24cd10d2..4c328a763e4 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -31,6 +31,9 @@ type DebugConfig struct { // CDCV2 enables ticdc version 2 implementation with new metastore CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"` + + // Puller is the configuration of the puller. + Puller *PullerConfig `toml:"puller" json:"puller"` } // ValidateAndAdjust validates and adjusts the debug configuration @@ -50,3 +53,10 @@ func (c *DebugConfig) ValidateAndAdjust() error { return nil } + +type PullerConfig struct { + // RegionScanLimit is the limit of regions to scan concurrently. + EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"` + // ResolvedTsStuckInterval is the interval of checking resolved ts stuck. + ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"` +} diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index bac3c1384be..ca15de02917 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -135,6 +135,10 @@ var defaultServerConfig = &ServerConfig{ Scheduler: NewDefaultSchedulerConfig(), EnableKVConnectBackOff: false, CDCV2: &CDCV2{Enable: false}, + Puller: &PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", GcTunerMemoryThreshold: DisableMemoryLimit, From 578394f00693b4f6866f54b9fae07639a00a7d96 Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Wed, 29 Nov 2023 16:48:13 +0800 Subject: [PATCH 2/3] detect resolved ts stuck in puller --- pkg/config/debug.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 4c328a763e4..a74d8235ccd 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -54,8 +54,9 @@ func (c *DebugConfig) ValidateAndAdjust() error { return nil } +// PullerConfig represents config for puller type PullerConfig struct { - // RegionScanLimit is the limit of regions to scan concurrently. + // EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection. EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"` // ResolvedTsStuckInterval is the interval of checking resolved ts stuck. ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"` From e87afdde250fdd604c1aaf5b332cfcaf08c39a10 Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Wed, 29 Nov 2023 18:06:16 +0800 Subject: [PATCH 3/3] fix ut --- pkg/cmd/server/server_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 64413bb4a9b..45708a80b14 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -203,6 +203,10 @@ func TestParseCfg(t *testing.T) { Enable: false, MetaStoreConfig: config.MetaStoreConfiguration{}, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -346,6 +350,10 @@ check-balance-interval = "10s" Enable: false, MetaStoreConfig: config.MetaStoreConfiguration{}, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -481,6 +489,10 @@ cert-allowed-cn = ["dd","ee"] Enable: false, MetaStoreConfig: config.MetaStoreConfiguration{}, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -543,5 +555,9 @@ unknown3 = 3 Enable: false, MetaStoreConfig: config.MetaStoreConfiguration{}, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, o.serverConfig.Debug) }