From 686a3426ca39f32efcb8a42a088f46730060483c Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Fri, 1 Dec 2023 12:34:18 +0800 Subject: [PATCH] This is an automated cherry-pick of #10182 Signed-off-by: ti-chi-bot --- cdc/puller/puller.go | 37 ++++++++++++++++++++++++++-- pkg/cmd/server/server_test.go | 44 ++++++++++++++++++++++++++++++++++ pkg/config/config_test_data.go | 17 +++++++++++++ pkg/config/debug.go | 17 +++++++++++++ pkg/config/server_config.go | 8 +++++++ 5 files changed, 121 insertions(+), 2 deletions(-) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index f2d014b426a..f635ecc5577 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -71,6 +71,10 @@ type pullerImpl struct { changefeed model.ChangeFeedID tableID model.TableID tableName string + + cfg *config.ServerConfig + lastForwardTime time.Time + lastForwardResolvedTs uint64 } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -110,6 +114,7 @@ func New(ctx context.Context, changefeed: changefeed, tableID: tableID, tableName: tableName, + cfg: cfg, } return p } @@ -138,8 +143,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { lastResolvedTs := p.checkpointTs g.Go(func() error { - metricsTicker := time.NewTicker(15 * time.Second) - defer metricsTicker.Stop() + stuckDetectorTicker := time.NewTicker(1 * time.Minute) + defer stuckDetectorTicker.Stop() output := func(raw *model.RawKVEntry) error { // even after https://github.com/pingcap/tiflow/pull/2038, kv client // could still miss region change notification, which leads to resolved @@ -176,6 +181,11 @@ func (p *pullerImpl) Run(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) + case <-stuckDetectorTicker.C: + if err := p.detectResolvedTsStuck(initialized); err != nil { + return errors.Trace(err) + } + continue case e = <-eventCh: } @@ -235,6 +245,29 @@ func (p *pullerImpl) Run(ctx context.Context) error { return g.Wait() } +func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error { + if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized { + resolvedTs := p.tsTracker.Frontier() + if resolvedTs == p.lastForwardResolvedTs { + log.Warn("ResolvedTs stuck detected in puller", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs), + zap.Uint64("resolvedTs", resolvedTs)) + if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) { + // throw an error to cause changefeed restart + return errors.New("resolved ts stuck") + } + } else { + p.lastForwardTime = time.Now() + p.lastForwardResolvedTs = resolvedTs + } + } + return nil +} + func (p *pullerImpl) Output() <-chan *model.RawKVEntry { return p.outputCh } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 7e67a43cf0c..d0befbc17e3 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -199,6 +199,17 @@ func TestParseCfg(t *testing.T) { CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, +<<<<<<< HEAD +======= + CDCV2: &config.CDCV2{ + Enable: false, + MetaStoreConfig: config.MetaStoreConfiguration{}, + }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, +>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182)) }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -338,6 +349,17 @@ check-balance-interval = "10s" CheckBalanceInterval: config.TomlDuration(10 * time.Second), AddTableBatchSize: 50, }, +<<<<<<< HEAD +======= + CDCV2: &config.CDCV2{ + Enable: false, + MetaStoreConfig: config.MetaStoreConfiguration{}, + }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, +>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182)) }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -469,6 +491,17 @@ cert-allowed-cn = ["dd","ee"] CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, +<<<<<<< HEAD +======= + CDCV2: &config.CDCV2{ + Enable: false, + MetaStoreConfig: config.MetaStoreConfiguration{}, + }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, +>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182)) }, ClusterID: "default", MaxMemoryPercentage: config.DisableMemoryLimit, @@ -527,5 +560,16 @@ unknown3 = 3 CheckBalanceInterval: 60000000000, AddTableBatchSize: 50, }, +<<<<<<< HEAD +======= + CDCV2: &config.CDCV2{ + Enable: false, + MetaStoreConfig: config.MetaStoreConfiguration{}, + }, + Puller: &config.PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute), + }, +>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182)) }, o.serverConfig.Debug) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index b1c3aa6286b..7fbf87f61c8 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -161,7 +161,24 @@ const ( "check-balance-interval": 60000000000, "add-table-batch-size": 50 }, +<<<<<<< HEAD "enable-kv-connect-backoff": false +======= + "enable-kv-connect-backoff": false, + "cdc-v2": { + "enable": false, + "meta-store": { + "uri": "", + "ssl-ca": "", + "ssl-cert": "", + "ssl-key": "" + } + }, + "puller": { + "enable-resolved-ts-stuck-detection": false, + "resolved-ts-stuck-interval": 300000000000 + } +>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182)) }, "cluster-id": "default", "max-memory-percentage": 0, diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 3a7815090bc..70b3c641b2b 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -28,6 +28,15 @@ type DebugConfig struct { // EnableKVConnectBackOff enables the backoff for kv connect. EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"` +<<<<<<< HEAD +======= + + // CDCV2 enables ticdc version 2 implementation with new metastore + CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"` + + // Puller is the configuration of the puller. + Puller *PullerConfig `toml:"puller" json:"puller"` +>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182)) } // ValidateAndAdjust validates and adjusts the debug configuration @@ -44,3 +53,11 @@ func (c *DebugConfig) ValidateAndAdjust() error { return nil } + +// PullerConfig represents config for puller +type PullerConfig struct { + // EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection. + EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"` + // ResolvedTsStuckInterval is the interval of checking resolved ts stuck. + ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"` +} diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index eda9f95c8e5..18141db73b1 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -134,6 +134,14 @@ var defaultServerConfig = &ServerConfig{ Scheduler: NewDefaultSchedulerConfig(), EnableKVConnectBackOff: false, +<<<<<<< HEAD +======= + CDCV2: &CDCV2{Enable: false}, + Puller: &PullerConfig{ + EnableResolvedTsStuckDetection: false, + ResolvedTsStuckInterval: TomlDuration(5 * time.Minute), + }, +>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182)) }, ClusterID: "default", GcTunerMemoryThreshold: DisableMemoryLimit,