From cb256d79dc775f729de822362f916b8230568caf Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Mon, 16 Oct 2023 18:46:59 +0800 Subject: [PATCH] reset internal state of feedStateManager when shouldBeRunning is false (#9893) close pingcap/tiflow#9892 --- cdc/owner/feed_state_manager.go | 109 ++++++++++++++----------- cdc/owner/feed_state_manager_test.go | 116 +++++++++++++++++++++++++++ 2 files changed, 179 insertions(+), 46 deletions(-) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 30c62e86aff..85c939d9594 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -63,11 +63,11 @@ type feedStateManager struct { // resolvedTs and initCheckpointTs is for checking whether resolved timestamp // has been advanced or not. - resolvedTs model.Ts - initCheckpointTs model.Ts - + resolvedTs model.Ts + checkpointTs model.Ts checkpointTsAdvanced time.Time - lastCheckpointTs model.Ts + + changefeedErrorStuckDuration time.Duration } // newFeedStateManager creates feedStateManager and initialize the exponential backoff @@ -84,33 +84,58 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager { f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime f.resetErrRetry() - + f.isRetrying = false return f } +func (m *feedStateManager) shouldRetry() bool { + // changefeed should not retry within [m.lastErrorRetryTime, m.lastErrorRetryTime + m.backoffInterval). + return time.Since(m.lastErrorRetryTime) >= m.backoffInterval +} + +func (m *feedStateManager) shouldFailWhenRetry() bool { + // retry the changefeed + m.backoffInterval = m.errBackoff.NextBackOff() + // NextBackOff() will return -1 once the MaxElapsedTime has elapsed, + // set the changefeed to failed state. + if m.backoffInterval == m.errBackoff.Stop { + return true + } + + m.lastErrorRetryTime = time.Now() + return false +} + +// resetErrRetry reset the error retry related fields +func (m *feedStateManager) resetErrRetry() { + m.errBackoff.Reset() + m.backoffInterval = m.errBackoff.NextBackOff() + m.lastErrorRetryTime = time.Unix(0, 0) +} + func (m *feedStateManager) Tick( state *orchestrator.ChangefeedReactorState, resolvedTs model.Ts, ) (adminJobPending bool) { + m.checkAndInitLastRetryCheckpointTs(state.Status) + if state.Status != nil { - if m.lastCheckpointTs < state.Status.CheckpointTs { - m.lastCheckpointTs = state.Status.CheckpointTs + if m.checkpointTs < state.Status.CheckpointTs { + m.checkpointTs = state.Status.CheckpointTs m.checkpointTsAdvanced = time.Now() } - if m.state == nil || m.state.Status == nil { - // It's the first time `m.state.Status` gets filled. - m.initCheckpointTs = state.Status.CheckpointTs + if m.resolvedTs < resolvedTs { + m.resolvedTs = resolvedTs + } + if m.checkpointTs >= m.resolvedTs { + m.checkpointTsAdvanced = time.Now() } } - - m.checkAndInitLastRetryCheckpointTs(state.Status) - m.state = state - m.resolvedTs = resolvedTs m.shouldBeRunning = true defer func() { if !m.shouldBeRunning { - m.cleanUpTaskPositions() + m.cleanUp() } }() if m.handleAdminJob() { @@ -131,16 +156,12 @@ func (m *feedStateManager) Tick( m.shouldBeRunning = false return case model.StatePending: - if time.Since(m.lastErrorRetryTime) < m.backoffInterval { + if !m.shouldRetry() { m.shouldBeRunning = false return } - // retry the changefeed - oldBackoffInterval := m.backoffInterval - m.backoffInterval = m.errBackoff.NextBackOff() - // NextBackOff() will return -1 once the MaxElapsedTime has elapsed, - // set the changefeed to failed state. - if m.backoffInterval == m.errBackoff.Stop { + + if m.shouldFailWhenRetry() { log.Error("The changefeed won't be restarted as it has been experiencing failures for "+ "an extended duration", zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime), @@ -154,18 +175,17 @@ func (m *feedStateManager) Tick( return } - m.lastErrorRetryTime = time.Now() + // retry the changefeed + m.shouldBeRunning = true if m.state.Status != nil { m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs } - m.shouldBeRunning = true m.patchState(model.StateWarning) log.Info("changefeed retry backoff interval is elapsed,"+ "chengefeed will be restarted", zap.String("namespace", m.state.ID.Namespace), zap.String("changefeed", m.state.ID.ID), zap.Time("lastErrorRetryTime", m.lastErrorRetryTime), - zap.Duration("lastRetryInterval", oldBackoffInterval), zap.Duration("nextRetryInterval", m.backoffInterval)) case model.StateNormal, model.StateWarning: m.checkAndChangeState() @@ -274,7 +294,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { m.shouldBeRunning = true // when the changefeed is manually resumed, we must reset the backoff m.resetErrRetry() - // The lastErrorTime also needs to be cleared before a fresh run. + m.isRetrying = false jobsPending = true m.patchState(model.StateNormal) @@ -407,12 +427,15 @@ func (m *feedStateManager) patchState(feedState model.FeedState) { }) } -func (m *feedStateManager) cleanUpTaskPositions() { +func (m *feedStateManager) cleanUp() { for captureID := range m.state.TaskPositions { m.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return nil, position != nil, nil }) } + m.checkpointTs = 0 + m.checkpointTsAdvanced = time.Time{} + m.resolvedTs = 0 } func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError { @@ -535,12 +558,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) { info.Error = lastError return info, true, nil }) - } - // The errBackoff needs to be reset before the first retry. - if !m.isRetrying { - m.resetErrRetry() - m.isRetrying = true + // The errBackoff needs to be reset before the first retry. + if !m.isRetrying { + m.resetErrRetry() + m.isRetrying = true + } } } @@ -554,13 +577,14 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) { currTime := m.upstream.PDClock.CurrentTime() ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs) m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs - // Conditions: - // 1. checkpoint lag is large enough; - // 2. checkpoint hasn't been advanced for a long while; - // 3. the changefeed has been initialized. - if currTime.Sub(ckptTime) > defaultBackoffMaxElapsedTime && - time.Since(m.checkpointTsAdvanced) > defaultBackoffMaxElapsedTime && - m.resolvedTs > m.initCheckpointTs { + + checkpointTsStuck := time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration + if checkpointTsStuck { + log.Info("changefeed retry on warning for a very long time and does not resume, "+ + "it will be failed", zap.String("changefeed", m.state.ID.ID), + zap.Uint64("checkpointTs", m.state.Status.CheckpointTs), + zap.Duration("checkpointTime", currTime.Sub(ckptTime)), + ) code, _ := cerrors.RFCCode(cerrors.ErrChangefeedUnretryable) m.handleError(&model.RunningError{ Time: lastError.Time, @@ -592,13 +616,6 @@ func GenerateChangefeedEpoch(ctx context.Context, pdClient pd.Client) uint64 { return oracle.ComposeTS(phyTs, logical) } -// resetErrRetry reset the error retry related fields -func (m *feedStateManager) resetErrRetry() { - m.errBackoff.Reset() - m.backoffInterval = m.errBackoff.NextBackOff() - m.lastErrorRetryTime = time.Unix(0, 0) -} - // checkAndChangeState checks the state of the changefeed and change it if needed. // if the state of the changefeed is warning and the changefeed's checkpointTs is // greater than the lastRetryCheckpointTs, it will change the state to normal. diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index ed738e04b55..6710063f9fe 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -64,6 +64,8 @@ func newFeedStateManager4Test( f.resetErrRetry() + f.changefeedErrorStuckDuration = time.Second * 3 + return f } @@ -208,6 +210,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { require.Equal(t, state.Status.AdminJobType, model.AdminStop) // resume the changefeed in failed state + manager.isRetrying = true manager.PushAdminJob(&model.AdminJob{ CfID: ctx.ChangefeedVars().ID, Type: model.AdminResume, @@ -220,6 +223,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { require.Equal(t, state.Info.State, model.StateNormal) require.Equal(t, state.Info.AdminJobType, model.AdminNone) require.Equal(t, state.Status.AdminJobType, model.AdminNone) + require.False(t, manager.isRetrying) } func TestMarkFinished(t *testing.T) { @@ -743,6 +747,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) { func TestHandleWarning(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) manager := newFeedStateManager4Test(200, 1600, 0, 2.0) + manager.changefeedErrorStuckDuration = 100 * time.Millisecond state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, nil) @@ -935,3 +940,114 @@ func TestErrorAfterWarning(t *testing.T) { require.Equal(t, model.StateWarning, state.Info.State) require.True(t, manager.ShouldRunning()) } + +func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { + t.Parallel() + + maxElapsedTimeInMs := 2000 + ctx := cdcContext.NewBackendContext4Test(true) + manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) + tester := orchestrator.NewReactorStateTester(t, state, nil) + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + require.Nil(t, info) + return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil + }) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.Nil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + + tester.MustApplyPatches() + manager.Tick(state, 200) + tester.MustApplyPatches() + require.Equal(t, model.StateNormal, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 1. test when an warning occurs, the changefeed state will be changed to warning + // and it will still keep running + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 200) + // some patches will be generated when the manager.Tick is called + // so we need to apply the patches before we check the state + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 2. test when the changefeed is in warning state, and the resolvedTs and checkpointTs is not progressing, + // the changefeed state will remain warning whena new warning is encountered. + time.Sleep(manager.changefeedErrorStuckDuration + 10) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 200) + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 3. Test changefeed remain warning when resolvedTs is progressing after stuck beyond the detection time. + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 400) + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 4. Test changefeed failed when checkpointTs is not progressing for changefeedErrorStuckDuration time. + time.Sleep(manager.changefeedErrorStuckDuration + 10) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 400) + tester.MustApplyPatches() + require.Equal(t, model.StateFailed, state.Info.State) + require.False(t, manager.ShouldRunning()) +}