From c7fa8ce3de32e74ea9eccd76a541be1fd18d10a1 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 30 Oct 2023 12:51:57 +0800 Subject: [PATCH] fix Signed-off-by: qupeng --- cdc/kv/shared_region_worker.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/cdc/kv/shared_region_worker.go b/cdc/kv/shared_region_worker.go index da073474e0f..829ac5f2838 100644 --- a/cdc/kv/shared_region_worker.go +++ b/cdc/kv/shared_region_worker.go @@ -102,12 +102,7 @@ func (w *sharedRegionWorker) run(ctx context.Context) error { } func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, stream *requestedStream) { - if stream != nil { - // stream can be nil if it's obviously unnecessary to re-schedule the region. - stream.takeState(SubscriptionID(state.requestID), state.getRegionID()) - } - - reschedule := state.markRemoved() + stepsToRemoved := state.markRemoved() err := state.takeError() log.Info("region worker get a region error", zap.String("namespace", w.changefeed.Namespace), @@ -115,10 +110,13 @@ func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, str zap.Uint64("streamID", stream.streamID), zap.Any("subscriptionID", state.getRegionID()), zap.Uint64("regionID", state.sri.verID.GetID()), - zap.Bool("reschedule", reschedule), + zap.Bool("reschedule", stepsToRemoved), zap.Error(err)) - if reschedule { + if stepsToRemoved { + if stream != nil { + stream.takeState(SubscriptionID(state.requestID), state.getRegionID()) + } // For SharedClient and SharedWorker, err will never be nil. w.client.onRegionFail(newRegionErrorInfo(state.getRegionInfo(), err)) }