diff --git a/cdc/kv/region_state.go b/cdc/kv/region_state.go index 0019f3bffb8..00cee2b8572 100644 --- a/cdc/kv/region_state.go +++ b/cdc/kv/region_state.go @@ -16,7 +16,6 @@ package kv import ( "runtime" "sync" - "sync/atomic" "github.com/pingcap/tiflow/cdc/kv/regionlock" "github.com/pingcap/tiflow/cdc/processor/tablepb" @@ -67,13 +66,12 @@ type regionFeedState struct { // stopped: some error happens. // removed: the region is returned into the pending list, // will be re-resolved and re-scheduled later. - state atomic.Uint32 - - // All region errors should be handled in region workers. - // `err` is used to retrieve errors generated outside. - err struct { - sync.Mutex - e error + state struct { + sync.RWMutex + v uint32 + // All region errors should be handled in region workers. + // `err` is used to retrieve errors generated outside. + err error } } @@ -90,30 +88,36 @@ func (s *regionFeedState) start() { // mark regionFeedState as stopped with the given error if possible. func (s *regionFeedState) markStopped(err error) { - if s.state.CompareAndSwap(stateNormal, stateStopped) { - if err != nil { - s.err.Lock() - defer s.err.Unlock() - s.err.e = err - } + s.state.Lock() + defer s.state.Unlock() + if s.state.v == stateNormal { + s.state.v = stateStopped + s.state.err = err } } // mark regionFeedState as removed if possible. func (s *regionFeedState) markRemoved() (changed bool) { - return s.state.CompareAndSwap(stateStopped, stateRemoved) + s.state.Lock() + defer s.state.Unlock() + if s.state.v == stateStopped { + s.state.v = stateRemoved + changed = true + } + return } func (s *regionFeedState) isStale() bool { - state := s.state.Load() - return state == stateStopped || state == stateRemoved + s.state.RLock() + defer s.state.RUnlock() + return s.state.v == stateStopped || s.state.v == stateRemoved } func (s *regionFeedState) takeError() (err error) { - s.err.Lock() - defer s.err.Unlock() - err = s.err.e - s.err.e = nil + s.state.Lock() + defer s.state.Unlock() + err = s.state.err + s.state.err = nil return } diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go index 5a28d6e7bef..4764fd1cb2c 100644 --- a/cdc/kv/shared_client_test.go +++ b/cdc/kv/shared_client_test.go @@ -38,17 +38,22 @@ import ( ) func TestRequestedStreamRequestedRegions(t *testing.T) { - stream := &requestedStream{streamID: 100, requests: chann.NewAutoDrainChann[singleRegionInfo]()} - defer stream.requests.CloseAndDrain() - stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState) + stream := newRequestedStream(100) require.Nil(t, stream.getState(1, 2)) require.Nil(t, stream.takeState(1, 2)) - stream.setState(1, 2, ®ionFeedState{sri: singleRegionInfo{requestedTable: &requestedTable{}}}) + stream.setState(1, 2, ®ionFeedState{}) require.NotNil(t, stream.getState(1, 2)) require.NotNil(t, stream.takeState(1, 2)) require.Nil(t, stream.getState(1, 2)) + require.Equal(t, 0, len(stream.requestedRegions.m)) + + stream.setState(1, 2, ®ionFeedState{}) + require.NotNil(t, stream.getState(1, 2)) + require.NotNil(t, stream.takeState(1, 2)) + require.Nil(t, stream.getState(1, 2)) + require.Equal(t, 0, len(stream.requestedRegions.m)) } func TestRequestedTable(t *testing.T) { diff --git a/cdc/kv/shared_region_worker.go b/cdc/kv/shared_region_worker.go index 1ec521b7ea6..83b7805712d 100644 --- a/cdc/kv/shared_region_worker.go +++ b/cdc/kv/shared_region_worker.go @@ -102,13 +102,20 @@ func (w *sharedRegionWorker) run(ctx context.Context) error { } func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, stream *requestedStream) { - if stream != nil { - // stream can be nil if it's obviously unnecessary to re-schedule the region. - stream.takeState(SubscriptionID(state.requestID), state.getRegionID()) + stepsToRemoved := state.markRemoved() + err := state.takeError() + if err != nil { + log.Info("region worker get a region error", + zap.String("namespace", w.changefeed.Namespace), + zap.String("changefeed", w.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Any("subscriptionID", state.getRegionID()), + zap.Uint64("regionID", state.sri.verID.GetID()), + zap.Bool("reschedule", stepsToRemoved), + zap.Error(err)) } - if state.markRemoved() { - // For SharedClient and SharedWorker, err will never be nil. - err := state.takeError() + if stepsToRemoved { + stream.takeState(SubscriptionID(state.requestID), state.getRegionID()) w.client.onRegionFail(newRegionErrorInfo(state.getRegionInfo(), err)) } } @@ -128,16 +135,16 @@ func (w *sharedRegionWorker) processEvent(ctx context.Context, event statefulEve w.handleSingleRegionError(state, event.stream) return } - case *cdcpb.Event_Admin_: - case *cdcpb.Event_Error: - state.markStopped(&eventError{err: x.Error}) - w.handleSingleRegionError(state, event.stream) - return case *cdcpb.Event_ResolvedTs: w.handleResolvedTs(ctx, resolvedTsBatch{ ts: x.ResolvedTs, regions: []*regionFeedState{state}, }) + case *cdcpb.Event_Error: + state.markStopped(&eventError{err: x.Error}) + w.handleSingleRegionError(state, event.stream) + return + case *cdcpb.Event_Admin_: } } else if len(event.resolvedTsBatch.regions) > 0 { w.handleResolvedTs(ctx, event.resolvedTsBatch) diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index ad3cc840205..5e13007f096 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -56,11 +56,8 @@ type tableExclusive struct { } func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *requestedStore) *requestedStream { - stream := &requestedStream{ - streamID: streamIDGen.Add(1), - requests: chann.NewAutoDrainChann[singleRegionInfo](), - } - stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState) + stream := newRequestedStream(streamIDGen.Add(1)) + stream.requests = chann.NewAutoDrainChann[singleRegionInfo]() waitForPreFetching := func() error { if stream.preFetchForConnecting != nil { @@ -96,8 +93,8 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque for _, m := range stream.clearStates() { for _, state := range m { state.markStopped(&sendRequestToStoreErr{}) + sfEvent := newEventItem(nil, state, stream) slot := hashRegionID(state.sri.verID.GetID(), len(c.workers)) - sfEvent := statefulEvent{eventItem: eventItem{state: state}} _ = c.workers[slot].sendEvent(ctx, sfEvent) } } @@ -119,6 +116,12 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque return stream } +func newRequestedStream(streamID uint64) *requestedStream { + stream := &requestedStream{streamID: streamID} + stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState) + return stream +} + func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) { isCanceled := func() bool { select { @@ -352,8 +355,8 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request // and then those regions from the bad requestID will be unsubscribed finally. for _, state := range s.takeStates(subscriptionID) { state.markStopped(&sendRequestToStoreErr{}) + sfEvent := newEventItem(nil, state, s) slot := hashRegionID(state.sri.verID.GetID(), len(c.workers)) - sfEvent := statefulEvent{eventItem: eventItem{state: state}} if err = c.workers[slot].sendEvent(ctx, sfEvent); err != nil { return errors.Trace(err) } @@ -423,6 +426,9 @@ func (s *requestedStream) takeState(subscriptionID SubscriptionID, regionID uint if m, ok := s.requestedRegions.m[subscriptionID]; ok { state = m[regionID] delete(m, regionID) + if len(m) == 0 { + delete(s.requestedRegions.m, subscriptionID) + } } return } @@ -468,7 +474,21 @@ func (s *requestedStream) sendRegionChangeEvents( } else { subscriptionID = tableSubID } - if state := s.getState(subscriptionID, regionID); state != nil { + + state := s.getState(subscriptionID, regionID) + switch x := event.Event.(type) { + case *cdcpb.Event_Error: + log.Info("event feed receives a region error", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Any("subscriptionID", subscriptionID), + zap.Uint64("regionID", event.RegionId), + zap.Bool("stateIsNil", state == nil), + zap.Any("error", x.Error)) + } + + if state != nil { sfEvent := newEventItem(event, state, s) slot := hashRegionID(regionID, len(c.workers)) if err := c.workers[slot].sendEvent(ctx, sfEvent); err != nil {