Skip to content

Commit

Permalink
reset internal state of feedStateManager when shouldBeRunning is false (
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Nov 2, 2023
1 parent 4febedd commit cb256d7
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 46 deletions.
109 changes: 63 additions & 46 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ type feedStateManager struct {

// resolvedTs and initCheckpointTs is for checking whether resolved timestamp
// has been advanced or not.
resolvedTs model.Ts
initCheckpointTs model.Ts

resolvedTs model.Ts
checkpointTs model.Ts
checkpointTsAdvanced time.Time
lastCheckpointTs model.Ts

changefeedErrorStuckDuration time.Duration
}

// newFeedStateManager creates feedStateManager and initialize the exponential backoff
Expand All @@ -84,33 +84,58 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime

f.resetErrRetry()

f.isRetrying = false
return f
}

func (m *feedStateManager) shouldRetry() bool {
// changefeed should not retry within [m.lastErrorRetryTime, m.lastErrorRetryTime + m.backoffInterval).
return time.Since(m.lastErrorRetryTime) >= m.backoffInterval
}

func (m *feedStateManager) shouldFailWhenRetry() bool {
// retry the changefeed
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {
return true
}

m.lastErrorRetryTime = time.Now()
return false
}

// resetErrRetry reset the error retry related fields
func (m *feedStateManager) resetErrRetry() {
m.errBackoff.Reset()
m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorRetryTime = time.Unix(0, 0)
}

func (m *feedStateManager) Tick(
state *orchestrator.ChangefeedReactorState,
resolvedTs model.Ts,
) (adminJobPending bool) {
m.checkAndInitLastRetryCheckpointTs(state.Status)

if state.Status != nil {
if m.lastCheckpointTs < state.Status.CheckpointTs {
m.lastCheckpointTs = state.Status.CheckpointTs
if m.checkpointTs < state.Status.CheckpointTs {
m.checkpointTs = state.Status.CheckpointTs
m.checkpointTsAdvanced = time.Now()
}
if m.state == nil || m.state.Status == nil {
// It's the first time `m.state.Status` gets filled.
m.initCheckpointTs = state.Status.CheckpointTs
if m.resolvedTs < resolvedTs {
m.resolvedTs = resolvedTs
}
if m.checkpointTs >= m.resolvedTs {
m.checkpointTsAdvanced = time.Now()
}
}

m.checkAndInitLastRetryCheckpointTs(state.Status)

m.state = state
m.resolvedTs = resolvedTs
m.shouldBeRunning = true
defer func() {
if !m.shouldBeRunning {
m.cleanUpTaskPositions()
m.cleanUp()
}
}()
if m.handleAdminJob() {
Expand All @@ -131,16 +156,12 @@ func (m *feedStateManager) Tick(
m.shouldBeRunning = false
return
case model.StatePending:
if time.Since(m.lastErrorRetryTime) < m.backoffInterval {
if !m.shouldRetry() {
m.shouldBeRunning = false
return
}
// retry the changefeed
oldBackoffInterval := m.backoffInterval
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {

if m.shouldFailWhenRetry() {
log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
"an extended duration",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
Expand All @@ -154,18 +175,17 @@ func (m *feedStateManager) Tick(
return
}

m.lastErrorRetryTime = time.Now()
// retry the changefeed
m.shouldBeRunning = true
if m.state.Status != nil {
m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs
}
m.shouldBeRunning = true
m.patchState(model.StateWarning)
log.Info("changefeed retry backoff interval is elapsed,"+
"chengefeed will be restarted",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Time("lastErrorRetryTime", m.lastErrorRetryTime),
zap.Duration("lastRetryInterval", oldBackoffInterval),
zap.Duration("nextRetryInterval", m.backoffInterval))
case model.StateNormal, model.StateWarning:
m.checkAndChangeState()
Expand Down Expand Up @@ -274,7 +294,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
m.shouldBeRunning = true
// when the changefeed is manually resumed, we must reset the backoff
m.resetErrRetry()
// The lastErrorTime also needs to be cleared before a fresh run.
m.isRetrying = false
jobsPending = true
m.patchState(model.StateNormal)

Expand Down Expand Up @@ -407,12 +427,15 @@ func (m *feedStateManager) patchState(feedState model.FeedState) {
})
}

func (m *feedStateManager) cleanUpTaskPositions() {
func (m *feedStateManager) cleanUp() {
for captureID := range m.state.TaskPositions {
m.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return nil, position != nil, nil
})
}
m.checkpointTs = 0
m.checkpointTsAdvanced = time.Time{}
m.resolvedTs = 0
}

func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError {
Expand Down Expand Up @@ -535,12 +558,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
info.Error = lastError
return info, true, nil
})
}

// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
}
}
}

