diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index f2d014b426a..f635ecc5577 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -71,6 +71,10 @@ type pullerImpl struct { changefeed model.ChangeFeedID tableID model.TableID tableName string + + cfg *config.ServerConfig + lastForwardTime time.Time + lastForwardResolvedTs uint64 } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -110,6 +114,7 @@ func New(ctx context.Context, changefeed: changefeed, tableID: tableID, tableName: tableName, + cfg: cfg, } return p } @@ -138,8 +143,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { lastResolvedTs := p.checkpointTs g.Go(func() error { - metricsTicker := time.NewTicker(15 * time.Second) - defer metricsTicker.Stop() + stuckDetectorTicker := time.NewTicker(1 * time.Minute) + defer stuckDetectorTicker.Stop() output := func(raw *model.RawKVEntry) error { // even after https://github.com/pingcap/tiflow/pull/2038, kv client // could still miss region change notification, which leads to resolved @@ -176,6 +181,11 @@ func (p *pullerImpl) Run(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) + case <-stuckDetectorTicker.C: + if err := p.detectResolvedTsStuck(initialized); err != nil { + return errors.Trace(err) + } + continue case e = <-eventCh: } @@ -235,6 +245,29 @@ func (p *pullerImpl) Run(ctx context.Context) error { return g.Wait() } +func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error { + if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized { + resolvedTs := p.tsTracker.Frontier() + if resolvedTs == p.lastForwardResolvedTs { + log.Warn("ResolvedTs stuck detected in puller", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs), + zap.Uint64("resolvedTs", resolvedTs)) + if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) { + // throw an error to cause changefeed restart + return errors.New("resolved ts stuck") + } + } else { + p.lastForwardTime = time.Now() + p.lastForwardResolvedTs = resolvedTs + } + } + return nil +} + func (p *pullerImpl) Output() <-chan *model.RawKVEntry { return p.outputCh } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 7e67a43cf0c..9716589678e 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -199,6 +199,10 @@ func TestParseCfg(t *testing.T) { CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -338,6 +342,10 @@ check-balance-interval = "10s" CheckBalanceInterval: config.TomlDuration(10 * time.Second), AddTableBatchSize: 50, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -469,6 +477,10 @@ cert-allowed-cn = ["dd","ee"] CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -527,5 +539,9 @@ unknown3 = 3 CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, }, o.serverConfig.Debug) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index b1c3aa6286b..e3b588087bd 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -161,7 +161,11 @@ const ( "check-balance-interval": 60000000000, "add-table-batch-size": 50 }, - "enable-kv-connect-backoff": false + "enable-kv-connect-backoff": false, + "puller": { + "enable-resolved-ts-stuck-detection": false, + "resolved-ts-stuck-interval": 300000000000 + } }, "cluster-id": "default", "max-memory-percentage": 0, diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 3a7815090bc..59528b886a1 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -28,6 +28,9 @@ type DebugConfig struct { // EnableKVConnectBackOff enables the backoff for kv connect. EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"` + + // Puller is the configuration of the puller. + Puller *PullerConfig `toml:"puller" json:"puller"` } // ValidateAndAdjust validates and adjusts the debug configuration @@ -44,3 +47,11 @@ func (c *DebugConfig) ValidateAndAdjust() error { return nil } + +// PullerConfig represents config for puller +type PullerConfig struct { + // EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection. + EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"` + // ResolvedTsStuckInterval is the interval of checking resolved ts stuck. + ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"` +} diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index eda9f95c8e5..c77b6e17ad5 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -134,6 +134,10 @@ var defaultServerConfig = &ServerConfig{ Scheduler: NewDefaultSchedulerConfig(), EnableKVConnectBackOff: false, + Puller: &PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: TomlDuration(5 * time.Minute), + }, }, ClusterID: "default", GcTunerMemoryThreshold: DisableMemoryLimit,