Skip to content

Commit

Permalink
config, changefeed (ticdc): add changefeed error stuck duration config (
Browse files Browse the repository at this point in the history
#9872) (#9878)

close #9875
  • Loading branch information
ti-chi-bot authored Oct 16, 2023
1 parent dd7ccf4 commit 172a8e7
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 59 deletions.
9 changes: 3 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,12 @@ check-static: tools/bin/golangci-lint
cd dm && ../tools/bin/golangci-lint run --timeout 10m0s

check: check-copyright generate_mock go-generate fmt check-static tidy terror_check errdoc \
check-merge-conflicts check-ticdc-dashboard check-diff-line-width swagger-spec check-makefiles \
check-merge-conflicts check-ticdc-dashboard check-diff-line-width check-makefiles \
check_cdc_integration_test check_dm_integration_test check_engine_integration_test
@git --no-pager diff --exit-code || (echo "Please add changed files!" && false)

fast_check: check-copyright fmt check-static tidy terror_check errdoc \
check-merge-conflicts check-ticdc-dashboard check-diff-line-width swagger-spec check-makefiles \
check-merge-conflicts check-ticdc-dashboard check-diff-line-width check-makefiles \
check_cdc_integration_test check_dm_integration_test check_engine_integration_test
@git --no-pager diff --exit-code || (echo "Please add changed files!" && false)

Expand All @@ -344,10 +344,7 @@ unit_test_coverage:

data-flow-diagram: docs/data-flow.dot
dot -Tsvg docs/data-flow.dot > docs/data-flow.svg

swagger-spec: tools/bin/swag
tools/bin/swag init --exclude dm,engine --parseVendor -generalInfo cdc/api/v1/api.go --output docs/swagger


generate_mock: ## Generate mock code.
generate_mock: tools/bin/mockgen
scripts/generate-mock.sh
Expand Down
20 changes: 13 additions & 7 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,13 @@ type ReplicaConfig struct {
SyncPointInterval *JSONDuration `json:"sync_point_interval" swaggertype:"string"`
SyncPointRetention *JSONDuration `json:"sync_point_retention" swaggertype:"string"`

Filter *FilterConfig `json:"filter"`
Mounter *MounterConfig `json:"mounter"`
Sink *SinkConfig `json:"sink"`
Consistent *ConsistentConfig `json:"consistent"`
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
Integrity *IntegrityConfig `json:"integrity"`
Filter *FilterConfig `json:"filter"`
Mounter *MounterConfig `json:"mounter"`
Sink *SinkConfig `json:"sink"`
Consistent *ConsistentConfig `json:"consistent,omitempty"`
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
Integrity *IntegrityConfig `json:"integrity"`
ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty" swaggertype:"string"`
}

// ToInternalReplicaConfig converts *v2.ReplicaConfig into *config.ReplicaConfig
Expand Down Expand Up @@ -423,6 +424,9 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
CorruptionHandleLevel: c.Integrity.CorruptionHandleLevel,
}
}
if c.ChangefeedErrorStuckDuration != nil {
res.ChangefeedErrorStuckDuration = &c.ChangefeedErrorStuckDuration.duration
}
return res
}

Expand Down Expand Up @@ -649,7 +653,9 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
CorruptionHandleLevel: cloned.Integrity.CorruptionHandleLevel,
}
}

if cloned.ChangefeedErrorStuckDuration != nil {
res.ChangefeedErrorStuckDuration = &JSONDuration{*cloned.ChangefeedErrorStuckDuration}
}
return res
}

Expand Down
2 changes: 2 additions & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ var defaultAPIConfig = &ReplicaConfig{
IntegrityCheckLevel: config.GetDefaultReplicaConfig().Integrity.IntegrityCheckLevel,
CorruptionHandleLevel: config.GetDefaultReplicaConfig().Integrity.CorruptionHandleLevel,
},
ChangefeedErrorStuckDuration: &JSONDuration{*config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration},
}

