Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#10182
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
sdojjy authored and ti-chi-bot committed Dec 1, 2023
1 parent 8c006b3 commit 686a342
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 2 deletions.
37 changes: 35 additions & 2 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type pullerImpl struct {
changefeed model.ChangeFeedID
tableID model.TableID
tableName string

cfg *config.ServerConfig
lastForwardTime time.Time
lastForwardResolvedTs uint64
}

// New create a new Puller fetch event start from checkpointTs and put into buf.
Expand Down Expand Up @@ -110,6 +114,7 @@ func New(ctx context.Context,
changefeed: changefeed,
tableID: tableID,
tableName: tableName,
cfg: cfg,
}
return p
}
Expand Down Expand Up @@ -138,8 +143,8 @@ func (p *pullerImpl) Run(ctx context.Context) error {

lastResolvedTs := p.checkpointTs
g.Go(func() error {
metricsTicker := time.NewTicker(15 * time.Second)
defer metricsTicker.Stop()
stuckDetectorTicker := time.NewTicker(1 * time.Minute)
defer stuckDetectorTicker.Stop()
output := func(raw *model.RawKVEntry) error {
// even after https://github.com/pingcap/tiflow/pull/2038, kv client
// could still miss region change notification, which leads to resolved
Expand Down Expand Up @@ -176,6 +181,11 @@ func (p *pullerImpl) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-stuckDetectorTicker.C:
if err := p.detectResolvedTsStuck(initialized); err != nil {
return errors.Trace(err)
}
continue
case e = <-eventCh:
}

Expand Down Expand Up @@ -235,6 +245,29 @@ func (p *pullerImpl) Run(ctx context.Context) error {
return g.Wait()
}

func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error {
if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection && initialized {
resolvedTs := p.tsTracker.Frontier()
if resolvedTs == p.lastForwardResolvedTs {
log.Warn("ResolvedTs stuck detected in puller",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID),
zap.Int64("tableID", p.tableID),
zap.String("tableName", p.tableName),
zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs),
zap.Uint64("resolvedTs", resolvedTs))
if time.Since(p.lastForwardTime) > time.Duration(p.cfg.Debug.Puller.ResolvedTsStuckInterval) {
// throw an error to cause changefeed restart
return errors.New("resolved ts stuck")
}
} else {
p.lastForwardTime = time.Now()
p.lastForwardResolvedTs = resolvedTs
}
}
return nil
}

func (p *pullerImpl) Output() <-chan *model.RawKVEntry {
return p.outputCh
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ func TestParseCfg(t *testing.T) {
CheckBalanceInterval: 60000000000,
AddTableBatchSize: 50,
},
<<<<<<< HEAD
=======
CDCV2: &config.CDCV2{
Enable: false,
MetaStoreConfig: config.MetaStoreConfiguration{},
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182))
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -338,6 +349,17 @@ check-balance-interval = "10s"
CheckBalanceInterval: config.TomlDuration(10 * time.Second),
AddTableBatchSize: 50,
},
<<<<<<< HEAD
=======
CDCV2: &config.CDCV2{
Enable: false,
MetaStoreConfig: config.MetaStoreConfiguration{},
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182))
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -469,6 +491,17 @@ cert-allowed-cn = ["dd","ee"]
CheckBalanceInterval: 60000000000,
AddTableBatchSize: 50,
},
<<<<<<< HEAD
=======
CDCV2: &config.CDCV2{
Enable: false,
MetaStoreConfig: config.MetaStoreConfiguration{},
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182))
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -527,5 +560,16 @@ unknown3 = 3
CheckBalanceInterval: 60000000000,
AddTableBatchSize: 50,
},
<<<<<<< HEAD
=======
CDCV2: &config.CDCV2{
Enable: false,
MetaStoreConfig: config.MetaStoreConfiguration{},
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182))
}, o.serverConfig.Debug)
}
17 changes: 17 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,24 @@ const (
"check-balance-interval": 60000000000,
"add-table-batch-size": 50
},
<<<<<<< HEAD
"enable-kv-connect-backoff": false
=======
"enable-kv-connect-backoff": false,
"cdc-v2": {
"enable": false,
"meta-store": {
"uri": "",
"ssl-ca": "",
"ssl-cert": "",
"ssl-key": ""
}
},
"puller": {
"enable-resolved-ts-stuck-detection": false,
"resolved-ts-stuck-interval": 300000000000
}
>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182))
},
"cluster-id": "default",
"max-memory-percentage": 0,
Expand Down
17 changes: 17 additions & 0 deletions pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ type DebugConfig struct {

// EnableKVConnectBackOff enables the backoff for kv connect.
EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"`
<<<<<<< HEAD
=======

// CDCV2 enables ticdc version 2 implementation with new metastore
CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"`

// Puller is the configuration of the puller.
Puller *PullerConfig `toml:"puller" json:"puller"`
>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182))
}

// ValidateAndAdjust validates and adjusts the debug configuration
Expand All @@ -44,3 +53,11 @@ func (c *DebugConfig) ValidateAndAdjust() error {

return nil
}

// PullerConfig represents config for puller
type PullerConfig struct {
// EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection.
EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"`
// ResolvedTsStuckInterval is the interval of checking resolved ts stuck.
ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"`
}
8 changes: 8 additions & 0 deletions pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ var defaultServerConfig = &ServerConfig{

Scheduler: NewDefaultSchedulerConfig(),
EnableKVConnectBackOff: false,
<<<<<<< HEAD
=======
CDCV2: &CDCV2{Enable: false},
Puller: &PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: TomlDuration(5 * time.Minute),
},
>>>>>>> 05e032835b (puller(ticdc): detect resolved ts stuck in puller (#10182))
},
ClusterID: "default",
GcTunerMemoryThreshold: DisableMemoryLimit,
Expand Down

0 comments on commit 686a342

Please sign in to comment.