From 172a8e74d72a99f390513d1215150a44f5c40c49 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 16 Oct 2023 16:16:59 +0800 Subject: [PATCH] config, changefeed (ticdc): add changefeed error stuck duration config (#9872) (#9878) close pingcap/tiflow#9875 --- Makefile | 9 ++---- cdc/api/v2/model.go | 20 ++++++++---- cdc/api/v2/model_test.go | 2 ++ cdc/model/changefeed.go | 10 ++++++ cdc/owner/changefeed.go | 2 +- cdc/owner/changefeed_test.go | 2 +- cdc/owner/feed_state_manager.go | 36 +++++++++++---------- cdc/owner/feed_state_manager_test.go | 2 +- pkg/config/config_test_data.go | 9 ++++-- pkg/config/replica_config.go | 18 +++++++++-- pkg/config/replica_config_test.go | 11 +++++++ pkg/orchestrator/reactor_state_test.go | 44 ++++++++++++++------------ 12 files changed, 106 insertions(+), 59 deletions(-) diff --git a/Makefile b/Makefile index 83561c33ccb..932d2ed34d9 100644 --- a/Makefile +++ b/Makefile @@ -319,12 +319,12 @@ check-static: tools/bin/golangci-lint cd dm && ../tools/bin/golangci-lint run --timeout 10m0s check: check-copyright generate_mock go-generate fmt check-static tidy terror_check errdoc \ - check-merge-conflicts check-ticdc-dashboard check-diff-line-width swagger-spec check-makefiles \ + check-merge-conflicts check-ticdc-dashboard check-diff-line-width check-makefiles \ check_cdc_integration_test check_dm_integration_test check_engine_integration_test @git --no-pager diff --exit-code || (echo "Please add changed files!" && false) fast_check: check-copyright fmt check-static tidy terror_check errdoc \ - check-merge-conflicts check-ticdc-dashboard check-diff-line-width swagger-spec check-makefiles \ + check-merge-conflicts check-ticdc-dashboard check-diff-line-width check-makefiles \ check_cdc_integration_test check_dm_integration_test check_engine_integration_test @git --no-pager diff --exit-code || (echo "Please add changed files!" && false) @@ -344,10 +344,7 @@ unit_test_coverage: data-flow-diagram: docs/data-flow.dot dot -Tsvg docs/data-flow.dot > docs/data-flow.svg - -swagger-spec: tools/bin/swag - tools/bin/swag init --exclude dm,engine --parseVendor -generalInfo cdc/api/v1/api.go --output docs/swagger - + generate_mock: ## Generate mock code. generate_mock: tools/bin/mockgen scripts/generate-mock.sh diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 764fbddb68e..3206f2f1899 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -187,12 +187,13 @@ type ReplicaConfig struct { SyncPointInterval *JSONDuration `json:"sync_point_interval" swaggertype:"string"` SyncPointRetention *JSONDuration `json:"sync_point_retention" swaggertype:"string"` - Filter *FilterConfig `json:"filter"` - Mounter *MounterConfig `json:"mounter"` - Sink *SinkConfig `json:"sink"` - Consistent *ConsistentConfig `json:"consistent"` - Scheduler *ChangefeedSchedulerConfig `json:"scheduler"` - Integrity *IntegrityConfig `json:"integrity"` + Filter *FilterConfig `json:"filter"` + Mounter *MounterConfig `json:"mounter"` + Sink *SinkConfig `json:"sink"` + Consistent *ConsistentConfig `json:"consistent,omitempty"` + Scheduler *ChangefeedSchedulerConfig `json:"scheduler"` + Integrity *IntegrityConfig `json:"integrity"` + ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty" swaggertype:"string"` } // ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig @@ -423,6 +424,9 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( CorruptionHandleLevel: c.Integrity.CorruptionHandleLevel, } } + if c.ChangefeedErrorStuckDuration != nil { + res.ChangefeedErrorStuckDuration = &c.ChangefeedErrorStuckDuration.duration + } return res } @@ -649,7 +653,9 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { CorruptionHandleLevel: cloned.Integrity.CorruptionHandleLevel, } } - + if cloned.ChangefeedErrorStuckDuration != nil { + res.ChangefeedErrorStuckDuration = &JSONDuration{*cloned.ChangefeedErrorStuckDuration} + } return res } diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index c7da5ac83bb..2ed6dc97f84 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -76,6 +76,8 @@ var defaultAPIConfig = &ReplicaConfig{ IntegrityCheckLevel: config.GetDefaultReplicaConfig().Integrity.IntegrityCheckLevel, CorruptionHandleLevel: config.GetDefaultReplicaConfig().Integrity.CorruptionHandleLevel, }, + ChangefeedErrorStuckDuration: &JSONDuration{*config. + GetDefaultReplicaConfig().ChangefeedErrorStuckDuration}, } func TestDefaultReplicaConfig(t *testing.T) { diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 8e60ca43b48..0ed708b388a 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -332,6 +332,10 @@ func (info *ChangeFeedInfo) VerifyAndComplete() { if info.Config.Integrity == nil { info.Config.Integrity = defaultConfig.Integrity } + + if info.Config.ChangefeedErrorStuckDuration == nil { + info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration + } } // FixIncompatible fixes incompatible changefeed meta info. @@ -361,6 +365,12 @@ func (info *ChangeFeedInfo) FixIncompatible() { log.Info("Fix incompatible memory quota completed", zap.String("changefeed", info.String())) } + if info.Config.ChangefeedErrorStuckDuration == nil { + log.Info("Start fixing incompatible error stuck duration", zap.String("changefeed", info.String())) + info.Config.ChangefeedErrorStuckDuration = config.GetDefaultReplicaConfig().ChangefeedErrorStuckDuration + log.Info("Fix incompatible error stuck duration completed", zap.String("changefeed", info.String())) + } + log.Info("Start fixing incompatible scheduler", zap.String("changefeed", info.String())) inheritV66 := creatorVersionGate.ChangefeedInheritSchedulerConfigFromV66() info.fixScheduler(inheritV66) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 6dfe1252d03..9b6e0413738 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -156,7 +156,7 @@ func newChangefeed( // The scheduler will be created lazily. scheduler: nil, barriers: newBarriers(), - feedStateManager: newFeedStateManager(up), + feedStateManager: newFeedStateManager(up, state.Info.Config), upstream: up, errCh: make(chan error, defaultErrChSize), diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 8d091aeebb8..81fb4716b2b 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -201,7 +201,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T, info = ctx.ChangefeedVars().Info return info, true, nil }) - + tester.MustApplyPatches() cf := newChangefeed4Test(ctx.ChangefeedVars().ID, state, up, // new ddl puller func(ctx context.Context, diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index c7bc916f112..64ccf60bb9e 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/upstream" @@ -36,7 +37,6 @@ const ( // To avoid thunderherd, a random factor is also added. defaultBackoffInitInterval = 10 * time.Second defaultBackoffMaxInterval = 10 * time.Minute - defaultBackoffMaxElapsedTime = 30 * time.Minute defaultBackoffRandomizationFactor = 0.1 defaultBackoffMultiplier = 2.0 ) @@ -66,25 +66,27 @@ type feedStateManager struct { resolvedTs model.Ts initCheckpointTs model.Ts - checkpointTsAdvanced time.Time - lastCheckpointTs model.Ts + checkpointTsAdvanced time.Time + lastCheckpointTs model.Ts + changefeedErrorStuckDuration time.Duration } // newFeedStateManager creates feedStateManager and initialize the exponential backoff -func newFeedStateManager(up *upstream.Upstream) *feedStateManager { - f := new(feedStateManager) - f.upstream = up - - f.errBackoff = backoff.NewExponentialBackOff() - f.errBackoff.InitialInterval = defaultBackoffInitInterval - f.errBackoff.MaxInterval = defaultBackoffMaxInterval - f.errBackoff.Multiplier = defaultBackoffMultiplier - f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor +func newFeedStateManager(up *upstream.Upstream, cfg *config.ReplicaConfig) *feedStateManager { + m := new(feedStateManager) + m.upstream = up + + m.errBackoff = backoff.NewExponentialBackOff() + m.errBackoff.InitialInterval = defaultBackoffInitInterval + m.errBackoff.MaxInterval = defaultBackoffMaxInterval + m.errBackoff.Multiplier = defaultBackoffMultiplier + m.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor // backoff will stop once the defaultBackoffMaxElapsedTime has elapsed. - f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime + m.errBackoff.MaxElapsedTime = *cfg.ChangefeedErrorStuckDuration + m.changefeedErrorStuckDuration = *cfg.ChangefeedErrorStuckDuration - f.resetErrRetry() - return f + m.resetErrRetry() + return m } // resetErrRetry reset the error retry related fields @@ -567,8 +569,8 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) { // 1. checkpoint lag is large enough; // 2. checkpoint hasn't been advanced for a long while; // 3. the changefeed has been initialized. - if currTime.Sub(ckptTime) > defaultBackoffMaxElapsedTime && - time.Since(m.checkpointTsAdvanced) > defaultBackoffMaxElapsedTime && + if currTime.Sub(ckptTime) > m.changefeedErrorStuckDuration && + time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration && m.resolvedTs > m.initCheckpointTs { log.Info("changefeed retry on warning for a very long time and does not resume, "+ "it will be failed", zap.String("changefeed", m.state.ID.ID), diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index fa48e5be9e8..6c90ad93f12 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -838,7 +838,7 @@ func TestHandleWarning(t *testing.T) { tester.MustApplyPatches() // mock the checkpointTs is not progressing for defaultBackoffMaxElapsedTime time manager.checkpointTsAdvanced = manager. - checkpointTsAdvanced.Add(-(defaultBackoffMaxElapsedTime + 1)) + checkpointTsAdvanced.Add(-(manager.changefeedErrorStuckDuration + 1)) // resolveTs = 202 > checkpointTs = 201 manager.Tick(state, 202) // some patches will be generated when the manager.Tick is called diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index daa11682d59..5a44aac5aac 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -70,7 +70,8 @@ const ( "integrity": { "integrity-check-level": "none", "corruption-handle-level": "warn" - } + }, + "changefeed-error-stuck-duration": 1800000000000 }` testCfgTestServerConfigMarshal = `{ @@ -285,7 +286,8 @@ const ( "integrity": { "integrity-check-level": "none", "corruption-handle-level": "warn" - } + }, + "changefeed-error-stuck-duration": 1800000000000 }` testCfgTestReplicaConfigMarshal2 = `{ @@ -415,6 +417,7 @@ const ( "integrity": { "integrity-check-level": "none", "corruption-handle-level": "warn" - } + }, + "changefeed-error-stuck-duration": 1800000000000 }` ) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index b15f8c5a4ac..c08b1bc6879 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -35,7 +35,8 @@ const ( // minSyncPointInterval is the minimum of SyncPointInterval can be set. minSyncPointInterval = time.Second * 30 // minSyncPointRetention is the minimum of SyncPointRetention can be set. - minSyncPointRetention = time.Hour * 1 + minSyncPointRetention = time.Hour * 1 + minChangeFeedErrorStuckDuration = time.Minute * 30 ) var defaultReplicaConfig = &ReplicaConfig{ @@ -83,6 +84,7 @@ var defaultReplicaConfig = &ReplicaConfig{ IntegrityCheckLevel: integrity.CheckLevelNone, CorruptionHandleLevel: integrity.CorruptionHandleLevelWarn, }, + ChangefeedErrorStuckDuration: util.AddressOf(time.Minute * 30), } // GetDefaultReplicaConfig returns the default replica config. @@ -123,8 +125,9 @@ type replicaConfig struct { Sink *SinkConfig `toml:"sink" json:"sink"` Consistent *ConsistentConfig `toml:"consistent" json:"consistent"` // Scheduler is the configuration for scheduler. - Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"` - Integrity *integrity.Config `toml:"integrity" json:"integrity"` + Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"` + Integrity *integrity.Config `toml:"integrity" json:"integrity"` + ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"` } // Marshal returns the json marshal format of a ReplicationConfig @@ -251,6 +254,15 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin } } + if c.ChangefeedErrorStuckDuration != nil && + *c.ChangefeedErrorStuckDuration < minChangeFeedErrorStuckDuration { + return cerror.ErrInvalidReplicaConfig. + FastGenByArgs( + fmt.Sprintf("The ChangefeedErrorStuckDuration:%f must be larger than %f Seconds", + c.ChangefeedErrorStuckDuration.Seconds(), + minChangeFeedErrorStuckDuration.Seconds())) + } + return nil } diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 7113e866eb0..5a67b8db7d9 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -263,6 +263,17 @@ func TestValidateAndAdjust(t *testing.T) { cfg.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness require.NoError(t, cfg.ValidateAndAdjust(sinkURL)) require.Equal(t, integrity.CheckLevelNone, cfg.Integrity.IntegrityCheckLevel) + + // changefeed error stuck duration is less than 30 minutes + cfg = GetDefaultReplicaConfig() + duration := minChangeFeedErrorStuckDuration - time.Second*1 + cfg.ChangefeedErrorStuckDuration = &duration + err = cfg.ValidateAndAdjust(sinkURL) + require.Error(t, err) + require.Contains(t, err.Error(), "The ChangefeedErrorStuckDuration") + duration = minChangeFeedErrorStuckDuration + cfg.ChangefeedErrorStuckDuration = &duration + require.NoError(t, cfg.ValidateAndAdjust(sinkURL)) } func TestIsSinkCompatibleWithSpanReplication(t *testing.T) { diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index 892b682f76b..ed403faf1b9 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -72,10 +72,6 @@ func TestChangefeedStateUpdate(t *testing.T) { "dispatchers": null, "protocol": "open-protocol", "advance-timeout-in-sec": 150 - }, - "consistent": { - "level": "normal", - "storage": "local" } }, "state": "normal", @@ -125,13 +121,15 @@ func TestChangefeedStateUpdate(t *testing.T) { CheckGCSafePoint: true, Filter: &config.FilterConfig{Rules: []string{"*.*"}}, Mounter: &config.MounterConfig{WorkerNum: 16}, + Scheduler: config.GetDefaultReplicaConfig().Scheduler, Sink: &config.SinkConfig{ Protocol: "open-protocol", AdvanceTimeoutInSec: putil.AddressOf(uint(150)), }, - Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"}, - Scheduler: config.GetDefaultReplicaConfig().Scheduler, + Consistent: config.GetDefaultReplicaConfig().Consistent, Integrity: config.GetDefaultReplicaConfig().Integrity, + ChangefeedErrorStuckDuration: config. + GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -182,9 +180,11 @@ func TestChangefeedStateUpdate(t *testing.T) { Protocol: "open-protocol", AdvanceTimeoutInSec: putil.AddressOf(uint(150)), }, - Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"}, + Consistent: config.GetDefaultReplicaConfig().Consistent, Scheduler: config.GetDefaultReplicaConfig().Scheduler, Integrity: config.GetDefaultReplicaConfig().Integrity, + ChangefeedErrorStuckDuration: config. + GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -240,9 +240,11 @@ func TestChangefeedStateUpdate(t *testing.T) { Protocol: "open-protocol", AdvanceTimeoutInSec: putil.AddressOf(uint(150)), }, - Consistent: &config.ConsistentConfig{Level: "normal", Storage: "local"}, + Consistent: config.GetDefaultReplicaConfig().Consistent, Scheduler: config.GetDefaultReplicaConfig().Scheduler, Integrity: config.GetDefaultReplicaConfig().Integrity, + ChangefeedErrorStuckDuration: config. + GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -327,12 +329,13 @@ func TestPatchInfo(t *testing.T) { SinkURI: "123", Engine: model.SortUnified, Config: &config.ReplicaConfig{ - Filter: defaultConfig.Filter, - Mounter: defaultConfig.Mounter, - Sink: defaultConfig.Sink, - Consistent: defaultConfig.Consistent, - Scheduler: defaultConfig.Scheduler, - Integrity: defaultConfig.Integrity, + Filter: defaultConfig.Filter, + Mounter: defaultConfig.Mounter, + Sink: defaultConfig.Sink, + Consistent: defaultConfig.Consistent, + Scheduler: defaultConfig.Scheduler, + Integrity: defaultConfig.Integrity, + ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, }, }) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { @@ -345,12 +348,13 @@ func TestPatchInfo(t *testing.T) { StartTs: 6, Engine: model.SortUnified, Config: &config.ReplicaConfig{ - Filter: defaultConfig.Filter, - Mounter: defaultConfig.Mounter, - Sink: defaultConfig.Sink, - Consistent: defaultConfig.Consistent, - Scheduler: defaultConfig.Scheduler, - Integrity: defaultConfig.Integrity, + Filter: defaultConfig.Filter, + Mounter: defaultConfig.Mounter, + Sink: defaultConfig.Sink, + Consistent: defaultConfig.Consistent, + Scheduler: defaultConfig.Scheduler, + Integrity: defaultConfig.Integrity, + ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, }, }) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {