diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 514f2514de7..30c62e86aff 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -39,11 +39,6 @@ const ( defaultBackoffMaxElapsedTime = 30 * time.Minute defaultBackoffRandomizationFactor = 0.1 defaultBackoffMultiplier = 2.0 - - // If all states recorded in window are 'normal', it can be assumed that the changefeed - // is running steady. And then if we enter a state other than normal at next tick, - // the backoff must be reset. - defaultStateWindowSize = 512 ) // feedStateManager manages the ReactorState of a changefeed @@ -59,7 +54,7 @@ type feedStateManager struct { shouldBeRemoved bool adminJobQueue []*model.AdminJob - stateHistory [defaultStateWindowSize]model.FeedState + isRetrying bool lastErrorRetryTime time.Time // time of last error for a changefeed lastErrorRetryCheckpointTs model.Ts // checkpoint ts of last retry lastWarningReportCheckpointTs model.Ts // checkpoint ts of last warning report @@ -93,26 +88,6 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager { return f } -// isChangefeedStable check if there are states other than 'normal' in this sliding window. -func (m *feedStateManager) isChangefeedStable() bool { - for _, val := range m.stateHistory { - if val != model.StateNormal { - return false - } - } - - return true -} - -// shiftStateWindow shift the sliding window -func (m *feedStateManager) shiftStateWindow(state model.FeedState) { - for i := 0; i < defaultStateWindowSize-1; i++ { - m.stateHistory[i] = m.stateHistory[i+1] - } - - m.stateHistory[defaultStateWindowSize-1] = state -} - func (m *feedStateManager) Tick( state *orchestrator.ChangefeedReactorState, resolvedTs model.Ts, @@ -128,7 +103,6 @@ func (m *feedStateManager) Tick( } } - m.shiftStateWindow(state.Info.State) m.checkAndInitLastRetryCheckpointTs(state.Status) m.state = state @@ -167,13 +141,13 @@ func (m *feedStateManager) Tick( // NextBackOff() will return -1 once the MaxElapsedTime has elapsed, // set the changefeed to failed state. if m.backoffInterval == m.errBackoff.Stop { - log.Warn("The changefeed won't be restarted "+ - "as it has been experiencing failures for "+ + log.Error("The changefeed won't be restarted as it has been experiencing failures for "+ "an extended duration", - zap.Duration( - "maxElapsedTime", - m.errBackoff.MaxElapsedTime, - ), + zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime), + zap.String("namespace", m.state.ID.Namespace), + zap.String("changefeed", m.state.ID.ID), + zap.Time("lastRetryTime", m.lastErrorRetryTime), + zap.Uint64("lastRetryCheckpointTs", m.lastErrorRetryCheckpointTs), ) m.shouldBeRunning = false m.patchState(model.StateFailed) @@ -563,13 +537,10 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) { }) } - // If we enter into an abnormal state 'error' for this changefeed now - // but haven't seen abnormal states in a sliding window (512 ticks), - // it can be assumed that this changefeed meets a sudden change from a stable condition. - // So we can reset the exponential backoff and re-backoff from the InitialInterval. - // TODO: this detection policy should be added into unit test. - if m.isChangefeedStable() { + // The errBackoff needs to be reset before the first retry. + if !m.isRetrying { m.resetErrRetry() + m.isRetrying = true } } @@ -646,6 +617,7 @@ func (m *feedStateManager) checkAndChangeState() { zap.Uint64("checkpointTs", m.state.Status.CheckpointTs), zap.Uint64("lastRetryCheckpointTs", m.lastErrorRetryCheckpointTs)) m.patchState(model.StateNormal) + m.isRetrying = false } } diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index e91a8d8bc02..ed738e04b55 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -26,6 +26,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/upstream" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" @@ -46,19 +47,18 @@ func (p *mockPD) GetTS(_ context.Context) (int64, int64, error) { // newFeedStateManager4Test creates feedStateManager for test func newFeedStateManager4Test( - initialIntervalInMs time.Duration, - maxIntervalInMs time.Duration, - maxElapsedTimeInMs time.Duration, + initialIntervalInMs, maxIntervalInMs, maxElapsedTimeInMs int, multiplier float64, ) *feedStateManager { f := new(feedStateManager) f.upstream = new(upstream.Upstream) f.upstream.PDClient = &mockPD{} + f.upstream.PDClock = pdutil.NewClock4Test() f.errBackoff = backoff.NewExponentialBackOff() - f.errBackoff.InitialInterval = initialIntervalInMs * time.Millisecond - f.errBackoff.MaxInterval = maxIntervalInMs * time.Millisecond - f.errBackoff.MaxElapsedTime = maxElapsedTimeInMs * time.Millisecond + f.errBackoff.InitialInterval = time.Duration(initialIntervalInMs) * time.Millisecond + f.errBackoff.MaxInterval = time.Duration(maxIntervalInMs) * time.Millisecond + f.errBackoff.MaxElapsedTime = time.Duration(maxElapsedTimeInMs) * time.Millisecond f.errBackoff.Multiplier = multiplier f.errBackoff.RandomizationFactor = 0 @@ -690,7 +690,7 @@ func TestBackoffNeverStops(t *testing.T) { func TestUpdateChangefeedEpoch(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) // Set a long backoff time - manager := newFeedStateManager4Test(time.Hour, time.Hour, 0, 1.0) + manager := newFeedStateManager4Test(int(time.Hour), int(time.Hour), 0, 1.0) state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, nil) @@ -739,3 +739,199 @@ func TestUpdateChangefeedEpoch(t *testing.T) { } } } + +func TestHandleWarning(t *testing.T) { + ctx := cdcContext.NewBackendContext4Test(true) + manager := newFeedStateManager4Test(200, 1600, 0, 2.0) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) + tester := orchestrator.NewReactorStateTester(t, state, nil) + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + require.Nil(t, info) + return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil + }) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.Nil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + + tester.MustApplyPatches() + manager.Tick(state, 0) + tester.MustApplyPatches() + require.Equal(t, model.StateNormal, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 1. test when an warning occurs, the changefeed state will be changed to warning + // and it will still keep running + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 0) + // some patches will be generated when the manager.Tick is called + // so we need to apply the patches before we check the state + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 2. test when the changefeed is in warning state, and the checkpointTs is not progressing, + // the changefeed state will remain warning + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 0) + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 3. test when the changefeed is in warning state, and the checkpointTs is progressing, + // the changefeed state will be changed to normal + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 201, + }, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 0) + tester.MustApplyPatches() + require.Equal(t, model.StateNormal, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 4. test when the changefeed is in warning state, and the checkpointTs is not progressing + // for defaultBackoffMaxElapsedTime time, the changefeed state will be changed to failed + // and it will stop running + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 0) + // some patches will be generated when the manager.Tick is called + // so we need to apply the patches before we check the state + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + // mock the checkpointTs is not progressing for defaultBackoffMaxElapsedTime time + manager.checkpointTsAdvanced = manager. + checkpointTsAdvanced.Add(-(defaultBackoffMaxElapsedTime + 1)) + // resolveTs = 202 > checkpointTs = 201 + manager.Tick(state, 202) + // some patches will be generated when the manager.Tick is called + // so we need to apply the patches before we check the state + tester.MustApplyPatches() + require.Equal(t, model.StateFailed, state.Info.State) + require.False(t, manager.ShouldRunning()) +} + +func TestErrorAfterWarning(t *testing.T) { + t.Parallel() + + maxElapsedTimeInMs := 2000 + ctx := cdcContext.NewBackendContext4Test(true) + manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) + tester := orchestrator.NewReactorStateTester(t, state, nil) + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + require.Nil(t, info) + return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil + }) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.Nil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + + tester.MustApplyPatches() + manager.Tick(state, 0) + tester.MustApplyPatches() + require.Equal(t, model.StateNormal, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 1. test when an warning occurs, the changefeed state will be changed to warning + // and it will still keep running + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 0) + // some patches will be generated when the manager.Tick is called + // so we need to apply the patches before we check the state + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 2. test when the changefeed is in warning state, and the checkpointTs is not progressing, + // the changefeed state will remain warning + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 0) + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 3. Sleep maxElapsedTimeInMs to wait backoff timeout. And when an error occurs after an warning, + // the backoff will be reseted, and changefeed state will be changed to warning and it will still + // keep running. + time.Sleep(time.Millisecond * time.Duration(maxElapsedTimeInMs)) + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Error: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + + manager.Tick(state, 0) + // some patches will be generated when the manager.Tick is called + // so we need to apply the patches before we check the state + tester.MustApplyPatches() + require.Equal(t, model.StatePending, state.Info.State) + require.False(t, manager.ShouldRunning()) + manager.Tick(state, 0) + + // some patches will be generated when the manager.Tick is called + // so we need to apply the patches before we check the state + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) +}