From 0bc0ac52ec83846325179505c8fab1fbe17eb54c Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 27 Oct 2023 14:36:46 +0800 Subject: [PATCH] more logs Signed-off-by: qupeng --- cdc/kv/shared_client_test.go | 11 +++++++---- cdc/kv/shared_region_worker.go | 25 ++++++++++++++++++------- cdc/kv/shared_stream.go | 21 ++++++++++++++------- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go index 5a28d6e7bef..eae2ae6f81c 100644 --- a/cdc/kv/shared_client_test.go +++ b/cdc/kv/shared_client_test.go @@ -38,14 +38,17 @@ import ( ) func TestRequestedStreamRequestedRegions(t *testing.T) { - stream := &requestedStream{streamID: 100, requests: chann.NewAutoDrainChann[singleRegionInfo]()} - defer stream.requests.CloseAndDrain() - stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState) + stream := newRequestedStream(100) require.Nil(t, stream.getState(1, 2)) require.Nil(t, stream.takeState(1, 2)) - stream.setState(1, 2, &regionFeedState{sri: singleRegionInfo{requestedTable: &requestedTable{}}}) + stream.setState(1, 2, &regionFeedState{}) + require.NotNil(t, stream.getState(1, 2)) + require.NotNil(t, stream.takeState(1, 2)) + require.Nil(t, stream.getState(1, 2)) + + stream.setState(1, 2, &regionFeedState{}) require.NotNil(t, stream.getState(1, 2)) require.NotNil(t, stream.takeState(1, 2)) require.Nil(t, stream.getState(1, 2)) diff --git a/cdc/kv/shared_region_worker.go b/cdc/kv/shared_region_worker.go index 1ec521b7ea6..da073474e0f 100644 --- a/cdc/kv/shared_region_worker.go +++ b/cdc/kv/shared_region_worker.go @@ -106,9 +106,20 @@ func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, str // stream can be nil if it's obviously unnecessary to re-schedule the region. 
stream.takeState(SubscriptionID(state.requestID), state.getRegionID()) } - if state.markRemoved() { + + reschedule := state.markRemoved() + err := state.takeError() + log.Info("region worker get a region error", + zap.String("namespace", w.changefeed.Namespace), + zap.String("changefeed", w.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Any("subscriptionID", state.getRegionID()), + zap.Uint64("regionID", state.sri.verID.GetID()), + zap.Bool("reschedule", reschedule), + zap.Error(err)) + + if reschedule { // For SharedClient and SharedWorker, err will never be nil. - err := state.takeError() w.client.onRegionFail(newRegionErrorInfo(state.getRegionInfo(), err)) } } @@ -128,16 +139,16 @@ func (w *sharedRegionWorker) processEvent(ctx context.Context, event statefulEve w.handleSingleRegionError(state, event.stream) return } - case *cdcpb.Event_Admin_: - case *cdcpb.Event_Error: - state.markStopped(&eventError{err: x.Error}) - w.handleSingleRegionError(state, event.stream) - return case *cdcpb.Event_ResolvedTs: w.handleResolvedTs(ctx, resolvedTsBatch{ ts: x.ResolvedTs, regions: []*regionFeedState{state}, }) + case *cdcpb.Event_Error: + state.markStopped(&eventError{err: x.Error}) + w.handleSingleRegionError(state, event.stream) + return + case *cdcpb.Event_Admin_: } } else if len(event.resolvedTsBatch.regions) > 0 { w.handleResolvedTs(ctx, event.resolvedTsBatch) diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index 87b0c24cd82..7e1d75353da 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -56,11 +56,8 @@ type tableExclusive struct { } func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *requestedStore) *requestedStream { - stream := &requestedStream{ - streamID: streamIDGen.Add(1), - requests: chann.NewAutoDrainChann[singleRegionInfo](), - } - stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState) + stream := newRequestedStream(streamIDGen.Add(1)) + stream.requests = 
chann.NewAutoDrainChann[singleRegionInfo]() waitForPreFetching := func() error { if stream.preFetchForConnecting != nil { @@ -119,6 +116,12 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque return stream } +func newRequestedStream(streamID uint64) *requestedStream { + stream := &requestedStream{streamID: streamID} + stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState) + return stream +} + func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) { isCanceled := func() bool { select { @@ -423,6 +426,9 @@ func (s *requestedStream) takeState(subscriptionID SubscriptionID, regionID uint if m, ok := s.requestedRegions.m[subscriptionID]; ok { state = m[regionID] delete(m, regionID) + if len(m) == 0 { + delete(s.requestedRegions.m, subscriptionID) + } } return } @@ -469,8 +475,8 @@ func (s *requestedStream) sendRegionChangeEvents( subscriptionID = tableSubID } + state := s.getState(subscriptionID, regionID) switch x := event.Event.(type) { - case *cdcpb.Event_Admin_: case *cdcpb.Event_Error: log.Info("event feed receives a region error", zap.String("namespace", c.changefeed.Namespace), @@ -478,10 +484,11 @@ func (s *requestedStream) sendRegionChangeEvents( zap.Uint64("streamID", s.streamID), zap.Any("subscriptionID", subscriptionID), zap.Uint64("regionID", event.RegionId), + zap.Bool("stateIsNil", state == nil), zap.Any("error", x.Error)) } - if state := s.getState(subscriptionID, regionID); state != nil { + if state != nil { sfEvent := newEventItem(event, state, s) slot := hashRegionID(regionID, len(c.workers)) if err := c.workers[slot].sendEvent(ctx, sfEvent); err != nil {