diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go
index 64ccf60bb9e..b469d03ff43 100644
--- a/cdc/owner/feed_state_manager.go
+++ b/cdc/owner/feed_state_manager.go
@@ -63,11 +63,10 @@ type feedStateManager struct {
     // resolvedTs and initCheckpointTs is for checking whether resolved timestamp
     // has been advanced or not.
-    resolvedTs       model.Ts
-    initCheckpointTs model.Ts
+    resolvedTs           model.Ts
+    checkpointTs         model.Ts
+    checkpointTsAdvanced time.Time
 
-    checkpointTsAdvanced time.Time
-    lastCheckpointTs     model.Ts
     changefeedErrorStuckDuration time.Duration
 }
 
@@ -86,9 +85,28 @@ func newFeedStateManager(up *upstream.Upstream, cfg *config.ReplicaConfig) *feed
     m.changefeedErrorStuckDuration = *cfg.ChangefeedErrorStuckDuration
     m.resetErrRetry()
+    m.isRetrying = false
     return m
 }
 
+func (m *feedStateManager) shouldRetry() bool {
+    // changefeed should not retry within [m.lastErrorRetryTime, m.lastErrorRetryTime + m.backoffInterval).
+    return time.Since(m.lastErrorRetryTime) >= m.backoffInterval
+}
+
+func (m *feedStateManager) shouldFailWhenRetry() bool {
+    // retry the changefeed
+    m.backoffInterval = m.errBackoff.NextBackOff()
+    // NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
+    // set the changefeed to failed state.
+    if m.backoffInterval == m.errBackoff.Stop {
+        return true
+    }
+
+    m.lastErrorRetryTime = time.Now()
+    return false
+}
+
 // resetErrRetry reset the error retry related fields
 func (m *feedStateManager) resetErrRetry() {
     m.errBackoff.Reset()
@@ -100,25 +118,25 @@ func (m *feedStateManager) Tick(
     state *orchestrator.ChangefeedReactorState,
     resolvedTs model.Ts,
 ) (adminJobPending bool) {
+    m.checkAndInitLastRetryCheckpointTs(state.Status)
+
     if state.Status != nil {
-        if m.lastCheckpointTs < state.Status.CheckpointTs {
-            m.lastCheckpointTs = state.Status.CheckpointTs
+        if m.checkpointTs < state.Status.CheckpointTs {
+            m.checkpointTs = state.Status.CheckpointTs
             m.checkpointTsAdvanced = time.Now()
         }
-        if m.state == nil || m.state.Status == nil {
-            // It's the first time `m.state.Status` gets filled.
-            m.initCheckpointTs = state.Status.CheckpointTs
-        }
+        if m.resolvedTs < resolvedTs {
+            m.resolvedTs = resolvedTs
+        }
+        if m.checkpointTs >= m.resolvedTs {
+            m.checkpointTsAdvanced = time.Now()
+        }
     }
-
-    m.checkAndInitLastRetryCheckpointTs(state.Status)
-
     m.state = state
-    m.resolvedTs = resolvedTs
     m.shouldBeRunning = true
     defer func() {
         if !m.shouldBeRunning {
-            m.cleanUpTaskPositions()
+            m.cleanUp()
         }
     }()
@@ -141,16 +159,12 @@ func (m *feedStateManager) Tick(
         m.shouldBeRunning = false
         return
     case model.StatePending:
-        if time.Since(m.lastErrorRetryTime) < m.backoffInterval {
+        if !m.shouldRetry() {
            m.shouldBeRunning = false
            return
         }
-        // retry the changefeed
-        oldBackoffInterval := m.backoffInterval
-        m.backoffInterval = m.errBackoff.NextBackOff()
-        // NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
-        // set the changefeed to failed state.
-        if m.backoffInterval == m.errBackoff.Stop {
+
+        if m.shouldFailWhenRetry() {
             log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
                 "an extended duration",
                 zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
@@ -164,18 +178,17 @@ func (m *feedStateManager) Tick(
             return
         }
-        m.lastErrorRetryTime = time.Now()
+        // retry the changefeed
+        m.shouldBeRunning = true
         if m.state.Status != nil {
             m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs
         }
-        m.shouldBeRunning = true
         m.patchState(model.StateWarning)
         log.Info("changefeed retry backoff interval is elapsed,"+
             "chengefeed will be restarted",
             zap.String("namespace", m.state.ID.Namespace),
             zap.String("changefeed", m.state.ID.ID),
             zap.Time("lastErrorRetryTime", m.lastErrorRetryTime),
-            zap.Duration("lastRetryInterval", oldBackoffInterval),
             zap.Duration("nextRetryInterval", m.backoffInterval))
     case model.StateNormal, model.StateWarning:
         m.checkAndChangeState()
@@ -284,6 +297,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
         m.shouldBeRunning = true
         // when the changefeed is manually resumed, we must reset the backoff
         m.resetErrRetry()
+        m.isRetrying = false
         jobsPending = true
         m.patchState(model.StateNormal)
 
@@ -416,12 +430,15 @@ func (m *feedStateManager) patchState(feedState model.FeedState) {
     })
 }
 
-func (m *feedStateManager) cleanUpTaskPositions() {
+func (m *feedStateManager) cleanUp() {
     for captureID := range m.state.TaskPositions {
         m.state.PatchTaskPosition(captureID,
             func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
                 return nil, position != nil, nil
             })
     }
+    m.checkpointTs = 0
+    m.checkpointTsAdvanced = time.Time{}
+    m.resolvedTs = 0
 }
 
 func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError {
@@ -546,12 +563,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
             info.Error = lastError
             return info, true, nil
         })
-    }
 
-    // The errBackoff needs to be reset before the first retry.
-    if !m.isRetrying {
-        m.resetErrRetry()
-        m.isRetrying = true
+        // The errBackoff needs to be reset before the first retry.
+        if !m.isRetrying {
+            m.resetErrRetry()
+            m.isRetrying = true
+        }
     }
 }
 
@@ -565,13 +582,9 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) {
         currTime, _ := m.upstream.PDClock.CurrentTime()
         ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs)
         m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs
-        // Conditions:
-        // 1. checkpoint lag is large enough;
-        // 2. checkpoint hasn't been advanced for a long while;
-        // 3. the changefeed has been initialized.
-        if currTime.Sub(ckptTime) > m.changefeedErrorStuckDuration &&
-            time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration &&
-            m.resolvedTs > m.initCheckpointTs {
+
+        checkpointTsStuck := time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration
+        if checkpointTsStuck {
             log.Info("changefeed retry on warning for a very long time and does not resume, "+
                 "it will be failed", zap.String("changefeed", m.state.ID.ID),
                 zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go
index 6c90ad93f12..e81ae7584ac 100644
--- a/cdc/owner/feed_state_manager_test.go
+++ b/cdc/owner/feed_state_manager_test.go
@@ -64,6 +64,8 @@ func newFeedStateManager4Test(
     f.resetErrRetry()
+    f.changefeedErrorStuckDuration = time.Second * 3
+
     return f
 }
 
@@ -208,6 +210,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
     require.Equal(t, state.Status.AdminJobType, model.AdminStop)
 
     // resume the changefeed in failed state
+    manager.isRetrying = true
     manager.PushAdminJob(&model.AdminJob{
         CfID: ctx.ChangefeedVars().ID,
         Type: model.AdminResume,
@@ -220,6 +223,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
     require.Equal(t, state.Info.State, model.StateNormal)
     require.Equal(t, state.Info.AdminJobType, model.AdminNone)
     require.Equal(t, state.Status.AdminJobType, model.AdminNone)
+    require.False(t, manager.isRetrying)
 }
 
 func TestMarkFinished(t *testing.T) {
@@ -742,6 +746,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) {
 func TestHandleWarning(t *testing.T) {
     ctx := cdcContext.NewBackendContext4Test(true)
     manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
+    manager.changefeedErrorStuckDuration = 100 * time.Millisecond
     state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
         ctx.ChangefeedVars().ID)
     tester := orchestrator.NewReactorStateTester(t, state, nil)
@@ -934,3 +939,114 @@ func TestErrorAfterWarning(t *testing.T) {
     require.Equal(t, model.StateWarning, state.Info.State)
     require.True(t, manager.ShouldRunning())
 }
+
+func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
+    t.Parallel()
+
+    maxElapsedTimeInMs := 2000
+    ctx := cdcContext.NewBackendContext4Test(true)
+    manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0)
+    state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
+        ctx.ChangefeedVars().ID)
+    tester := orchestrator.NewReactorStateTester(t, state, nil)
+    state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
+        require.Nil(t, info)
+        return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
+    })
+    state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+        require.Nil(t, status)
+        return &model.ChangeFeedStatus{
+            CheckpointTs: 200,
+        }, true, nil
+    })
+
+    tester.MustApplyPatches()
+    manager.Tick(state, 200)
+    tester.MustApplyPatches()
+    require.Equal(t, model.StateNormal, state.Info.State)
+    require.True(t, manager.ShouldRunning())
+
+    // 1. test when a warning occurs, the changefeed state will be changed to warning
+    // and it will still keep running
+    state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+        func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+            return &model.TaskPosition{Warning: &model.RunningError{
+                Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+                Code:    "[CDC:ErrSinkManagerRunError]", // it is a fake error
+                Message: "fake error for test",
+            }}, true, nil
+        })
+    tester.MustApplyPatches()
+    manager.Tick(state, 200)
+    // some patches will be generated when the manager.Tick is called
+    // so we need to apply the patches before we check the state
+    tester.MustApplyPatches()
+    require.Equal(t, model.StateWarning, state.Info.State)
+    require.True(t, manager.ShouldRunning())
+
+    // 2. test when the changefeed is in warning state, and the resolvedTs and checkpointTs are not progressing,
+    // the changefeed state will remain warning when a new warning is encountered.
+    time.Sleep(manager.changefeedErrorStuckDuration + 10)
+    state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+        require.NotNil(t, status)
+        return &model.ChangeFeedStatus{
+            CheckpointTs: 200,
+        }, true, nil
+    })
+    state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+        func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+            return &model.TaskPosition{Warning: &model.RunningError{
+                Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+                Code:    "[CDC:ErrSinkManagerRunError]", // it is a fake error
+                Message: "fake error for test",
+            }}, true, nil
+        })
+    tester.MustApplyPatches()
+    manager.Tick(state, 200)
+    tester.MustApplyPatches()
+    require.Equal(t, model.StateWarning, state.Info.State)
+    require.True(t, manager.ShouldRunning())
+
+    // 3. Test that the changefeed remains in warning state when resolvedTs is progressing
+    // after being stuck beyond the detection time.
+    state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+        require.NotNil(t, status)
+        return &model.ChangeFeedStatus{
+            CheckpointTs: 200,
+        }, true, nil
+    })
+    state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+        func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+            return &model.TaskPosition{Warning: &model.RunningError{
+                Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+                Code:    "[CDC:ErrSinkManagerRunError]", // it is a fake error
+                Message: "fake error for test",
+            }}, true, nil
+        })
+    tester.MustApplyPatches()
+    manager.Tick(state, 400)
+    tester.MustApplyPatches()
+    require.Equal(t, model.StateWarning, state.Info.State)
+    require.True(t, manager.ShouldRunning())
+
+    // 4. Test that the changefeed fails when checkpointTs has not progressed for changefeedErrorStuckDuration.
+    time.Sleep(manager.changefeedErrorStuckDuration + 10)
+    state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+        require.NotNil(t, status)
+        return &model.ChangeFeedStatus{
+            CheckpointTs: 200,
+        }, true, nil
+    })
+    state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+        func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+            return &model.TaskPosition{Warning: &model.RunningError{
+                Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+                Code:    "[CDC:ErrSinkManagerRunError]", // it is a fake error
+                Message: "fake error for test",
+            }}, true, nil
+        })
+    tester.MustApplyPatches()
+    manager.Tick(state, 400)
+    tester.MustApplyPatches()
+    require.Equal(t, model.StateFailed, state.Info.State)
+    require.False(t, manager.ShouldRunning())
+}
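
For reference, the retry gating introduced above hinges on how the errBackoff field behaves: NextBackOff() hands out growing intervals until MaxElapsedTime is exhausted, after which it returns the Stop sentinel and shouldFailWhenRetry() reports that the changefeed must be failed. The following standalone sketch is not part of the patch; it assumes errBackoff is a *backoff.ExponentialBackOff from github.com/cenkalti/backoff/v4 (suggested by the comparison against errBackoff.Stop), and the interval values are illustrative rather than the changefeed defaults.

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Illustrative settings; the real manager derives these from the replica config.
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 100 * time.Millisecond
	b.Multiplier = 2.0
	b.MaxInterval = 800 * time.Millisecond
	b.MaxElapsedTime = 2 * time.Second // after this, NextBackOff returns backoff.Stop
	b.Reset()                          // mirrors resetErrRetry()

	var (
		backoffInterval    time.Duration
		lastErrorRetryTime time.Time
	)
	for {
		// shouldRetry(): no retry within [lastErrorRetryTime, lastErrorRetryTime+backoffInterval).
		if time.Since(lastErrorRetryTime) < backoffInterval {
			time.Sleep(20 * time.Millisecond)
			continue
		}
		// shouldFailWhenRetry(): advance the backoff; Stop means MaxElapsedTime is exhausted.
		backoffInterval = b.NextBackOff()
		if backoffInterval == backoff.Stop {
			fmt.Println("giving up: the changefeed would transition to failed")
			return
		}
		lastErrorRetryTime = time.Now()
		fmt.Println("retrying, next backoff interval:", backoffInterval)
	}
}

A manual resume resets this state via resetErrRetry() and clears isRetrying, which is why the resume path in handleAdminJob and the TestResumeChangefeedWithCheckpointTs changes above set and then assert the isRetrying flag.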