Skip to content

Commit

Permalink
puller(ticdc): detect resolved ts stuck in puller (#10182) (#10219)
Browse files Browse the repository at this point in the history
close #10181
  • Loading branch information
ti-chi-bot authored Dec 4, 2023
1 parent 06e345b commit 1fde95e
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 3 deletions.
37 changes: 35 additions & 2 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type pullerImpl struct {
changefeed model.ChangeFeedID
tableID model.TableID
tableName string

cfg *config.ServerConfig
lastForwardTime time.Time
lastForwardResolvedTs uint64
}

// New create a new Puller fetch event start from checkpointTs and put into buf.
Expand Down Expand Up @@ -112,6 +116,7 @@ func New(ctx context.Context,
changefeed: changefeed,
tableID: tableID,
tableName: tableName,
cfg: cfg,
}
return p
}
Expand Down Expand Up @@ -140,8 +145,8 @@ func (p *pullerImpl) Run(ctx context.Context) error {

lastResolvedTs := p.checkpointTs
g.Go(func() error {
metricsTicker := time.NewTicker(15 * time.Second)
defer metricsTicker.Stop()
stuckDetectorTicker := time.NewTicker(1 * time.Minute)
defer stuckDetectorTicker.Stop()
output := func(raw *model.RawKVEntry) error {
// even after https://github.com/pingcap/tiflow/pull/2038, kv client
// could still miss region change notification, which leads to resolved
Expand Down Expand Up @@ -178,6 +183,11 @@ func (p *pullerImpl) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-stuckDetectorTicker.C:
if err := p.detectResolvedTsStuck(initialized); err != nil {
return errors.Trace(err)
}
continue
case e = <-eventCh:
}

Expand Down Expand Up @@ -237,6 +247,29 @@ func (p *pullerImpl) Run(ctx context.Context) error {
return g.Wait()
}

func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error {
if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized {
resolvedTs := p.tsTracker.Frontier()
if resolvedTs == p.lastForwardResolvedTs {
log.Warn("ResolvedTs stuck detected in puller",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID),
zap.Int64("tableID", p.tableID),
zap.String("tableName", p.tableName),
zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs),
zap.Uint64("resolvedTs", resolvedTs))
if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) {
// throw an error to cause changefeed restart
return errors.New("resolved ts stuck")
}
} else {
p.lastForwardTime = time.Now()
p.lastForwardResolvedTs = resolvedTs
}
}
return nil
}

func (p *pullerImpl) Output() <-chan *model.RawKVEntry {
return p.outputCh
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ func TestParseCfg(t *testing.T) {
CheckBalanceInterval: 60000000000,
AddTableBatchSize: 50,
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -348,6 +352,10 @@ check-balance-interval = "10s"
CheckBalanceInterval: config.TomlDuration(10 * time.Second),
AddTableBatchSize: 50,
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -483,6 +491,10 @@ cert-allowed-cn = ["dd","ee"]
CheckBalanceInterval: 60000000000,
AddTableBatchSize: 50,
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -546,5 +558,9 @@ unknown3 = 3
CheckBalanceInterval: 60000000000,
AddTableBatchSize: 50,
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
}, o.serverConfig.Debug)
}
6 changes: 5 additions & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ const (
"check-balance-interval": 60000000000,
"add-table-batch-size": 50
},
"enable-kv-connect-backoff": false
"enable-kv-connect-backoff": false,
"puller": {
"enable-resolved-ts-stuck-detection": false,
"resolved-ts-stuck-interval": 300000000000
}
},
"cluster-id": "default",
"max-memory-percentage": 0,
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type DebugConfig struct {

// EnableKVConnectBackOff enables the backoff for kv connect.
EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"`

// Puller is the configuration of the puller.
Puller *PullerConfig `toml:"puller" json:"puller"`
}

// ValidateAndAdjust validates and adjusts the debug configuration
Expand All @@ -44,3 +47,11 @@ func (c *DebugConfig) ValidateAndAdjust() error {

return nil
}

// PullerConfig represents config for puller
type PullerConfig struct {
// EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection.
EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"`
// ResolvedTsStuckInterval is the interval of checking resolved ts stuck.
ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"`
}
4 changes: 4 additions & 0 deletions pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ var defaultServerConfig = &ServerConfig{

Scheduler: NewDefaultSchedulerConfig(),
EnableKVConnectBackOff: false,
Puller: &PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
GcTunerMemoryThreshold: DisableMemoryLimit,
Expand Down

0 comments on commit 1fde95e

Please sign in to comment.