Skip to content

Commit

Permalink
more logs
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Oct 27, 2023
1 parent 7b444d6 commit 0bc0ac5
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 18 deletions.
11 changes: 7 additions & 4 deletions cdc/kv/shared_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@ import (
)

func TestRequestedStreamRequestedRegions(t *testing.T) {
stream := &requestedStream{streamID: 100, requests: chann.NewAutoDrainChann[singleRegionInfo]()}
defer stream.requests.CloseAndDrain()
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
stream := newRequestedStream(100)

require.Nil(t, stream.getState(1, 2))
require.Nil(t, stream.takeState(1, 2))

stream.setState(1, 2, &regionFeedState{sri: singleRegionInfo{requestedTable: &requestedTable{}}})
stream.setState(1, 2, &regionFeedState{})
require.NotNil(t, stream.getState(1, 2))
require.NotNil(t, stream.takeState(1, 2))
require.Nil(t, stream.getState(1, 2))

stream.setState(1, 2, &regionFeedState{})
require.NotNil(t, stream.getState(1, 2))
require.NotNil(t, stream.takeState(1, 2))
require.Nil(t, stream.getState(1, 2))
Expand Down
25 changes: 18 additions & 7 deletions cdc/kv/shared_region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,20 @@ func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, str
// stream can be nil if it's obviously unnecessary to re-schedule the region.
stream.takeState(SubscriptionID(state.requestID), state.getRegionID())
}
if state.markRemoved() {

reschedule := state.markRemoved()
err := state.takeError()
log.Info("region worker get a region error",
zap.String("namespace", w.changefeed.Namespace),
zap.String("changefeed", w.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", state.getRegionID()),
zap.Uint64("regionID", state.sri.verID.GetID()),
zap.Bool("reschedule", reschedule),
zap.Error(err))

if reschedule {
// For SharedClient and SharedWorker, err will never be nil.
err := state.takeError()
w.client.onRegionFail(newRegionErrorInfo(state.getRegionInfo(), err))
}
}
Expand All @@ -128,16 +139,16 @@ func (w *sharedRegionWorker) processEvent(ctx context.Context, event statefulEve
w.handleSingleRegionError(state, event.stream)
return
}
case *cdcpb.Event_Admin_:
case *cdcpb.Event_Error:
state.markStopped(&eventError{err: x.Error})
w.handleSingleRegionError(state, event.stream)
return
case *cdcpb.Event_ResolvedTs:
w.handleResolvedTs(ctx, resolvedTsBatch{
ts: x.ResolvedTs,
regions: []*regionFeedState{state},
})
case *cdcpb.Event_Error:
state.markStopped(&eventError{err: x.Error})
w.handleSingleRegionError(state, event.stream)
return
case *cdcpb.Event_Admin_:
}
} else if len(event.resolvedTsBatch.regions) > 0 {
w.handleResolvedTs(ctx, event.resolvedTsBatch)
Expand Down
21 changes: 14 additions & 7 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ type tableExclusive struct {
}

func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *requestedStore) *requestedStream {
stream := &requestedStream{
streamID: streamIDGen.Add(1),
requests: chann.NewAutoDrainChann[singleRegionInfo](),
}
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
stream := newRequestedStream(streamIDGen.Add(1))
stream.requests = chann.NewAutoDrainChann[singleRegionInfo]()

waitForPreFetching := func() error {
if stream.preFetchForConnecting != nil {
Expand Down Expand Up @@ -119,6 +116,12 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
return stream
}

func newRequestedStream(streamID uint64) *requestedStream {
stream := &requestedStream{streamID: streamID}
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
return stream
}

func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) {
isCanceled := func() bool {
select {
Expand Down Expand Up @@ -423,6 +426,9 @@ func (s *requestedStream) takeState(subscriptionID SubscriptionID, regionID uint
if m, ok := s.requestedRegions.m[subscriptionID]; ok {
state = m[regionID]
delete(m, regionID)
if len(m) == 0 {
delete(s.requestedRegions.m, subscriptionID)
}
}
return
}
Expand Down Expand Up @@ -469,19 +475,20 @@ func (s *requestedStream) sendRegionChangeEvents(
subscriptionID = tableSubID
}

state := s.getState(subscriptionID, regionID)
switch x := event.Event.(type) {
case *cdcpb.Event_Admin_:
case *cdcpb.Event_Error:
log.Info("event feed receives a region error",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", event.RegionId),
zap.Bool("stateIsNil", state == nil),
zap.Any("error", x.Error))
}

if state := s.getState(subscriptionID, regionID); state != nil {
if state != nil {
sfEvent := newEventItem(event, state, s)
slot := hashRegionID(regionID, len(c.workers))
if err := c.workers[slot].sendEvent(ctx, sfEvent); err != nil {
Expand Down

0 comments on commit 0bc0ac5

Please sign in to comment.