Skip to content

Commit

Permalink
kv-client(cdc): fix the bug that region errors can be lost (#9963) (#…
Browse files Browse the repository at this point in the history
…9977)

close #9673
  • Loading branch information
ti-chi-bot authored Oct 31, 2023
1 parent 6439414 commit 453aea8
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 44 deletions.
46 changes: 25 additions & 21 deletions cdc/kv/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package kv
import (
"runtime"
"sync"
"sync/atomic"

"github.com/pingcap/tiflow/cdc/kv/regionlock"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
Expand Down Expand Up @@ -67,13 +66,12 @@ type regionFeedState struct {
// stopped: some error happens.
// removed: the region is returned into the pending list,
// will be re-resolved and re-scheduled later.
state atomic.Uint32

// All region errors should be handled in region workers.
// `err` is used to retrieve errors generated outside.
err struct {
sync.Mutex
e error
state struct {
sync.RWMutex
v uint32
// All region errors should be handled in region workers.
// `err` is used to retrieve errors generated outside.
err error
}
}

Expand All @@ -90,30 +88,36 @@ func (s *regionFeedState) start() {

// mark regionFeedState as stopped with the given error if possible.
func (s *regionFeedState) markStopped(err error) {
if s.state.CompareAndSwap(stateNormal, stateStopped) {
if err != nil {
s.err.Lock()
defer s.err.Unlock()
s.err.e = err
}
s.state.Lock()
defer s.state.Unlock()
if s.state.v == stateNormal {
s.state.v = stateStopped
s.state.err = err
}
}

// mark regionFeedState as removed if possible.
func (s *regionFeedState) markRemoved() (changed bool) {
return s.state.CompareAndSwap(stateStopped, stateRemoved)
s.state.Lock()
defer s.state.Unlock()
if s.state.v == stateStopped {
s.state.v = stateRemoved
changed = true
}
return
}

func (s *regionFeedState) isStale() bool {
state := s.state.Load()
return state == stateStopped || state == stateRemoved
s.state.RLock()
defer s.state.RUnlock()
return s.state.v == stateStopped || s.state.v == stateRemoved
}

func (s *regionFeedState) takeError() (err error) {
s.err.Lock()
defer s.err.Unlock()
err = s.err.e
s.err.e = nil
s.state.Lock()
defer s.state.Unlock()
err = s.state.err
s.state.err = nil
return
}

Expand Down
13 changes: 9 additions & 4 deletions cdc/kv/shared_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,22 @@ import (
)

func TestRequestedStreamRequestedRegions(t *testing.T) {
stream := &requestedStream{streamID: 100, requests: chann.NewAutoDrainChann[singleRegionInfo]()}
defer stream.requests.CloseAndDrain()
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
stream := newRequestedStream(100)

require.Nil(t, stream.getState(1, 2))
require.Nil(t, stream.takeState(1, 2))

stream.setState(1, 2, &regionFeedState{sri: singleRegionInfo{requestedTable: &requestedTable{}}})
stream.setState(1, 2, &regionFeedState{})
require.NotNil(t, stream.getState(1, 2))
require.NotNil(t, stream.takeState(1, 2))
require.Nil(t, stream.getState(1, 2))
require.Equal(t, 0, len(stream.requestedRegions.m))

stream.setState(1, 2, &regionFeedState{})
require.NotNil(t, stream.getState(1, 2))
require.NotNil(t, stream.takeState(1, 2))
require.Nil(t, stream.getState(1, 2))
require.Equal(t, 0, len(stream.requestedRegions.m))
}

func TestRequestedTable(t *testing.T) {
Expand Down
29 changes: 18 additions & 11 deletions cdc/kv/shared_region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,20 @@ func (w *sharedRegionWorker) run(ctx context.Context) error {
}

func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, stream *requestedStream) {
if stream != nil {
// stream can be nil if it's obviously unnecessary to re-schedule the region.
stream.takeState(SubscriptionID(state.requestID), state.getRegionID())
stepsToRemoved := state.markRemoved()
err := state.takeError()
if err != nil {
log.Info("region worker get a region error",
zap.String("namespace", w.changefeed.Namespace),
zap.String("changefeed", w.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", state.getRegionID()),
zap.Uint64("regionID", state.sri.verID.GetID()),
zap.Bool("reschedule", stepsToRemoved),
zap.Error(err))
}
if state.markRemoved() {
// For SharedClient and SharedWorker, err will never be nil.
err := state.takeError()
if stepsToRemoved {
stream.takeState(SubscriptionID(state.requestID), state.getRegionID())
w.client.onRegionFail(newRegionErrorInfo(state.getRegionInfo(), err))
}
}
Expand All @@ -128,16 +135,16 @@ func (w *sharedRegionWorker) processEvent(ctx context.Context, event statefulEve
w.handleSingleRegionError(state, event.stream)
return
}
case *cdcpb.Event_Admin_:
case *cdcpb.Event_Error:
state.markStopped(&eventError{err: x.Error})
w.handleSingleRegionError(state, event.stream)
return
case *cdcpb.Event_ResolvedTs:
w.handleResolvedTs(ctx, resolvedTsBatch{
ts: x.ResolvedTs,
regions: []*regionFeedState{state},
})
case *cdcpb.Event_Error:
state.markStopped(&eventError{err: x.Error})
w.handleSingleRegionError(state, event.stream)
return
case *cdcpb.Event_Admin_:
}
} else if len(event.resolvedTsBatch.regions) > 0 {
w.handleResolvedTs(ctx, event.resolvedTsBatch)
Expand Down
36 changes: 28 additions & 8 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ type tableExclusive struct {
}

func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *requestedStore) *requestedStream {
stream := &requestedStream{
streamID: streamIDGen.Add(1),
requests: chann.NewAutoDrainChann[singleRegionInfo](),
}
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
stream := newRequestedStream(streamIDGen.Add(1))
stream.requests = chann.NewAutoDrainChann[singleRegionInfo]()

waitForPreFetching := func() error {
if stream.preFetchForConnecting != nil {
Expand Down Expand Up @@ -96,8 +93,8 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
for _, m := range stream.clearStates() {
for _, state := range m {
state.markStopped(&sendRequestToStoreErr{})
sfEvent := newEventItem(nil, state, stream)
slot := hashRegionID(state.sri.verID.GetID(), len(c.workers))
sfEvent := statefulEvent{eventItem: eventItem{state: state}}
_ = c.workers[slot].sendEvent(ctx, sfEvent)
}
}
Expand All @@ -119,6 +116,12 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
return stream
}

// newRequestedStream builds a requestedStream that carries the given stream ID
// and has its requestedRegions map allocated up front, so per-subscription
// region states can be stored without a nil-map write.
//
// Only streamID and requestedRegions.m are initialized here; every other field
// (including the requests channel) keeps its zero value and must be set by the
// caller when needed.
func newRequestedStream(streamID uint64) *requestedStream {
	s := new(requestedStream)
	s.streamID = streamID
	s.requestedRegions.m = map[SubscriptionID]map[uint64]*regionFeedState{}
	return s
}

func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) {
isCanceled := func() bool {
select {
Expand Down Expand Up @@ -352,8 +355,8 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
// and then those regions from the bad requestID will be unsubscribed finally.
for _, state := range s.takeStates(subscriptionID) {
state.markStopped(&sendRequestToStoreErr{})
sfEvent := newEventItem(nil, state, s)
slot := hashRegionID(state.sri.verID.GetID(), len(c.workers))
sfEvent := statefulEvent{eventItem: eventItem{state: state}}
if err = c.workers[slot].sendEvent(ctx, sfEvent); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -423,6 +426,9 @@ func (s *requestedStream) takeState(subscriptionID SubscriptionID, regionID uint
if m, ok := s.requestedRegions.m[subscriptionID]; ok {
state = m[regionID]
delete(m, regionID)
if len(m) == 0 {
delete(s.requestedRegions.m, subscriptionID)
}
}
return
}
Expand Down Expand Up @@ -468,7 +474,21 @@ func (s *requestedStream) sendRegionChangeEvents(
} else {
subscriptionID = tableSubID
}
if state := s.getState(subscriptionID, regionID); state != nil {

state := s.getState(subscriptionID, regionID)
switch x := event.Event.(type) {
case *cdcpb.Event_Error:
log.Info("event feed receives a region error",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", event.RegionId),
zap.Bool("stateIsNil", state == nil),
zap.Any("error", x.Error))
}

if state != nil {
sfEvent := newEventItem(event, state, s)
slot := hashRegionID(regionID, len(c.workers))
if err := c.workers[slot].sendEvent(ctx, sfEvent); err != nil {
Expand Down

0 comments on commit 453aea8

Please sign in to comment.