reset internal state of feedStateManager when shouldBeRunning is false #9893

Merged · 7 commits · Oct 16, 2023
Changes from 2 commits
88 changes: 51 additions & 37 deletions cdc/owner/feed_state_manager.go
@@ -63,11 +63,11 @@ type feedStateManager struct {

// resolvedTs and initCheckpointTs are for checking whether the resolved timestamp
// has been advanced or not.
resolvedTs model.Ts
initCheckpointTs model.Ts
resolvedTs model.Ts
resolvedTsAdvanced time.Time
checkpointTs model.Ts
checkpointTsAdvanced time.Time

checkpointTsAdvanced time.Time
lastCheckpointTs model.Ts
changefeedErrorStuckDuration time.Duration
}

@@ -89,6 +89,25 @@ func newFeedStateManager(up *upstream.Upstream, cfg *config.ReplicaConfig) *feed
return m
}

func (m *feedStateManager) shouldRetry() (shouldBeRunning, shouldFailed bool) {
if time.Since(m.lastErrorRetryTime) < m.backoffInterval {
return
}

// retry the changefeed
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {
shouldFailed = true
return
}

m.lastErrorRetryTime = time.Now()
shouldBeRunning = true
return
}

// resetErrRetry resets the error-retry-related fields
func (m *feedStateManager) resetErrRetry() {
m.errBackoff.Reset()
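
The extracted shouldRetry gate is easier to follow in isolation. The following is a minimal, dependency-free sketch of the same pattern, assuming a simple doubling policy: skip while the current backoff interval has not elapsed, otherwise either schedule the next retry or fail once the retry budget is spent. The names retryGate and nextBackOff, the deadline field, and the doubling policy are illustrative stand-ins, not TiCDC's actual errBackoff.

package main

import (
	"fmt"
	"time"
)

// stop plays the role of errBackoff.Stop: a sentinel meaning "no more retries".
const stop = time.Duration(-1)

// retryGate is a simplified stand-in for feedStateManager's retry fields.
type retryGate struct {
	interval      time.Duration // current backoff interval
	maxInterval   time.Duration // cap for the doubling policy
	deadline      time.Time     // stands in for MaxElapsedTime
	lastRetryTime time.Time
}

// nextBackOff doubles the interval up to maxInterval, or returns stop once
// the overall retry deadline has passed.
func (g *retryGate) nextBackOff() time.Duration {
	if time.Now().After(g.deadline) {
		return stop
	}
	g.interval *= 2
	if g.interval > g.maxInterval {
		g.interval = g.maxInterval
	}
	return g.interval
}

// shouldRetry mirrors the shape of the extracted method: neither flag set
// means "still backing off"; shouldFail means the budget is spent; shouldRun
// means the interval elapsed and the feed may be restarted.
func (g *retryGate) shouldRetry() (shouldRun, shouldFail bool) {
	if time.Since(g.lastRetryTime) < g.interval {
		return
	}
	if g.nextBackOff() == stop {
		shouldFail = true
		return
	}
	g.lastRetryTime = time.Now()
	shouldRun = true
	return
}

func main() {
	g := &retryGate{
		interval:    10 * time.Millisecond,
		maxInterval: 40 * time.Millisecond,
		deadline:    time.Now().Add(100 * time.Millisecond),
	}
	for i := 0; i < 5; i++ {
		run, fail := g.shouldRetry()
		fmt.Printf("attempt %d: run=%v fail=%v nextInterval=%v\n", i, run, fail, g.interval)
		time.Sleep(g.interval)
	}
}

Keeping the gate as a pure decision function, as the PR does, lets Tick's StatePending case stay a thin dispatcher over the three outcomes.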
@@ -100,25 +119,23 @@ func (m *feedStateManager) Tick(
state *orchestrator.ChangefeedReactorState,
resolvedTs model.Ts,
) (adminJobPending bool) {
m.checkAndInitLastRetryCheckpointTs(state.Status)

if state.Status != nil {
if m.lastCheckpointTs < state.Status.CheckpointTs {
m.lastCheckpointTs = state.Status.CheckpointTs
if m.checkpointTs < state.Status.CheckpointTs {
m.checkpointTs = state.Status.CheckpointTs
m.checkpointTsAdvanced = time.Now()
}
if m.state == nil || m.state.Status == nil {
// It's the first time `m.state.Status` gets filled.
m.initCheckpointTs = state.Status.CheckpointTs
if m.resolvedTs < resolvedTs {
m.resolvedTs = resolvedTs
m.resolvedTsAdvanced = time.Now()
}
}

m.checkAndInitLastRetryCheckpointTs(state.Status)

m.state = state
m.resolvedTs = resolvedTs
m.shouldBeRunning = true
defer func() {
if !m.shouldBeRunning {
m.cleanUpTaskPositions()
m.cleanUp()
}
}()

@@ -141,16 +158,8 @@ m.shouldBeRunning = false
m.shouldBeRunning = false
return
case model.StatePending:
if time.Since(m.lastErrorRetryTime) < m.backoffInterval {
m.shouldBeRunning = false
return
}
// retry the changefeed
oldBackoffInterval := m.backoffInterval
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {
shouldBeRunning, shouldFailed := m.shouldRetry()
if shouldFailed {
log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
"an extended duration",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
@@ -163,19 +172,22 @@ func (m *feedStateManager) Tick(
m.patchState(model.StateFailed)
return
}
if !shouldBeRunning {
m.shouldBeRunning = false
return
}

m.lastErrorRetryTime = time.Now()
// retry the changefeed
m.shouldBeRunning = true
if m.state.Status != nil {
m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs
}
m.shouldBeRunning = true
m.patchState(model.StateWarning)
log.Info("changefeed retry backoff interval is elapsed,"+
"chengefeed will be restarted",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Time("lastErrorRetryTime", m.lastErrorRetryTime),
zap.Duration("lastRetryInterval", oldBackoffInterval),
zap.Duration("nextRetryInterval", m.backoffInterval))
case model.StateNormal, model.StateWarning:
m.checkAndChangeState()
@@ -416,12 +428,14 @@ func (m *feedStateManager) patchState(feedState model.FeedState) {
})
}

func (m *feedStateManager) cleanUpTaskPositions() {
Contributor: How does this work?

Contributor (Author): It cleans up checkpointTs and resolvedTs before the changefeed stops; they are re-initialized when the changefeed resumes.
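To make that answer concrete, here is a hedged, self-contained sketch of the reset-on-stop pattern, using simplified stand-in types rather than the real feedStateManager: the cached timestamps advance while the feed runs and are zeroed the moment it should stop, so a later resume re-derives them from the changefeed status instead of inheriting stale values.

package main

import "fmt"

// manager is a simplified stand-in for feedStateManager: it caches the
// highest checkpoint/resolved timestamps observed while the feed runs.
type manager struct {
	shouldBeRunning bool
	checkpointTs    uint64
	resolvedTs      uint64
}

// tick mirrors the PR's shape: advance the cached timestamps, then clean
// up if the feed should no longer be running.
func (m *manager) tick(checkpointTs, resolvedTs uint64, keepRunning bool) {
	if m.checkpointTs < checkpointTs {
		m.checkpointTs = checkpointTs
	}
	if m.resolvedTs < resolvedTs {
		m.resolvedTs = resolvedTs
	}
	m.shouldBeRunning = keepRunning
	if !m.shouldBeRunning {
		m.cleanUp()
	}
}

// cleanUp zeroes the cached timestamps; they are re-initialized from the
// changefeed status on the first tick after the feed resumes.
func (m *manager) cleanUp() {
	m.checkpointTs = 0
	m.resolvedTs = 0
}

func main() {
	m := &manager{}
	m.tick(100, 120, true)
	fmt.Println(m.checkpointTs, m.resolvedTs) // 100 120
	m.tick(110, 130, false)                   // feed stops; cached state is reset
	fmt.Println(m.checkpointTs, m.resolvedTs) // 0 0
}

Zeroing is safe here because both fields are monotonic caches: any status value observed after resume is at least as fresh as zero.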

func (m *feedStateManager) cleanUp() {
for captureID := range m.state.TaskPositions {
m.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return nil, position != nil, nil
})
}
m.checkpointTs = 0
m.resolvedTs = 0
}

func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError {
@@ -546,12 +560,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
info.Error = lastError
return info, true, nil
})
}

// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
}
}
}

@@ -566,12 +580,12 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) {
ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs)
m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs
// Conditions:
// 1. checkpoint lag is large enough;
// 1. resolvedTs can advance normally;
// 2. checkpoint hasn't been advanced for a long while;
// 3. the changefeed has been initialized.
if currTime.Sub(ckptTime) > m.changefeedErrorStuckDuration &&
// 3. checkpoint lag is large enough.
if time.Since(m.resolvedTsAdvanced) < m.changefeedErrorStuckDuration &&
time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration &&
m.resolvedTs > m.initCheckpointTs {
currTime.Sub(ckptTime) > m.changefeedErrorStuckDuration {
log.Info("changefeed retry on warning for a very long time and does not resume, "+
"it will be failed", zap.String("changefeed", m.state.ID.ID),
zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
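The reworked handleWarning condition above fails a changefeed only when the pipeline is demonstrably alive but not committing: resolvedTs advanced recently, while checkpointTs has been stuck, and lags, for longer than changefeedErrorStuckDuration. Below is a self-contained sketch of that predicate with a simplified signature; the real code derives the checkpoint time from the checkpoint TSO via oracle.GetTimeFromTS, which time.Since only approximates here.

package main

import (
	"fmt"
	"time"
)

// stuckOnWarning mirrors the PR's three conditions: the resolved ts still
// advances (the pipeline is alive), the checkpoint has not moved for longer
// than stuckDuration, and the checkpoint lag already exceeds stuckDuration.
func stuckOnWarning(resolvedTsAdvanced, checkpointTsAdvanced, ckptTime time.Time,
	stuckDuration time.Duration) bool {
	return time.Since(resolvedTsAdvanced) < stuckDuration &&
		time.Since(checkpointTsAdvanced) > stuckDuration &&
		time.Since(ckptTime) > stuckDuration
}

func main() {
	now := time.Now()
	stuck := 100 * time.Millisecond // the value the test below configures
	past := now.Add(-150 * time.Millisecond)

	// Resolved ts advanced just now; checkpoint stalled for 150ms with 150ms lag:
	fmt.Println(stuckOnWarning(now, past, past, stuck)) // true
	// Checkpoint advanced recently, so the feed is merely catching up:
	fmt.Println(stuckOnWarning(now, now, now, stuck)) // false
}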
1 change: 1 addition & 0 deletions cdc/owner/feed_state_manager_test.go
@@ -742,6 +742,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) {
func TestHandleWarning(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
manager.changefeedErrorStuckDuration = 100 * time.Millisecond
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
2 changes: 1 addition & 1 deletion cdc/owner/owner.go
@@ -155,7 +155,7 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt
// Tick all changefeeds.
ctx := stdCtx.(cdcContext.Context)
for changefeedID, changefeedState := range state.Changefeeds {
// check if we are the changefeed owner to handle this changefeed
// check if we are the changefeed owner to handle this changefeed
if !o.shouldHandleChangefeed(changefeedState) {
continue
}