Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv-client(cdc): fix the bug that region errors can be lost #9963

Merged
merged 13 commits into from
Oct 31, 2023
Merged
13 changes: 9 additions & 4 deletions cdc/kv/shared_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,22 @@ import (
)

func TestRequestedStreamRequestedRegions(t *testing.T) {
stream := &requestedStream{streamID: 100, requests: chann.NewAutoDrainChann[singleRegionInfo]()}
defer stream.requests.CloseAndDrain()
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
stream := newRequestedStream(100)

require.Nil(t, stream.getState(1, 2))
require.Nil(t, stream.takeState(1, 2))

stream.setState(1, 2, &regionFeedState{sri: singleRegionInfo{requestedTable: &requestedTable{}}})
stream.setState(1, 2, &regionFeedState{})
require.NotNil(t, stream.getState(1, 2))
require.NotNil(t, stream.takeState(1, 2))
require.Nil(t, stream.getState(1, 2))
require.Equal(t, 0, len(stream.requestedRegions.m))

stream.setState(1, 2, &regionFeedState{})
require.NotNil(t, stream.getState(1, 2))
require.NotNil(t, stream.takeState(1, 2))
require.Nil(t, stream.getState(1, 2))
require.Equal(t, 0, len(stream.requestedRegions.m))
}

func TestRequestedTable(t *testing.T) {
Expand Down
29 changes: 18 additions & 11 deletions cdc/kv/shared_region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,20 @@ func (w *sharedRegionWorker) run(ctx context.Context) error {
}

func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, stream *requestedStream) {
if stream != nil {
// stream can be nil if it's obviously unnecessary to re-schedule the region.
stream.takeState(SubscriptionID(state.requestID), state.getRegionID())
stepsToRemoved := state.markRemoved()
err := state.takeError()
if err != nil {
log.Info("region worker get a region error",
zap.String("namespace", w.changefeed.Namespace),
zap.String("changefeed", w.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", state.getRegionID()),
zap.Uint64("regionID", state.sri.verID.GetID()),
zap.Bool("reschedule", stepsToRemoved),
zap.Error(err))
}
if state.markRemoved() {
// For SharedClient and SharedWorker, err will never be nil.
err := state.takeError()
if stepsToRemoved {
stream.takeState(SubscriptionID(state.requestID), state.getRegionID())
w.client.onRegionFail(newRegionErrorInfo(state.getRegionInfo(), err))
}
}
Expand All @@ -128,16 +135,16 @@ func (w *sharedRegionWorker) processEvent(ctx context.Context, event statefulEve
w.handleSingleRegionError(state, event.stream)
return
}
case *cdcpb.Event_Admin_:
case *cdcpb.Event_Error:
state.markStopped(&eventError{err: x.Error})
w.handleSingleRegionError(state, event.stream)
return
case *cdcpb.Event_ResolvedTs:
w.handleResolvedTs(ctx, resolvedTsBatch{
ts: x.ResolvedTs,
regions: []*regionFeedState{state},
})
case *cdcpb.Event_Error:
state.markStopped(&eventError{err: x.Error})
w.handleSingleRegionError(state, event.stream)
return
case *cdcpb.Event_Admin_:
}
} else if len(event.resolvedTsBatch.regions) > 0 {
w.handleResolvedTs(ctx, event.resolvedTsBatch)
Expand Down
36 changes: 28 additions & 8 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ type tableExclusive struct {
}

func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *requestedStore) *requestedStream {
stream := &requestedStream{
streamID: streamIDGen.Add(1),
requests: chann.NewAutoDrainChann[singleRegionInfo](),
}
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
stream := newRequestedStream(streamIDGen.Add(1))
stream.requests = chann.NewAutoDrainChann[singleRegionInfo]()

waitForPreFetching := func() error {
if stream.preFetchForConnecting != nil {
Expand Down Expand Up @@ -96,8 +93,8 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
for _, m := range stream.clearStates() {
for _, state := range m {
state.markStopped(&sendRequestToStoreErr{})
sfEvent := newEventItem(nil, state, stream)
slot := hashRegionID(state.sri.verID.GetID(), len(c.workers))
sfEvent := statefulEvent{eventItem: eventItem{state: state}}
_ = c.workers[slot].sendEvent(ctx, sfEvent)
}
}
Expand All @@ -119,6 +116,12 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
return stream
}

func newRequestedStream(streamID uint64) *requestedStream {
stream := &requestedStream{streamID: streamID}
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
return stream
}

func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) {
isCanceled := func() bool {
select {
Expand Down Expand Up @@ -352,8 +355,8 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
// and then those regions from the bad requestID will be unsubscribed finally.
for _, state := range s.takeStates(subscriptionID) {
state.markStopped(&sendRequestToStoreErr{})
sfEvent := newEventItem(nil, state, s)
slot := hashRegionID(state.sri.verID.GetID(), len(c.workers))
sfEvent := statefulEvent{eventItem: eventItem{state: state}}
if err = c.workers[slot].sendEvent(ctx, sfEvent); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -423,6 +426,9 @@ func (s *requestedStream) takeState(subscriptionID SubscriptionID, regionID uint
if m, ok := s.requestedRegions.m[subscriptionID]; ok {
state = m[regionID]
delete(m, regionID)
if len(m) == 0 {
delete(s.requestedRegions.m, subscriptionID)
}
}
return
}
Expand Down Expand Up @@ -468,7 +474,21 @@ func (s *requestedStream) sendRegionChangeEvents(
} else {
subscriptionID = tableSubID
}
if state := s.getState(subscriptionID, regionID); state != nil {

state := s.getState(subscriptionID, regionID)
switch x := event.Event.(type) {
case *cdcpb.Event_Error:
log.Info("event feed receives a region error",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", event.RegionId),
zap.Bool("stateIsNil", state == nil),
zap.Any("error", x.Error))
}

if state != nil {
sfEvent := newEventItem(event, state, s)
slot := hashRegionID(regionID, len(c.workers))
if err := c.workers[slot].sendEvent(ctx, sfEvent); err != nil {
Expand Down
Loading