func TestDefaultReplicaConfig(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ func (info *ChangeFeedInfo) VerifyAndComplete() {
if info.Config.Integrity == nil {
info.Config.Integrity = defaultConfig.Integrity
}

if info.Config.ChangefeedErrorStuckDuration == nil {
info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration
}
}

// FixIncompatible fixes incompatible changefeed meta info.
Expand Down Expand Up @@ -361,6 +365,12 @@ func (info *ChangeFeedInfo) FixIncompatible() {
log.Info("Fix incompatible memory quota completed", zap.String("changefeed", info.String()))
}

if info.Config.ChangefeedErrorStuckDuration == nil {
log.Info("Start fixing incompatible error stuck duration", zap.String("changefeed", info.String()))
info.Config.ChangefeedErrorStuckDuration = config.GetDefaultReplicaConfig().ChangefeedErrorStuckDuration
log.Info("Fix incompatible error stuck duration completed", zap.String("changefeed", info.String()))
}

log.Info("Start fixing incompatible scheduler", zap.String("changefeed", info.String()))
inheritV66 := creatorVersionGate.ChangefeedInheritSchedulerConfigFromV66()
info.fixScheduler(inheritV66)
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func newChangefeed(
// The scheduler will be created lazily.
scheduler: nil,
barriers: newBarriers(),
feedStateManager: newFeedStateManager(up),
feedStateManager: newFeedStateManager(up, state.Info.Config),
upstream: up,

errCh: make(chan error, defaultErrChSize),
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
info = ctx.ChangefeedVars().Info
return info, true, nil
})

