From 00facb8ccb30785c9f7dce128d9ef5ee8c1284b4 Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Mon, 16 Oct 2023 18:46:59 +0800
Subject: [PATCH] This is an automated cherry-pick of #9893

Signed-off-by: ti-chi-bot
---
 cdc/owner/feed_state_manager.go      | 105 +++++++++++++++++-------
 cdc/owner/feed_state_manager_test.go | 116 +++++++++++++++++++++++++++
 cdc/owner/owner.go                   |   7 ++
 3 files changed, 200 insertions(+), 28 deletions(-)

diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go
index 30c62e86aff..a1ee60c9d5c 100644
--- a/cdc/owner/feed_state_manager.go
+++ b/cdc/owner/feed_state_manager.go
@@ -63,11 +63,16 @@ type feedStateManager struct {
 	// resolvedTs and initCheckpointTs is for checking whether resolved timestamp
 	// has been advanced or not.
-	resolvedTs       model.Ts
-	initCheckpointTs model.Ts
+	resolvedTs           model.Ts
+	checkpointTs         model.Ts
+	checkpointTsAdvanced time.Time
+<<<<<<< HEAD
 	checkpointTsAdvanced time.Time
 	lastCheckpointTs     model.Ts
+=======
+	changefeedErrorStuckDuration time.Duration
+>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
 }
 
 // newFeedStateManager creates feedStateManager and initialize the exponential backoff
@@ -83,34 +88,65 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
 	// backoff will stop once the defaultBackoffMaxElapsedTime has elapsed.
 	f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime
 
+<<<<<<< HEAD
 	f.resetErrRetry()
 	return f
+=======
+	m.resetErrRetry()
+	m.isRetrying = false
+	return m
+}
+
+func (m *feedStateManager) shouldRetry() bool {
+	// changefeed should not retry within [m.lastErrorRetryTime, m.lastErrorRetryTime + m.backoffInterval).
+	return time.Since(m.lastErrorRetryTime) >= m.backoffInterval
+}
+
+func (m *feedStateManager) shouldFailWhenRetry() bool {
+	// retry the changefeed
+	m.backoffInterval = m.errBackoff.NextBackOff()
+	// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
+	// set the changefeed to failed state.
+	if m.backoffInterval == m.errBackoff.Stop {
+		return true
+	}
+
+	m.lastErrorRetryTime = time.Now()
+	return false
+}
+
+// resetErrRetry reset the error retry related fields
+func (m *feedStateManager) resetErrRetry() {
+	m.errBackoff.Reset()
+	m.backoffInterval = m.errBackoff.NextBackOff()
+	m.lastErrorRetryTime = time.Unix(0, 0)
+>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
 }
 
 func (m *feedStateManager) Tick(
 	state *orchestrator.ChangefeedReactorState,
 	resolvedTs model.Ts,
 ) (adminJobPending bool) {
+	m.checkAndInitLastRetryCheckpointTs(state.Status)
+
 	if state.Status != nil {
-		if m.lastCheckpointTs < state.Status.CheckpointTs {
-			m.lastCheckpointTs = state.Status.CheckpointTs
+		if m.checkpointTs < state.Status.CheckpointTs {
+			m.checkpointTs = state.Status.CheckpointTs
 			m.checkpointTsAdvanced = time.Now()
 		}
-		if m.state == nil || m.state.Status == nil {
-			// It's the first time `m.state.Status` gets filled.
-			m.initCheckpointTs = state.Status.CheckpointTs
+		if m.resolvedTs < resolvedTs {
+			m.resolvedTs = resolvedTs
+		}
+		if m.checkpointTs >= m.resolvedTs {
+			m.checkpointTsAdvanced = time.Now()
 		}
 	}
-
-	m.checkAndInitLastRetryCheckpointTs(state.Status)
-
 	m.state = state
-	m.resolvedTs = resolvedTs
 	m.shouldBeRunning = true
 	defer func() {
 		if !m.shouldBeRunning {
-			m.cleanUpTaskPositions()
+			m.cleanUp()
 		}
 	}()
 	if m.handleAdminJob() {
@@ -131,16 +167,12 @@ func (m *feedStateManager) Tick(
 			m.shouldBeRunning = false
 			return
 		case model.StatePending:
-			if time.Since(m.lastErrorRetryTime) < m.backoffInterval {
+			if !m.shouldRetry() {
 				m.shouldBeRunning = false
 				return
 			}
-			// retry the changefeed
-			oldBackoffInterval := m.backoffInterval
-			m.backoffInterval = m.errBackoff.NextBackOff()
-			// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
-			// set the changefeed to failed state.
-			if m.backoffInterval == m.errBackoff.Stop {
+
+			if m.shouldFailWhenRetry() {
 				log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
 					"an extended duration",
 					zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
@@ -154,18 +186,17 @@ func (m *feedStateManager) Tick(
 				return
 			}
 
-			m.lastErrorRetryTime = time.Now()
+			// retry the changefeed
+			m.shouldBeRunning = true
 			if m.state.Status != nil {
 				m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs
 			}
-			m.shouldBeRunning = true
 			m.patchState(model.StateWarning)
 			log.Info("changefeed retry backoff interval is elapsed,"+
 				"chengefeed will be restarted",
 				zap.String("namespace", m.state.ID.Namespace),
 				zap.String("changefeed", m.state.ID.ID),
 				zap.Time("lastErrorRetryTime", m.lastErrorRetryTime),
-				zap.Duration("lastRetryInterval", oldBackoffInterval),
 				zap.Duration("nextRetryInterval", m.backoffInterval))
 		case model.StateNormal, model.StateWarning:
 			m.checkAndChangeState()
@@ -274,7 +305,11 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
 		m.shouldBeRunning = true
 		// when the changefeed is manually resumed, we must reset the backoff
 		m.resetErrRetry()
+<<<<<<< HEAD
 		// The lastErrorTime also needs to be cleared before a fresh run.
+=======
+		m.isRetrying = false
+>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
 		jobsPending = true
 		m.patchState(model.StateNormal)
 
@@ -407,12 +442,15 @@ func (m *feedStateManager) patchState(feedState model.FeedState) {
 	})
 }
 
-func (m *feedStateManager) cleanUpTaskPositions() {
+func (m *feedStateManager) cleanUp() {
 	for captureID := range m.state.TaskPositions {
 		m.state.PatchTaskPosition(captureID,
 			func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
 				return nil, position != nil, nil
 			})
 	}
+	m.checkpointTs = 0
+	m.checkpointTsAdvanced = time.Time{}
+	m.resolvedTs = 0
 }
 
 func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError {
@@ -535,12 +573,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
 			info.Error = lastError
 			return info, true, nil
 		})
-	}
 
-	// The errBackoff needs to be reset before the first retry.
-	if !m.isRetrying {
-		m.resetErrRetry()
-		m.isRetrying = true
+		// The errBackoff needs to be reset before the first retry.
+		if !m.isRetrying {
+			m.resetErrRetry()
+			m.isRetrying = true
+		}
 	}
 }
 
@@ -554,6 +592,7 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) {
 		currTime := m.upstream.PDClock.CurrentTime()
 		ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs)
 		m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs
+<<<<<<< HEAD
 		// Conditions:
 		// 1. checkpoint lag is large enough;
 		// 2. checkpoint hasn't been advanced for a long while;
 		// 3. the changefeed has been initialized.
 		if currTime.Sub(ckptTime) > defaultBackoffMaxElapsedTime &&
 			time.Since(m.checkpointTsAdvanced) > defaultBackoffMaxElapsedTime &&
 			m.resolvedTs > m.initCheckpointTs {
+=======
+
+		checkpointTsStuck := time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration
+		if checkpointTsStuck {
+			log.Info("changefeed retry on warning for a very long time and does not resume, "+
+				"it will be failed", zap.String("changefeed", m.state.ID.ID),
+				zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
+				zap.Duration("checkpointTime", currTime.Sub(ckptTime)),
+			)
+>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
 			code, _ := cerrors.RFCCode(cerrors.ErrChangefeedUnretryable)
 			m.handleError(&model.RunningError{
 				Time: lastError.Time,
diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go
index ed738e04b55..6710063f9fe 100644
--- a/cdc/owner/feed_state_manager_test.go
+++ b/cdc/owner/feed_state_manager_test.go
@@ -64,6 +64,8 @@ func newFeedStateManager4Test(
 
 	f.resetErrRetry()
 
+	f.changefeedErrorStuckDuration = time.Second * 3
+
 	return f
 }
 
@@ -208,6 +210,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
 	require.Equal(t, state.Status.AdminJobType, model.AdminStop)
 
 	// resume the changefeed in failed state
+	manager.isRetrying = true
 	manager.PushAdminJob(&model.AdminJob{
 		CfID: ctx.ChangefeedVars().ID,
 		Type: model.AdminResume,
@@ -220,6 +223,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
 	require.Equal(t, state.Info.State, model.StateNormal)
 	require.Equal(t, state.Info.AdminJobType, model.AdminNone)
 	require.Equal(t, state.Status.AdminJobType, model.AdminNone)
+	require.False(t, manager.isRetrying)
 }
 
 func TestMarkFinished(t *testing.T) {
@@ -743,6 +747,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) {
 func TestHandleWarning(t *testing.T) {
 	ctx := cdcContext.NewBackendContext4Test(true)
 	manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
+	manager.changefeedErrorStuckDuration = 100 * time.Millisecond
 	state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
 		ctx.ChangefeedVars().ID)
 	tester := orchestrator.NewReactorStateTester(t, state, nil)
@@ -935,3 +940,114 @@ func TestErrorAfterWarning(t *testing.T) {
 	require.Equal(t, model.StateWarning, state.Info.State)
 	require.True(t, manager.ShouldRunning())
 }
+
+func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
+	t.Parallel()
+
+	maxElapsedTimeInMs := 2000
+	ctx := cdcContext.NewBackendContext4Test(true)
+	manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0)
+	state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
+		ctx.ChangefeedVars().ID)
+	tester := orchestrator.NewReactorStateTester(t, state, nil)
+	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
+		require.Nil(t, info)
+		return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
+	})
+	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+		require.Nil(t, status)
+		return &model.ChangeFeedStatus{
+			CheckpointTs: 200,
+		}, true, nil
+	})
+
+	tester.MustApplyPatches()
+	manager.Tick(state, 200)
+	tester.MustApplyPatches()
+	require.Equal(t, model.StateNormal, state.Info.State)
+	require.True(t, manager.ShouldRunning())
+
+	// 1. test that when a warning occurs, the changefeed state will be changed to warning
+	// and it will still keep running
+	state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+		func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+			return &model.TaskPosition{Warning: &model.RunningError{
+				Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+				Code:    "[CDC:ErrSinkManagerRunError]", // it is fake error
+				Message: "fake error for test",
+			}}, true, nil
+		})
+	tester.MustApplyPatches()
+	manager.Tick(state, 200)
+	// some patches will be generated when the manager.Tick is called
+	// so we need to apply the patches before we check the state
+	tester.MustApplyPatches()
+	require.Equal(t, model.StateWarning, state.Info.State)
+	require.True(t, manager.ShouldRunning())
+
+	// 2. test that when the changefeed is in warning state and the resolvedTs and checkpointTs are not progressing,
+	// the changefeed state will remain warning when a new warning is encountered.
+	time.Sleep(manager.changefeedErrorStuckDuration + 10)
+	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+		require.NotNil(t, status)
+		return &model.ChangeFeedStatus{
+			CheckpointTs: 200,
+		}, true, nil
+	})
+	state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+		func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+			return &model.TaskPosition{Warning: &model.RunningError{
+				Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+				Code:    "[CDC:ErrSinkManagerRunError]", // it is fake error
+				Message: "fake error for test",
+			}}, true, nil
+		})
+	tester.MustApplyPatches()
+	manager.Tick(state, 200)
+	tester.MustApplyPatches()
+	require.Equal(t, model.StateWarning, state.Info.State)
+	require.True(t, manager.ShouldRunning())
+
+	// 3. Test that the changefeed remains in warning state when resolvedTs is progressing after being stuck beyond the detection time.
+	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+		require.NotNil(t, status)
+		return &model.ChangeFeedStatus{
+			CheckpointTs: 200,
+		}, true, nil
+	})
+	state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+		func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+			return &model.TaskPosition{Warning: &model.RunningError{
+				Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+				Code:    "[CDC:ErrSinkManagerRunError]", // it is fake error
+				Message: "fake error for test",
+			}}, true, nil
+		})
+	tester.MustApplyPatches()
+	manager.Tick(state, 400)
+	tester.MustApplyPatches()
+	require.Equal(t, model.StateWarning, state.Info.State)
+	require.True(t, manager.ShouldRunning())
+
+	// 4. Test that the changefeed is failed when checkpointTs does not progress for changefeedErrorStuckDuration.
+	time.Sleep(manager.changefeedErrorStuckDuration + 10)
+	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+		require.NotNil(t, status)
+		return &model.ChangeFeedStatus{
+			CheckpointTs: 200,
+		}, true, nil
+	})
+	state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+		func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+			return &model.TaskPosition{Warning: &model.RunningError{
+				Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+				Code:    "[CDC:ErrSinkManagerRunError]", // it is fake error
+				Message: "fake error for test",
+			}}, true, nil
+		})
+	tester.MustApplyPatches()
+	manager.Tick(state, 400)
+	tester.MustApplyPatches()
+	require.Equal(t, model.StateFailed, state.Info.State)
+	require.False(t, manager.ShouldRunning())
+}
diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go
index 54ca0aed646..90ec5ca2d31 100644
--- a/cdc/owner/owner.go
+++ b/cdc/owner/owner.go
@@ -194,6 +194,13 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt
 	// Tick all changefeeds.
 	ctx := stdCtx.(cdcContext.Context)
 	for changefeedID, changefeedState := range state.Changefeeds {
+<<<<<<< HEAD
+=======
+		// check if we are the changefeed owner to handle this changefeed
+		if !o.shouldHandleChangefeed(changefeedState) {
+			continue
+		}
+>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
 		if changefeedState.Info == nil {
 			o.cleanUpChangefeed(changefeedState)
 			if cfReactor, ok := o.changefeeds[changefeedID]; ok {
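
For context on the mechanism this patch cherry-picks: changefeed retries are gated behind an exponential backoff (shouldRetry / shouldFailWhenRetry), the retry bookkeeping is reset whenever the changefeed is resumed or stops running, and the changefeed is failed once its checkpoint has been stuck longer than changefeedErrorStuckDuration. The standalone Go sketch below models only that retry gating under simplified assumptions; the type and field names (retryGate, maxElapsed, etc.) are hypothetical and are not code from this repository, and the real implementation uses an exponential backoff capped by a MaxElapsedTime rather than the simple doubling used here.

package main

import (
	"fmt"
	"time"
)

// retryGate is a simplified stand-in for the retry bookkeeping kept by the
// changefeed state manager. Illustrative only; not part of the patch.
type retryGate struct {
	interval    time.Duration // current backoff interval
	maxInterval time.Duration // upper bound on a single interval
	elapsed     time.Duration // total backoff time consumed so far
	maxElapsed  time.Duration // give up (mark the changefeed failed) after this budget
	lastRetry   time.Time     // when the last retry was allowed
}

// shouldRetry reports whether the current backoff interval has elapsed since
// the last retry, i.e. whether a pending changefeed may be retried now.
func (g *retryGate) shouldRetry() bool {
	return time.Since(g.lastRetry) >= g.interval
}

// shouldFailWhenRetry consumes one backoff step and reports whether the total
// backoff budget is exhausted, in which case the caller would fail the feed.
func (g *retryGate) shouldFailWhenRetry() bool {
	g.elapsed += g.interval
	if g.elapsed >= g.maxElapsed {
		return true
	}
	g.interval *= 2
	if g.interval > g.maxInterval {
		g.interval = g.maxInterval
	}
	g.lastRetry = time.Now()
	return false
}

// reset clears all retry bookkeeping, analogous to resetting the backoff when
// a changefeed is manually resumed or stops running.
func (g *retryGate) reset(initial time.Duration) {
	g.interval = initial
	g.elapsed = 0
	g.lastRetry = time.Unix(0, 0)
}

func main() {
	g := &retryGate{maxInterval: 10 * time.Second, maxElapsed: 30 * time.Second}
	g.reset(500 * time.Millisecond)

	// Immediately after a reset the last-retry time is in the distant past,
	// so a retry is allowed right away.
	fmt.Println("retry allowed right after reset?", g.shouldRetry())

	// Keep consuming backoff steps until the budget runs out.
	for !g.shouldFailWhenRetry() {
		fmt.Println("changefeed may be retried again after", g.interval)
	}
	fmt.Println("backoff budget exhausted: the changefeed would be marked failed")
}

The sketch keeps the same shape as the patched flow (gate the retry, advance the backoff, reset on resume) while staying self-contained; the accumulated-interval budget here is only an approximation of the wall-clock MaxElapsedTime cap used by the real exponential backoff.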