diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 5647aa0306c..a2d0710d8f2 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -63,11 +63,11 @@ type feedStateManager struct { // resolvedTs and initCheckpointTs is for checking whether resolved timestamp // has been advanced or not. - resolvedTs model.Ts - initCheckpointTs model.Ts - + resolvedTs model.Ts + initCheckpointTs model.Ts + resolvedTsAdvanced time.Time + checkpointTs model.Ts checkpointTsAdvanced time.Time - lastCheckpointTs model.Ts changefeedErrorStuckDuration time.Duration } @@ -100,25 +100,23 @@ func (m *feedStateManager) Tick( state *orchestrator.ChangefeedReactorState, resolvedTs model.Ts, ) (adminJobPending bool) { + m.checkAndInitLastRetryCheckpointTs(state.Status) + if state.Status != nil { - if m.lastCheckpointTs < state.Status.CheckpointTs { - m.lastCheckpointTs = state.Status.CheckpointTs + if m.checkpointTs < state.Status.CheckpointTs { + m.checkpointTs = state.Status.CheckpointTs m.checkpointTsAdvanced = time.Now() } - if m.state == nil || m.state.Status == nil { - // It's the first time `m.state.Status` gets filled. - m.initCheckpointTs = state.Status.CheckpointTs + if m.resolvedTs < resolvedTs { + m.resolvedTs = resolvedTs + m.resolvedTsAdvanced = time.Now() } } - - m.checkAndInitLastRetryCheckpointTs(state.Status) - m.state = state - m.resolvedTs = resolvedTs m.shouldBeRunning = true defer func() { if !m.shouldBeRunning { - m.cleanUpTaskPositions() + m.cleanUp() } }() @@ -416,12 +414,14 @@ func (m *feedStateManager) patchState(feedState model.FeedState) { }) } -func (m *feedStateManager) cleanUpTaskPositions() { +func (m *feedStateManager) cleanUp() { for captureID := range m.state.TaskPositions { m.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return nil, position != nil, nil }) } + m.checkpointTs = 0 + m.resolvedTs = 0 } func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError { @@ -546,12 +546,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) { info.Error = lastError return info, true, nil }) - } - // The errBackoff needs to be reset before the first retry. - if !m.isRetrying { - m.resetErrRetry() - m.isRetrying = true + // The errBackoff needs to be reset before the first retry. + if !m.isRetrying { + m.resetErrRetry() + m.isRetrying = true + } } } diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 87af81d205b..8e16ef2038e 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -155,7 +155,7 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt // Tick all changefeeds. ctx := stdCtx.(cdcContext.Context) for changefeedID, changefeedState := range state.Changefeeds { - // check if we are the changefeed owner to handle this changefeed + // check if we are the changefeed owner to handle this changefeed if !o.shouldHandleChangefeed(changefeedState) { continue }