tester.MustApplyPatches()
cf := newChangefeed4Test(ctx.ChangefeedVars().ID, state, up,
// new ddl puller
func(ctx context.Context,
Expand Down
36 changes: 19 additions & 17 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/upstream"
Expand All @@ -36,7 +37,6 @@ const (
// To avoid the thundering herd problem, a random factor is also added.
defaultBackoffInitInterval = 10 * time.Second
defaultBackoffMaxInterval = 10 * time.Minute
defaultBackoffMaxElapsedTime = 30 * time.Minute
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0
)
Expand Down Expand Up @@ -66,25 +66,27 @@ type feedStateManager struct {
resolvedTs model.Ts
initCheckpointTs model.Ts

checkpointTsAdvanced time.Time
lastCheckpointTs model.Ts
checkpointTsAdvanced time.Time
lastCheckpointTs model.Ts
changefeedErrorStuckDuration time.Duration
}

// newFeedStateManager creates a feedStateManager and initializes the exponential backoff
func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
f := new(feedStateManager)
f.upstream = up

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = defaultBackoffInitInterval
f.errBackoff.MaxInterval = defaultBackoffMaxInterval
f.errBackoff.Multiplier = defaultBackoffMultiplier
f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor
func newFeedStateManager(up *upstream.Upstream, cfg *config.ReplicaConfig) *feedStateManager {
m := new(feedStateManager)
m.upstream = up

m.errBackoff = backoff.NewExponentialBackOff()
m.errBackoff.InitialInterval = defaultBackoffInitInterval
m.errBackoff.MaxInterval = defaultBackoffMaxInterval
m.errBackoff.Multiplier = defaultBackoffMultiplier
m.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor
// backoff will stop once the configured changefeed error stuck duration has elapsed.
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime
m.errBackoff.MaxElapsedTime = *cfg.ChangefeedErrorStuckDuration
m.changefeedErrorStuckDuration = *cfg.ChangefeedErrorStuckDuration

f.resetErrRetry()
return f
m.resetErrRetry()
return m
}

// resetErrRetry resets the error-retry related fields
Expand Down Expand Up @@ -567,8 +569,8 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) {
// 1. checkpoint lag is large enough;
// 2. checkpoint hasn't been advanced for a long while;
// 3. the changefeed has been initialized.
if currTime.Sub(ckptTime) > defaultBackoffMaxElapsedTime &&
time.Since(m.checkpointTsAdvanced) > defaultBackoffMaxElapsedTime &&
if currTime.Sub(ckptTime) > m.changefeedErrorStuckDuration &&
time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration &&
m.resolvedTs > m.initCheckpointTs {
log.Info("changefeed retry on warning for a very long time and does not resume, "+
"it will be failed", zap.String("changefeed", m.state.ID.ID),
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func TestHandleWarning(t *testing.T) {
tester.MustApplyPatches()
// mock that the checkpointTs has not progressed for longer than changefeedErrorStuckDuration
manager.checkpointTsAdvanced = manager.
checkpointTsAdvanced.Add(-(defaultBackoffMaxElapsedTime + 1))
checkpointTsAdvanced.Add(-(manager.changefeedErrorStuckDuration + 1))
// resolveTs = 202 > checkpointTs = 201
manager.Tick(state, 202)
// some patches will be generated when the manager.Tick is called
Expand Down
9 changes: 6 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ const (
"integrity": {
"integrity-check-level": "none",
"corruption-handle-level": "warn"
}
},
"changefeed-error-stuck-duration": 1800000000000
}`

testCfgTestServerConfigMarshal = `{
Expand Down Expand Up @@ -285,7 +286,8 @@ const (
"integrity": {
"integrity-check-level": "none",
"corruption-handle-level": "warn"
}
},
"changefeed-error-stuck-duration": 1800000000000
}`

testCfgTestReplicaConfigMarshal2 = `{
Expand Down Expand Up @@ -415,6 +417,7 @@ const (
"integrity": {
"integrity-check-level": "none",
"corruption-handle-level": "warn"
}
},
"changefeed-error-stuck-duration": 1800000000000
}`
)
18 changes: 15 additions & 3 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ const (
// minSyncPointInterval is the minimum of SyncPointInterval can be set.
minSyncPointInterval = time.Second * 30
// minSyncPointRetention is the minimum of SyncPointRetention can be set.
minSyncPointRetention = time.Hour * 1
minSyncPointRetention = time.Hour * 1
minChangeFeedErrorStuckDuration = time.Minute * 30
)

var defaultReplicaConfig = &ReplicaConfig{
Expand Down Expand Up @@ -83,6 +84,7 @@ var defaultReplicaConfig = &ReplicaConfig{
IntegrityCheckLevel: integrity.CheckLevelNone,
CorruptionHandleLevel: integrity.CorruptionHandleLevelWarn,
},
ChangefeedErrorStuckDuration: util.AddressOf(time.Minute * 30),
}

// GetDefaultReplicaConfig returns the default replica config.
Expand Down Expand Up @@ -123,8 +125,9 @@ type replicaConfig struct {
Sink *SinkConfig `toml:"sink" json:"sink"`
Consistent *ConsistentConfig `toml:"consistent" json:"consistent"`
// Scheduler is the configuration for scheduler.
Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"`
Integrity *integrity.Config `toml:"integrity" json:"integrity"`
Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"`
Integrity *integrity.Config `toml:"integrity" json:"integrity"`
ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"`
}

// Marshal returns the json marshal format of a ReplicationConfig
Expand Down Expand Up @@ -251,6 +254,15 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
}
}

if c.ChangefeedErrorStuckDuration != nil &&
*c.ChangefeedErrorStuckDuration < minChangeFeedErrorStuckDuration {
return cerror.ErrInvalidReplicaConfig.
FastGenByArgs(
fmt.Sprintf("The ChangefeedErrorStuckDuration:%f must be larger than %f Seconds",
c.ChangefeedErrorStuckDuration.Seconds(),
minChangeFeedErrorStuckDuration.Seconds()))
}

return nil
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,17 @@ func TestValidateAndAdjust(t *testing.T) {
cfg.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
require.NoError(t, cfg.ValidateAndAdjust(sinkURL))
require.Equal(t, integrity.CheckLevelNone, cfg.Integrity.IntegrityCheckLevel)

// changefeed error stuck duration is less than 30 minutes
cfg = GetDefaultReplicaConfig()
duration := minChangeFeedErrorStuckDuration - time.Second*1
cfg.ChangefeedErrorStuckDuration = &duration
err = cfg.ValidateAndAdjust(sinkURL)
require.Error(t, err)
require.Contains(t, err.Error(), "The ChangefeedErrorStuckDuration")
duration = minChangeFeedErrorStuckDuration
cfg.ChangefeedErrorStuckDuration = &duration
require.NoError(t, cfg.ValidateAndAdjust(sinkURL))
}

func TestIsSinkCompatibleWithSpanReplication(t *testing.T) {
Expand Down
44 changes: 24 additions & 20 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ func TestChangefeedStateUpdate(t *testing.T) {
"dispatchers": null,
"protocol": "open-protocol",
"advance-timeout-in-sec": 150
},
"consistent": {
"level": "normal",
"storage": "local"
}
},
"state": "normal",
Expand Down Expand Up @@ -125,13 +121,15 @@ func TestChangefeedStateUpdate(t *testing.T) {
CheckGCSafePoint: true,
Filter: &config.FilterConfig{Rules: []string{"*.*"}},
Mounter: &config.MounterConfig{WorkerNum: 16},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Sink: &config.SinkConfig{
Protocol: "open-protocol",
AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
},
Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Consistent: config.GetDefaultReplicaConfig().Consistent,
Integrity: config.GetDefaultReplicaConfig().Integrity,
ChangefeedErrorStuckDuration: config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
},
},
Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713},
Expand Down Expand Up @@ -182,9 +180,11 @@ func TestChangefeedStateUpdate(t *testing.T) {
Protocol: "open-protocol",
AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
},
Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
Consistent: config.GetDefaultReplicaConfig().Consistent,
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
ChangefeedErrorStuckDuration: config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
},
},
Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713},
Expand Down Expand Up @@ -240,9 +240,11 @@ func TestChangefeedStateUpdate(t *testing.T) {
Protocol: "open-protocol",
AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
},
Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"},
Consistent: config.GetDefaultReplicaConfig().Consistent,
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
ChangefeedErrorStuckDuration: config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration,
},
},
Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713},
Expand Down Expand Up @@ -327,12 +329,13 @@ func TestPatchInfo(t *testing.T) {
SinkURI: "123",
Engine: model.SortUnified,
Config: &config.ReplicaConfig{
Filter: defaultConfig.Filter,
Mounter: defaultConfig.Mounter,
Sink: defaultConfig.Sink,
Consistent: defaultConfig.Consistent,
Scheduler: defaultConfig.Scheduler,
Integrity: defaultConfig.Integrity,
Filter: defaultConfig.Filter,
Mounter: defaultConfig.Mounter,
Sink: defaultConfig.Sink,
Consistent: defaultConfig.Consistent,
Scheduler: defaultConfig.Scheduler,
Integrity: defaultConfig.Integrity,
ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration,
},
})
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
Expand All @@ -345,12 +348,13 @@ func TestPatchInfo(t *testing.T) {
StartTs: 6,
Engine: model.SortUnified,
Config: &config.ReplicaConfig{
Filter: defaultConfig.Filter,
Mounter: defaultConfig.Mounter,
Sink: defaultConfig.Sink,
Consistent: defaultConfig.Consistent,
Scheduler: defaultConfig.Scheduler,
Integrity: defaultConfig.Integrity,
Filter: defaultConfig.Filter,
Mounter: defaultConfig.Mounter,
Sink: defaultConfig.Sink,
Consistent: defaultConfig.Consistent,
Scheduler: defaultConfig.Scheduler,
Integrity: defaultConfig.Integrity,
ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration,
},
})
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
Expand Down

0 comments on commit 172a8e7

Please sign in to comment.