Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/release-7.5' into cherry-pick-…
Browse files Browse the repository at this point in the history
…10258-to-release-7.5

# Conflicts:
#	cdc/puller/puller.go
  • Loading branch information
sdojjy committed Dec 7, 2023
2 parents 928c414 + 6b4b212 commit 40d467f
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 15 deletions.
16 changes: 2 additions & 14 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,12 @@ type pullerImpl struct {
changefeed model.ChangeFeedID
tableID model.TableID
tableName string
<<<<<<< HEAD
=======

cfg *config.ServerConfig
lastForwardTime time.Time
lastForwardResolvedTs uint64
// startResolvedTs is the resolvedTs when puller is initialized
startResolvedTs uint64
>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258))
}

// New create a new Puller fetch event start from checkpointTs and put into buf.
Expand Down Expand Up @@ -119,12 +116,9 @@ func New(ctx context.Context,
changefeed: changefeed,
tableID: tableID,
tableName: tableName,
<<<<<<< HEAD
=======
cfg: cfg,

startResolvedTs: checkpointTs,
>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258))
}
return p
}
Expand Down Expand Up @@ -153,8 +147,8 @@ func (p *pullerImpl) Run(ctx context.Context) error {

lastResolvedTs := p.checkpointTs
g.Go(func() error {
metricsTicker := time.NewTicker(15 * time.Second)
defer metricsTicker.Stop()
stuckDetectorTicker := time.NewTicker(1 * time.Minute)
defer stuckDetectorTicker.Stop()
output := func(raw *model.RawKVEntry) error {
// even after https://github.com/pingcap/tiflow/pull/2038, kv client
// could still miss region change notification, which leads to resolved
Expand Down Expand Up @@ -191,14 +185,11 @@ func (p *pullerImpl) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
<<<<<<< HEAD
=======
case <-stuckDetectorTicker.C:
if err := p.detectResolvedTsStuck(); err != nil {
return errors.Trace(err)
}
continue
>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258))
case e = <-eventCh:
}

Expand Down Expand Up @@ -258,8 +249,6 @@ func (p *pullerImpl) Run(ctx context.Context) error {
return g.Wait()
}

<<<<<<< HEAD
=======
func (p *pullerImpl) detectResolvedTsStuck() error {
if p.cfg.Debug.Puller.EnableResolvedTsStuckDetection {
resolvedTs := p.tsTracker.Frontier()
Expand Down Expand Up @@ -289,7 +278,6 @@ func (p *pullerImpl) detectResolvedTsStuck() error {
return nil
}

>>>>>>> e16c52d843 (puller(ticdc): fix stuck detect issue (#10258))
func (p *pullerImpl) Output() <-chan *model.RawKVEntry {
return p.outputCh
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ func TestParseCfg(t *testing.T) {
CheckBalanceInterval: 60000000000,
AddTableBatchSize: 50,
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -338,6 +342,10 @@ check-balance-interval = "10s"
CheckBalanceInterval: config.TomlDuration(10 * time.Second),
AddTableBatchSize: 50,
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -469,6 +477,10 @@ cert-allowed-cn = ["dd","ee"]
CheckBalanceInterval: 60000000000,
AddTableBatchSize: 50,
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -527,5 +539,9 @@ unknown3 = 3
CheckBalanceInterval: 60000000000,
AddTableBatchSize: 50,
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
}, o.serverConfig.Debug)
}
6 changes: 5 additions & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ const (
"check-balance-interval": 60000000000,
"add-table-batch-size": 50
},
"enable-kv-connect-backoff": false
"enable-kv-connect-backoff": false,
"puller": {
"enable-resolved-ts-stuck-detection": false,
"resolved-ts-stuck-interval": 300000000000
}
},
"cluster-id": "default",
"max-memory-percentage": 0,
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type DebugConfig struct {

// EnableKVConnectBackOff enables the backoff for kv connect.
EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"`

// Puller is the configuration of the puller.
Puller *PullerConfig `toml:"puller" json:"puller"`
}

// ValidateAndAdjust validates and adjusts the debug configuration
Expand All @@ -44,3 +47,11 @@ func (c *DebugConfig) ValidateAndAdjust() error {

return nil
}

// PullerConfig represents config for puller
type PullerConfig struct {
// EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection.
EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"`
// ResolvedTsStuckInterval is the interval of checking resolved ts stuck.
ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"`
}
4 changes: 4 additions & 0 deletions pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ var defaultServerConfig = &ServerConfig{

Scheduler: NewDefaultSchedulerConfig(),
EnableKVConnectBackOff: false,
Puller: &PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
GcTunerMemoryThreshold: DisableMemoryLimit,
Expand Down

0 comments on commit 40d467f

Please sign in to comment.