Expand All @@ -554,13 +577,14 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) {
currTime := m.upstream.PDClock.CurrentTime()
ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs)
m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs
// Conditions:
// 1. checkpoint lag is large enough;
// 2. checkpoint hasn't been advanced for a long while;
// 3. the changefeed has been initialized.
if currTime.Sub(ckptTime) > defaultBackoffMaxElapsedTime &&
time.Since(m.checkpointTsAdvanced) > defaultBackoffMaxElapsedTime &&
m.resolvedTs > m.initCheckpointTs {

checkpointTsStuck := time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration
if checkpointTsStuck {
log.Info("changefeed retry on warning for a very long time and does not resume, "+
"it will be failed", zap.String("changefeed", m.state.ID.ID),
zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
zap.Duration("checkpointTime", currTime.Sub(ckptTime)),
)
code, _ := cerrors.RFCCode(cerrors.ErrChangefeedUnretryable)
m.handleError(&model.RunningError{
Time: lastError.Time,
Expand Down Expand Up @@ -592,13 +616,6 @@ func GenerateChangefeedEpoch(ctx context.Context, pdClient pd.Client) uint64 {
return oracle.ComposeTS(phyTs, logical)
}

// resetErrRetry reset the error retry related fields
func (m *feedStateManager) resetErrRetry() {
m.errBackoff.Reset()
m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorRetryTime = time.Unix(0, 0)
}

// checkAndChangeState checks the state of the changefeed and change it if needed.
// if the state of the changefeed is warning and the changefeed's checkpointTs is
// greater than the lastRetryCheckpointTs, it will change the state to normal.
Expand Down
116 changes: 116 additions & 0 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func newFeedStateManager4Test(

f.resetErrRetry()

f.changefeedErrorStuckDuration = time.Second * 3

return f
}

Expand Down Expand Up @@ -208,6 +210,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
require.Equal(t, state.Status.AdminJobType, model.AdminStop)

// resume the changefeed in failed state
manager.isRetrying = true
manager.PushAdminJob(&model.AdminJob{
CfID: ctx.ChangefeedVars().ID,
Type: model.AdminResume,
Expand All @@ -220,6 +223,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
require.Equal(t, state.Info.State, model.StateNormal)
require.Equal(t, state.Info.AdminJobType, model.AdminNone)
require.Equal(t, state.Status.AdminJobType, model.AdminNone)
require.False(t, manager.isRetrying)
}

func TestMarkFinished(t *testing.T) {
Expand Down Expand Up @@ -743,6 +747,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) {
func TestHandleWarning(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
manager.changefeedErrorStuckDuration = 100 * time.Millisecond
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
Expand Down Expand Up @@ -935,3 +940,114 @@ func TestErrorAfterWarning(t *testing.T) {
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())
}

func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
t.Parallel()

maxElapsedTimeInMs := 2000
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.Nil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})

tester.MustApplyPatches()
manager.Tick(state, 200)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 1. test when an warning occurs, the changefeed state will be changed to warning
// and it will still keep running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 200)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 2. test when the changefeed is in warning state, and the resolvedTs and checkpointTs is not progressing,
// the changefeed state will remain warning whena new warning is encountered.
time.Sleep(manager.changefeedErrorStuckDuration + 10)
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 200)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 3. Test changefeed remain warning when resolvedTs is progressing after stuck beyond the detection time.
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 400)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 4. Test changefeed failed when checkpointTs is not progressing for changefeedErrorStuckDuration time.
time.Sleep(manager.changefeedErrorStuckDuration + 10)
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 400)
tester.MustApplyPatches()
require.Equal(t, model.StateFailed, state.Info.State)
require.False(t, manager.ShouldRunning())
}

0 comments on commit cb256d7

Please sign in to comment.