Commit 00facb8
This is an automated cherry-pick of pingcap#9893
Signed-off-by: ti-chi-bot <[email protected]>
CharlesCheung96 authored and ti-chi-bot committed Oct 16, 2023
1 parent f96213d commit 00facb8
Showing 3 changed files with 200 additions and 28 deletions.
105 changes: 77 additions & 28 deletions cdc/owner/feed_state_manager.go
@@ -63,11 +63,16 @@ type feedStateManager struct {

// resolvedTs and initCheckpointTs is for checking whether resolved timestamp
// has been advanced or not.
resolvedTs model.Ts
initCheckpointTs model.Ts
resolvedTs model.Ts
checkpointTs model.Ts
checkpointTsAdvanced time.Time

<<<<<<< HEAD
checkpointTsAdvanced time.Time
lastCheckpointTs model.Ts
=======
changefeedErrorStuckDuration time.Duration
>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
}

// newFeedStateManager creates feedStateManager and initialize the exponential backoff
@@ -83,34 +88,65 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
// backoff will stop once the defaultBackoffMaxElapsedTime has elapsed.
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime

<<<<<<< HEAD
f.resetErrRetry()

return f
=======
m.resetErrRetry()
m.isRetrying = false
return m
}

func (m *feedStateManager) shouldRetry() bool {
// changefeed should not retry within [m.lastErrorRetryTime, m.lastErrorRetryTime + m.backoffInterval).
return time.Since(m.lastErrorRetryTime) >= m.backoffInterval
}

func (m *feedStateManager) shouldFailWhenRetry() bool {
// retry the changefeed
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {
return true
}

m.lastErrorRetryTime = time.Now()
return false
}

// resetErrRetry reset the error retry related fields
func (m *feedStateManager) resetErrRetry() {
m.errBackoff.Reset()
m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorRetryTime = time.Unix(0, 0)
>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
}
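
For readers unfamiliar with the backoff package: shouldRetry, shouldFailWhenRetry, and resetErrRetry above all lean on the NextBackOff/Stop/Reset semantics of errBackoff. Below is a minimal, self-contained sketch of those semantics, assuming github.com/cenkalti/backoff/v4 (the import is not shown in this diff) and illustrative interval values:

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Exponential backoff that gives up once MaxElapsedTime has passed,
	// mirroring how newFeedStateManager configures errBackoff.
	eb := backoff.NewExponentialBackOff()
	eb.InitialInterval = 200 * time.Millisecond
	eb.Multiplier = 2.0
	eb.MaxInterval = 2 * time.Second
	eb.MaxElapsedTime = 3 * time.Second

	for attempt := 1; ; attempt++ {
		interval := eb.NextBackOff()
		if interval == backoff.Stop {
			// The counterpart of shouldFailWhenRetry returning true:
			// the retry budget is exhausted, so the feed would be failed.
			fmt.Printf("giving up after %d retries\n", attempt-1)
			break
		}
		fmt.Printf("retry %d after %v\n", attempt, interval)
		time.Sleep(interval)
	}

	// A manual resume calls Reset(), which restarts the whole schedule;
	// this is what resetErrRetry does above.
	eb.Reset()
	fmt.Println("after Reset, next interval:", eb.NextBackOff())
}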

func (m *feedStateManager) Tick(
state *orchestrator.ChangefeedReactorState,
resolvedTs model.Ts,
) (adminJobPending bool) {
m.checkAndInitLastRetryCheckpointTs(state.Status)

if state.Status != nil {
if m.lastCheckpointTs < state.Status.CheckpointTs {
m.lastCheckpointTs = state.Status.CheckpointTs
if m.checkpointTs < state.Status.CheckpointTs {
m.checkpointTs = state.Status.CheckpointTs
m.checkpointTsAdvanced = time.Now()
}
if m.state == nil || m.state.Status == nil {
// It's the first time `m.state.Status` gets filled.
m.initCheckpointTs = state.Status.CheckpointTs
if m.resolvedTs < resolvedTs {
m.resolvedTs = resolvedTs
}
if m.checkpointTs >= m.resolvedTs {
m.checkpointTsAdvanced = time.Now()
}
}

m.checkAndInitLastRetryCheckpointTs(state.Status)

m.state = state
m.resolvedTs = resolvedTs
m.shouldBeRunning = true
defer func() {
if !m.shouldBeRunning {
m.cleanUpTaskPositions()
m.cleanUp()
}
}()
if m.handleAdminJob() {
@@ -131,16 +167,12 @@ func (m *feedStateManager) Tick(
m.shouldBeRunning = false
return
case model.StatePending:
if time.Since(m.lastErrorRetryTime) < m.backoffInterval {
if !m.shouldRetry() {
m.shouldBeRunning = false
return
}
// retry the changefeed
oldBackoffInterval := m.backoffInterval
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {

if m.shouldFailWhenRetry() {
log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
"an extended duration",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
Expand All @@ -154,18 +186,17 @@ func (m *feedStateManager) Tick(
return
}

m.lastErrorRetryTime = time.Now()
// retry the changefeed
m.shouldBeRunning = true
if m.state.Status != nil {
m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs
}
m.shouldBeRunning = true
m.patchState(model.StateWarning)
log.Info("changefeed retry backoff interval is elapsed,"+
"chengefeed will be restarted",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Time("lastErrorRetryTime", m.lastErrorRetryTime),
zap.Duration("lastRetryInterval", oldBackoffInterval),
zap.Duration("nextRetryInterval", m.backoffInterval))
case model.StateNormal, model.StateWarning:
m.checkAndChangeState()
@@ -274,7 +305,11 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
m.shouldBeRunning = true
// when the changefeed is manually resumed, we must reset the backoff
m.resetErrRetry()
<<<<<<< HEAD
// The lastErrorTime also needs to be cleared before a fresh run.
=======
m.isRetrying = false
>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
jobsPending = true
m.patchState(model.StateNormal)

@@ -407,12 +442,15 @@ func (m *feedStateManager) patchState(feedState model.FeedState) {
})
}

func (m *feedStateManager) cleanUpTaskPositions() {
func (m *feedStateManager) cleanUp() {
for captureID := range m.state.TaskPositions {
m.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return nil, position != nil, nil
})
}
m.checkpointTs = 0
m.checkpointTsAdvanced = time.Time{}
m.resolvedTs = 0
}
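
Renaming cleanUpTaskPositions to cleanUp matters because the function now also drops the cached progress markers. Below is a self-contained sketch of the idea, using a hypothetical progressTracker type rather than the real feedStateManager, showing why a stopped changefeed should not carry a stale checkpointTsAdvanced into its next run:

package main

import (
	"fmt"
	"time"
)

// progressTracker is a hypothetical stand-in for the checkpoint/resolved-ts
// bookkeeping in feedStateManager; it is not the real tiflow type.
type progressTracker struct {
	checkpointTs         uint64
	resolvedTs           uint64
	checkpointTsAdvanced time.Time
}

// tick mirrors the bookkeeping at the top of Tick: remember the largest
// checkpoint/resolved ts seen and stamp the time whenever the checkpoint
// advances or has caught up with the resolved ts.
func (p *progressTracker) tick(checkpointTs, resolvedTs uint64) {
	if p.checkpointTs < checkpointTs {
		p.checkpointTs = checkpointTs
		p.checkpointTsAdvanced = time.Now()
	}
	if p.resolvedTs < resolvedTs {
		p.resolvedTs = resolvedTs
	}
	if p.checkpointTs >= p.resolvedTs {
		p.checkpointTsAdvanced = time.Now()
	}
}

// reset mirrors cleanUp: once the changefeed stops running, drop the cached
// progress so a later restart starts the "stuck" clock from scratch.
func (p *progressTracker) reset() {
	p.checkpointTs = 0
	p.resolvedTs = 0
	p.checkpointTsAdvanced = time.Time{}
}

func main() {
	p := &progressTracker{}
	p.tick(200, 200)
	fmt.Println("advanced at:", p.checkpointTsAdvanced)

	p.reset() // the changefeed was stopped or failed
	p.tick(200, 400)
	fmt.Println("after restart, advanced at:", p.checkpointTsAdvanced)
}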

func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError {
Expand Down Expand Up @@ -535,12 +573,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
info.Error = lastError
return info, true, nil
})
}

// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
}
}
}

@@ -554,13 +592,24 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) {
currTime := m.upstream.PDClock.CurrentTime()
ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs)
m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs
<<<<<<< HEAD
// Conditions:
// 1. checkpoint lag is large enough;
// 2. checkpoint hasn't been advanced for a long while;
// 3. the changefeed has been initialized.
if currTime.Sub(ckptTime) > defaultBackoffMaxElapsedTime &&
time.Since(m.checkpointTsAdvanced) > defaultBackoffMaxElapsedTime &&
m.resolvedTs > m.initCheckpointTs {
=======

checkpointTsStuck := time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration
if checkpointTsStuck {
log.Info("changefeed retry on warning for a very long time and does not resume, "+
"it will be failed", zap.String("changefeed", m.state.ID.ID),
zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
zap.Duration("checkpointTime", currTime.Sub(ckptTime)),
)
>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
code, _ := cerrors.RFCCode(cerrors.ErrChangefeedUnretryable)
m.handleError(&model.RunningError{
Time: lastError.Time,
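The behavioral change on the incoming side of the conflict is that a warning is escalated to an unretryable error purely on how long the checkpoint has been stuck, governed by the new per-changefeed changefeedErrorStuckDuration, instead of the fixed defaultBackoffMaxElapsedTime conditions on the HEAD side. A minimal sketch of that check (simplified names, illustrative values):

package main

import (
	"fmt"
	"time"
)

// shouldEscalate sketches the condition handleWarning uses on the incoming
// side: upgrade a warning to an unretryable error only when the checkpoint
// has not advanced for longer than changefeedErrorStuckDuration.
func shouldEscalate(checkpointTsAdvanced time.Time, stuckDuration time.Duration) bool {
	return time.Since(checkpointTsAdvanced) > stuckDuration
}

func main() {
	stuck := 3 * time.Second // illustrative; not the production default

	fmt.Println(shouldEscalate(time.Now(), stuck))                     // false: checkpoint advanced just now
	fmt.Println(shouldEscalate(time.Now().Add(-5*time.Second), stuck)) // true: stuck for 5s > 3s
}
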
116 changes: 116 additions & 0 deletions cdc/owner/feed_state_manager_test.go
@@ -64,6 +64,8 @@ func newFeedStateManager4Test(

f.resetErrRetry()

f.changefeedErrorStuckDuration = time.Second * 3

return f
}

@@ -208,6 +210,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
require.Equal(t, state.Status.AdminJobType, model.AdminStop)

// resume the changefeed in failed state
manager.isRetrying = true
manager.PushAdminJob(&model.AdminJob{
CfID: ctx.ChangefeedVars().ID,
Type: model.AdminResume,
Expand All @@ -220,6 +223,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
require.Equal(t, state.Info.State, model.StateNormal)
require.Equal(t, state.Info.AdminJobType, model.AdminNone)
require.Equal(t, state.Status.AdminJobType, model.AdminNone)
require.False(t, manager.isRetrying)
}

func TestMarkFinished(t *testing.T) {
@@ -743,6 +747,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) {
func TestHandleWarning(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
manager.changefeedErrorStuckDuration = 100 * time.Millisecond
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
@@ -935,3 +940,114 @@ func TestErrorAfterWarning(t *testing.T) {
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())
}

func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
t.Parallel()

maxElapsedTimeInMs := 2000
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.Nil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})

tester.MustApplyPatches()
manager.Tick(state, 200)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 1. test that when a warning occurs, the changefeed state will be changed to warning
// and it will still keep running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 200)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 2. test that when the changefeed is in warning state and the resolvedTs and checkpointTs are not progressing,
// the changefeed state will remain warning when a new warning is encountered.
time.Sleep(manager.changefeedErrorStuckDuration + 10)
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 200)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 3. Test that the changefeed remains in warning state when resolvedTs is progressing after being stuck beyond the detection time.
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 400)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 4. Test that the changefeed fails when checkpointTs has not progressed for changefeedErrorStuckDuration.
time.Sleep(manager.changefeedErrorStuckDuration + 10)
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 400)
tester.MustApplyPatches()
require.Equal(t, model.StateFailed, state.Info.State)
require.False(t, manager.ShouldRunning())
}
7 changes: 7 additions & 0 deletions cdc/owner/owner.go
@@ -194,6 +194,13 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt
// Tick all changefeeds.
ctx := stdCtx.(cdcContext.Context)
for changefeedID, changefeedState := range state.Changefeeds {
<<<<<<< HEAD
=======
// check if we are the changefeed owner to handle this changefeed
if !o.shouldHandleChangefeed(changefeedState) {
continue
}
>>>>>>> 432828b5b4 (reset internal state of feedStateManager when shouldBeRunning is false (#9893))
if changefeedState.Info == nil {
o.cleanUpChangefeed(changefeedState)
if cfReactor, ok := o.changefeeds[changefeedID]; ok {
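On the incoming side, ownerImpl.Tick now skips changefeeds that this owner should not handle. The predicate body is not part of this diff, so the sketch below uses a hypothetical shouldHandle placeholder to show only the control flow:

package main

import "fmt"

// changefeed is a placeholder for orchestrator.ChangefeedReactorState.
type changefeed struct {
	id      string
	managed bool // stands in for whatever shouldHandleChangefeed actually checks
}

// shouldHandle is a hypothetical stand-in for ownerImpl.shouldHandleChangefeed.
func shouldHandle(cf changefeed) bool { return cf.managed }

func main() {
	feeds := []changefeed{
		{id: "cf-1", managed: true},
		{id: "cf-2", managed: false},
	}
	for _, cf := range feeds {
		if !shouldHandle(cf) {
			continue // some other component is responsible for this changefeed
		}
		fmt.Println("ticking", cf.id)
	}
}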
