reset internal state of feedStateManager when shouldBeRunning is false #9893

Merged · 7 commits · Oct 16, 2023
87 changes: 50 additions & 37 deletions cdc/owner/feed_state_manager.go
@@ -63,11 +63,10 @@ type feedStateManager struct {

// resolvedTs and initCheckpointTs is for checking whether resolved timestamp
// has been advanced or not.
resolvedTs model.Ts
initCheckpointTs model.Ts
resolvedTs model.Ts
checkpointTs model.Ts
checkpointTsAdvanced time.Time

checkpointTsAdvanced time.Time
lastCheckpointTs model.Ts
changefeedErrorStuckDuration time.Duration
}

@@ -86,9 +85,28 @@ func newFeedStateManager(up *upstream.Upstream, cfg *config.ReplicaConfig) *feed
m.changefeedErrorStuckDuration = *cfg.ChangefeedErrorStuckDuration

m.resetErrRetry()
m.isRetrying = false
return m
}

func (m *feedStateManager) shouldRetry() bool {
// changefeed should not retry within [m.lastErrorRetryTime, m.lastErrorRetryTime + m.backoffInterval).
return time.Since(m.lastErrorRetryTime) >= m.backoffInterval
}

func (m *feedStateManager) shouldFailWhenRetry() bool {
// retry the changefeed
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {
return true
}

m.lastErrorRetryTime = time.Now()
return false
}

// resetErrRetry resets the error retry related fields
func (m *feedStateManager) resetErrRetry() {
m.errBackoff.Reset()
@@ -100,25 +118,25 @@ func (m *feedStateManager) Tick(
state *orchestrator.ChangefeedReactorState,
resolvedTs model.Ts,
) (adminJobPending bool) {
m.checkAndInitLastRetryCheckpointTs(state.Status)

if state.Status != nil {
if m.lastCheckpointTs < state.Status.CheckpointTs {
m.lastCheckpointTs = state.Status.CheckpointTs
if m.checkpointTs < state.Status.CheckpointTs {
m.checkpointTs = state.Status.CheckpointTs
m.checkpointTsAdvanced = time.Now()
}
if m.state == nil || m.state.Status == nil {
// It's the first time `m.state.Status` gets filled.
m.initCheckpointTs = state.Status.CheckpointTs
if m.resolvedTs < resolvedTs {
m.resolvedTs = resolvedTs
}
if m.checkpointTs >= m.resolvedTs {
m.checkpointTsAdvanced = time.Now()
}
}

m.checkAndInitLastRetryCheckpointTs(state.Status)

m.state = state
m.resolvedTs = resolvedTs
m.shouldBeRunning = true
defer func() {
if !m.shouldBeRunning {
m.cleanUpTaskPositions()
m.cleanUp()
}
}()

@@ -141,16 +159,12 @@
m.shouldBeRunning = false
return
case model.StatePending:
if time.Since(m.lastErrorRetryTime) < m.backoffInterval {
if !m.shouldRetry() {
m.shouldBeRunning = false
return
}
// retry the changefeed
oldBackoffInterval := m.backoffInterval
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {

if m.shouldFailWhenRetry() {
log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
"an extended duration",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
@@ -164,18 +178,17 @@
return
}

m.lastErrorRetryTime = time.Now()
// retry the changefeed
m.shouldBeRunning = true
if m.state.Status != nil {
m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs
}
m.shouldBeRunning = true
m.patchState(model.StateWarning)
log.Info("changefeed retry backoff interval is elapsed,"+
"chengefeed will be restarted",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Time("lastErrorRetryTime", m.lastErrorRetryTime),
zap.Duration("lastRetryInterval", oldBackoffInterval),
zap.Duration("nextRetryInterval", m.backoffInterval))
case model.StateNormal, model.StateWarning:
m.checkAndChangeState()
@@ -284,6 +297,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
m.shouldBeRunning = true
// when the changefeed is manually resumed, we must reset the backoff
m.resetErrRetry()
m.isRetrying = false
jobsPending = true
m.patchState(model.StateNormal)

@@ -416,12 +430,15 @@ func (m *feedStateManager) patchState(feedState model.FeedState) {
})
}

func (m *feedStateManager) cleanUpTaskPositions() {
Contributor: How does this work?

Contributor Author: checkpointTs and resolvedTs are cleaned up before the changefeed stops, and they will be re-initialized when the changefeed resumes.
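For illustration, a minimal standalone sketch of that lifecycle, using hypothetical `fakeFeedStateManager` and `fakeStatus` types rather than the real cdc/owner code: the cached progress is zeroed when the feed should stop, and the first tick after a resume repopulates it from the changefeed status, which presumably keeps the stuck-checkpoint check in handleWarning from firing against stale state.

```go
// Standalone sketch of the cleanUp/re-initialize idea; not the real types.
package main

import (
	"fmt"
	"time"
)

type fakeStatus struct{ CheckpointTs uint64 }

type fakeFeedStateManager struct {
	checkpointTs         uint64
	resolvedTs           uint64
	checkpointTsAdvanced time.Time
}

// tick mirrors the shape of Tick: refresh cached progress from the latest
// status, then clean up internal state if the feed should not be running.
func (m *fakeFeedStateManager) tick(status *fakeStatus, resolvedTs uint64, shouldBeRunning bool) {
	if status != nil {
		if m.checkpointTs < status.CheckpointTs {
			m.checkpointTs = status.CheckpointTs
			m.checkpointTsAdvanced = time.Now()
		}
		if m.resolvedTs < resolvedTs {
			m.resolvedTs = resolvedTs
		}
		if m.checkpointTs >= m.resolvedTs {
			m.checkpointTsAdvanced = time.Now()
		}
	}
	if !shouldBeRunning {
		m.cleanUp()
	}
}

// cleanUp zeroes the cached progress so nothing stale survives a stop.
func (m *fakeFeedStateManager) cleanUp() {
	m.checkpointTs = 0
	m.checkpointTsAdvanced = time.Time{}
	m.resolvedTs = 0
}

func main() {
	m := &fakeFeedStateManager{}
	m.tick(&fakeStatus{CheckpointTs: 200}, 300, true)  // running: progress cached
	m.tick(&fakeStatus{CheckpointTs: 200}, 300, false) // stopping: progress cleared
	fmt.Println(m.checkpointTs, m.resolvedTs)          // 0 0
	m.tick(&fakeStatus{CheckpointTs: 400}, 500, true)  // resumed: re-initialized
	fmt.Println(m.checkpointTs, m.resolvedTs)          // 400 500
}
```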

func (m *feedStateManager) cleanUp() {
for captureID := range m.state.TaskPositions {
m.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return nil, position != nil, nil
})
}
m.checkpointTs = 0
m.checkpointTsAdvanced = time.Time{}
m.resolvedTs = 0
}

func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError {
@@ -546,12 +563,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
info.Error = lastError
return info, true, nil
})
}

// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
}
}
}

@@ -565,13 +582,9 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) {
currTime := m.upstream.PDClock.CurrentTime()
ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs)
m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs
// Conditions:
// 1. checkpoint lag is large enough;
// 2. checkpoint hasn't been advanced for a long while;
// 3. the changefeed has been initialized.
if currTime.Sub(ckptTime) > m.changefeedErrorStuckDuration &&
time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration &&
m.resolvedTs > m.initCheckpointTs {

checkpointTsStuck := time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration
if checkpointTsStuck {
log.Info("changefeed retry on warning for a very long time and does not resume, "+
"it will be failed", zap.String("changefeed", m.state.ID.ID),
zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
116 changes: 116 additions & 0 deletions cdc/owner/feed_state_manager_test.go
@@ -64,6 +64,8 @@ func newFeedStateManager4Test(

f.resetErrRetry()

f.changefeedErrorStuckDuration = time.Second * 3

return f
}

@@ -208,6 +210,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
require.Equal(t, state.Status.AdminJobType, model.AdminStop)

// resume the changefeed in failed state
manager.isRetrying = true
manager.PushAdminJob(&model.AdminJob{
CfID: ctx.ChangefeedVars().ID,
Type: model.AdminResume,
Expand All @@ -220,6 +223,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
require.Equal(t, state.Info.State, model.StateNormal)
require.Equal(t, state.Info.AdminJobType, model.AdminNone)
require.Equal(t, state.Status.AdminJobType, model.AdminNone)
require.False(t, manager.isRetrying)
}

func TestMarkFinished(t *testing.T) {
@@ -742,6 +746,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) {
func TestHandleWarning(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
manager.changefeedErrorStuckDuration = 100 * time.Millisecond
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
@@ -934,3 +939,114 @@ func TestErrorAfterWarning(t *testing.T) {
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())
}

func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
t.Parallel()

maxElapsedTimeInMs := 2000
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.Nil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})

tester.MustApplyPatches()
manager.Tick(state, 200)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 1. test that when a warning occurs, the changefeed state will be changed to warning
// and it will still keep running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 200)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 2. test that when the changefeed is in warning state and the resolvedTs and checkpointTs are not progressing,
// the changefeed state will remain warning when a new warning is encountered.
time.Sleep(manager.changefeedErrorStuckDuration + 10)
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 200)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 3. test that the changefeed remains in warning state when resolvedTs is progressing, even after being stuck beyond the detection time.
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 400)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 4. test that the changefeed fails when checkpointTs has not progressed for changefeedErrorStuckDuration.
time.Sleep(manager.changefeedErrorStuckDuration + 10)
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 400)
tester.MustApplyPatches()
require.Equal(t, model.StateFailed, state.Info.State)
require.False(t, manager.ShouldRunning())
}
2 changes: 1 addition & 1 deletion cdc/owner/owner.go
@@ -155,7 +155,7 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt
// Tick all changefeeds.
ctx := stdCtx.(cdcContext.Context)
for changefeedID, changefeedState := range state.Changefeeds {
// check if we are the changefeed owner to handle this changefeed
// check if we are the changefeed owner to handle this changefeed
if !o.shouldHandleChangefeed(changefeedState) {
continue
}
1 change: 0 additions & 1 deletion pkg/orchestrator/reactor_state.go
@@ -262,7 +262,6 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er
return errors.Trace(err)
}
if key.Tp == etcd.CDCKeyTypeChangefeedInfo {
log.Info("update changefeed info", zap.Any("info", s.Info))
s.Info.VerifyAndComplete()
}
return nil