Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Oct 30, 2023
1 parent 0bc0ac5 commit c7fa8ce
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions cdc/kv/shared_region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,23 +102,21 @@ func (w *sharedRegionWorker) run(ctx context.Context) error {
}

func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, stream *requestedStream) {
if stream != nil {
// stream can be nil if it's obviously unnecessary to re-schedule the region.
stream.takeState(SubscriptionID(state.requestID), state.getRegionID())
}

reschedule := state.markRemoved()
stepsToRemoved := state.markRemoved()
err := state.takeError()
log.Info("region worker get a region error",
zap.String("namespace", w.changefeed.Namespace),
zap.String("changefeed", w.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", state.getRegionID()),
zap.Uint64("regionID", state.sri.verID.GetID()),
zap.Bool("reschedule", reschedule),
zap.Bool("reschedule", stepsToRemoved),
zap.Error(err))

if reschedule {
if stepsToRemoved {
if stream != nil {
stream.takeState(SubscriptionID(state.requestID), state.getRegionID())
}
// For SharedClient and SharedWorker, err will never be nil.
w.client.onRegionFail(newRegionErrorInfo(state.getRegionInfo(), err))
}
Expand Down

0 comments on commit c7fa8ce

Please sign in to comment.