kv-client(cdc): fix the bug that region errors can be lost (#9963) #9977

Merged
46 changes: 25 additions & 21 deletions cdc/kv/region_state.go
@@ -16,7 +16,6 @@ package kv
import (
"runtime"
"sync"
"sync/atomic"

"github.com/pingcap/tiflow/cdc/kv/regionlock"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -67,13 +66,12 @@ type regionFeedState struct {
// stopped: some error happens.
// removed: the region is returned into the pending list,
// will be re-resolved and re-scheduled later.
state atomic.Uint32

// All region errors should be handled in region workers.
// `err` is used to retrieve errors generated outside.
err struct {
sync.Mutex
e error
state struct {
sync.RWMutex
v uint32
// All region errors should be handled in region workers.
// `err` is used to retrieve errors generated outside.
err error
}
}

@@ -90,30 +88,36 @@ func (s *regionFeedState) start() {

// mark regionFeedState as stopped with the given error if possible.
func (s *regionFeedState) markStopped(err error) {
if s.state.CompareAndSwap(stateNormal, stateStopped) {
if err != nil {
s.err.Lock()
defer s.err.Unlock()
s.err.e = err
}
s.state.Lock()
defer s.state.Unlock()
if s.state.v == stateNormal {
s.state.v = stateStopped
s.state.err = err
}
}

// mark regionFeedState as removed if possible.
func (s *regionFeedState) markRemoved() (changed bool) {
return s.state.CompareAndSwap(stateStopped, stateRemoved)
s.state.Lock()
defer s.state.Unlock()
if s.state.v == stateStopped {
s.state.v = stateRemoved
changed = true
}
return
}

func (s *regionFeedState) isStale() bool {
state := s.state.Load()
return state == stateStopped || state == stateRemoved
s.state.RLock()
defer s.state.RUnlock()
return s.state.v == stateStopped || s.state.v == stateRemoved
}

func (s *regionFeedState) takeError() (err error) {
s.err.Lock()
defer s.err.Unlock()
err = s.err.e
s.err.e = nil
s.state.Lock()
defer s.state.Unlock()
err = s.state.err
s.state.err = nil
return
}

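The `region_state.go` change above is the heart of the fix: previously the lifecycle value lived in an `atomic.Uint32` while the error sat behind a separate mutex, so `markStopped` could publish the stopped state before the error was stored, and a concurrent `takeError` could observe the stopped state yet still read a nil error. Folding both fields under a single `sync.RWMutex` makes the state transition and the error write one critical section. Below is a minimal, self-contained sketch of that pattern (illustrative names, not the tiflow code itself):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

const (
	stateNormal uint32 = iota
	stateStopped
	stateRemoved
)

// feedState mirrors the fixed layout: the lifecycle value and the pending
// error live under the same lock, so they can only be observed together.
type feedState struct {
	state struct {
		sync.RWMutex
		v   uint32
		err error
	}
}

// markStopped records the error in the same critical section that performs
// the normal -> stopped transition. With the old atomic + separate-mutex
// layout, a reader could observe "stopped" before the error was written.
func (s *feedState) markStopped(err error) {
	s.state.Lock()
	defer s.state.Unlock()
	if s.state.v == stateNormal {
		s.state.v = stateStopped
		s.state.err = err
	}
}

// markRemoved performs the stopped -> removed transition at most once.
func (s *feedState) markRemoved() bool {
	s.state.Lock()
	defer s.state.Unlock()
	if s.state.v == stateStopped {
		s.state.v = stateRemoved
		return true
	}
	return false
}

// takeError drains the recorded error.
func (s *feedState) takeError() error {
	s.state.Lock()
	defer s.state.Unlock()
	err := s.state.err
	s.state.err = nil
	return err
}

func main() {
	s := &feedState{}
	s.markStopped(errors.New("region epoch not match"))
	fmt.Println(s.markRemoved(), s.takeError()) // true region epoch not match
}
```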
13 changes: 9 additions & 4 deletions cdc/kv/shared_client_test.go
@@ -38,17 +38,22 @@ import (
)

func TestRequestedStreamRequestedRegions(t *testing.T) {
stream := &requestedStream{streamID: 100, requests: chann.NewAutoDrainChann[singleRegionInfo]()}
defer stream.requests.CloseAndDrain()
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
stream := newRequestedStream(100)

require.Nil(t, stream.getState(1, 2))
require.Nil(t, stream.takeState(1, 2))

stream.setState(1, 2, &regionFeedState{sri: singleRegionInfo{requestedTable: &requestedTable{}}})
stream.setState(1, 2, &regionFeedState{})
require.NotNil(t, stream.getState(1, 2))
require.NotNil(t, stream.takeState(1, 2))
require.Nil(t, stream.getState(1, 2))
require.Equal(t, 0, len(stream.requestedRegions.m))

stream.setState(1, 2, &regionFeedState{})
require.NotNil(t, stream.getState(1, 2))
require.NotNil(t, stream.takeState(1, 2))
require.Nil(t, stream.getState(1, 2))
require.Equal(t, 0, len(stream.requestedRegions.m))
}
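The test now builds the stream through `newRequestedStream` (added in `cdc/kv/shared_stream.go` below) instead of assembling the struct and its maps by hand, and it repeats the set/take cycle, which also checks that `setState` re-creates the inner map after `takeState` has deleted the whole subscription entry. Centralizing initialization in a constructor keeps the test and production paths from diverging; a tiny sketch of the idea (hypothetical types):

```go
package main

import "fmt"

// stream mirrors the shape of requestedStream at a very high level
// (hypothetical type, for illustration only).
type stream struct {
	id      uint64
	regions map[uint64]map[uint64]string // subscriptionID -> regionID -> state
}

// newStream is the constructor-style helper: every caller, including tests,
// gets the inner maps initialized the same way.
func newStream(id uint64) *stream {
	return &stream{id: id, regions: make(map[uint64]map[uint64]string)}
}

func main() {
	s := newStream(100)
	fmt.Println(s.id, len(s.regions)) // 100 0
}
```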

func TestRequestedTable(t *testing.T) {
29 changes: 18 additions & 11 deletions cdc/kv/shared_region_worker.go
@@ -102,13 +102,20 @@ func (w *sharedRegionWorker) run(ctx context.Context) error {
}

func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, stream *requestedStream) {
if stream != nil {
// stream can be nil if it's obviously unnecessary to re-schedule the region.
stream.takeState(SubscriptionID(state.requestID), state.getRegionID())
stepsToRemoved := state.markRemoved()
err := state.takeError()
if err != nil {
log.Info("region worker get a region error",
zap.String("namespace", w.changefeed.Namespace),
zap.String("changefeed", w.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", state.getRegionID()),
zap.Uint64("regionID", state.sri.verID.GetID()),
zap.Bool("reschedule", stepsToRemoved),
zap.Error(err))
}
if state.markRemoved() {
// For SharedClient and SharedWorker, err will never be nil.
err := state.takeError()
if stepsToRemoved {
stream.takeState(SubscriptionID(state.requestID), state.getRegionID())
w.client.onRegionFail(newRegionErrorInfo(state.getRegionInfo(), err))
}
}
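The worker-side rewrite matters just as much: as the removed lines show, the old `handleSingleRegionError` only called `takeError()` inside the `markRemoved()` branch, so an error attached to a state that did not make the stopped → removed transition on this call was silently discarded. The new flow drains and logs the error unconditionally and gates only the re-scheduling on the transition. A hedged sketch of that shape (illustrative names and signatures, locking omitted for brevity, not tiflow's exact API):

```go
package main

import (
	"errors"
	"log"
)

type regionState struct {
	stopped, removed bool
	err              error
}

func (s *regionState) markStopped(err error) {
	if !s.stopped {
		s.stopped, s.err = true, err
	}
}

func (s *regionState) markRemoved() bool {
	if s.stopped && !s.removed {
		s.removed = true
		return true
	}
	return false
}

func (s *regionState) takeError() error {
	err := s.err
	s.err = nil
	return err
}

// handleSingleRegionError mirrors the fixed shape: take and log the error
// first, then decide whether to re-schedule.
func handleSingleRegionError(state *regionState, reschedule func(error)) {
	removed := state.markRemoved()
	err := state.takeError() // never skipped, so the error cannot be lost
	if err != nil {
		log.Printf("region error (reschedule=%v): %v", removed, err)
	}
	if removed {
		reschedule(err)
	}
}

func main() {
	s := &regionState{}
	s.markStopped(errors.New("not leader"))
	s.markRemoved() // simulate: the region was already removed by another path
	handleSingleRegionError(s, func(err error) { log.Printf("re-schedule: %v", err) })
	// No re-scheduling happens here, but the error is still logged.
}
```

Running the sketch shows the error being surfaced even when the region is not handed back for re-scheduling, which is exactly the observability this change adds.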
@@ -128,16 +135,16 @@ func (w *sharedRegionWorker) processEvent(ctx context.Context, event statefulEve
w.handleSingleRegionError(state, event.stream)
return
}
case *cdcpb.Event_Admin_:
case *cdcpb.Event_Error:
state.markStopped(&eventError{err: x.Error})
w.handleSingleRegionError(state, event.stream)
return
case *cdcpb.Event_ResolvedTs:
w.handleResolvedTs(ctx, resolvedTsBatch{
ts: x.ResolvedTs,
regions: []*regionFeedState{state},
})
case *cdcpb.Event_Error:
state.markStopped(&eventError{err: x.Error})
w.handleSingleRegionError(state, event.stream)
return
case *cdcpb.Event_Admin_:
}
} else if len(event.resolvedTsBatch.regions) > 0 {
w.handleResolvedTs(ctx, event.resolvedTsBatch)
36 changes: 28 additions & 8 deletions cdc/kv/shared_stream.go
@@ -56,11 +56,8 @@ type tableExclusive struct {
}

func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *requestedStore) *requestedStream {
stream := &requestedStream{
streamID: streamIDGen.Add(1),
requests: chann.NewAutoDrainChann[singleRegionInfo](),
}
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
stream := newRequestedStream(streamIDGen.Add(1))
stream.requests = chann.NewAutoDrainChann[singleRegionInfo]()

waitForPreFetching := func() error {
if stream.preFetchForConnecting != nil {
@@ -96,8 +93,8 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
for _, m := range stream.clearStates() {
for _, state := range m {
state.markStopped(&sendRequestToStoreErr{})
sfEvent := newEventItem(nil, state, stream)
slot := hashRegionID(state.sri.verID.GetID(), len(c.workers))
sfEvent := statefulEvent{eventItem: eventItem{state: state}}
_ = c.workers[slot].sendEvent(ctx, sfEvent)
}
}
@@ -119,6 +116,12 @@
return stream
}

func newRequestedStream(streamID uint64) *requestedStream {
stream := &requestedStream{streamID: streamID}
stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState)
return stream
}

func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) {
isCanceled := func() bool {
select {
@@ -352,8 +355,8 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
// and then those regions from the bad requestID will be unsubscribed finally.
for _, state := range s.takeStates(subscriptionID) {
state.markStopped(&sendRequestToStoreErr{})
sfEvent := newEventItem(nil, state, s)
slot := hashRegionID(state.sri.verID.GetID(), len(c.workers))
sfEvent := statefulEvent{eventItem: eventItem{state: state}}
if err = c.workers[slot].sendEvent(ctx, sfEvent); err != nil {
return errors.Trace(err)
}
@@ -423,6 +426,9 @@ func (s *requestedStream) takeState(subscriptionID SubscriptionID, regionID uint
if m, ok := s.requestedRegions.m[subscriptionID]; ok {
state = m[regionID]
delete(m, regionID)
if len(m) == 0 {
delete(s.requestedRegions.m, subscriptionID)
}
}
return
}
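`takeState` now also drops the per-subscription inner map once its last region has been taken; this keeps `requestedRegions.m` from accumulating empty entries and is what the updated `TestRequestedStreamRequestedRegions` asserts with `len(stream.requestedRegions.m) == 0`. A generic sketch of the nested-map cleanup (hypothetical names):

```go
package main

import "fmt"

// take removes regionID from the nested map and drops the inner map when it
// becomes empty, so the outer map does not accumulate empty entries.
func take(m map[uint64]map[uint64]string, subID, regionID uint64) (v string, ok bool) {
	inner, found := m[subID]
	if !found {
		return "", false
	}
	v, ok = inner[regionID]
	delete(inner, regionID)
	if len(inner) == 0 {
		delete(m, subID)
	}
	return v, ok
}

func main() {
	m := map[uint64]map[uint64]string{1: {2: "region-2"}}
	v, _ := take(m, 1, 2)
	fmt.Println(v, len(m)) // region-2 0
}
```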
@@ -468,7 +474,21 @@ func (s *requestedStream) sendRegionChangeEvents(
} else {
subscriptionID = tableSubID
}
if state := s.getState(subscriptionID, regionID); state != nil {

state := s.getState(subscriptionID, regionID)
switch x := event.Event.(type) {
case *cdcpb.Event_Error:
log.Info("event feed receives a region error",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", event.RegionId),
zap.Bool("stateIsNil", state == nil),
zap.Any("error", x.Error))
}

if state != nil {
sfEvent := newEventItem(event, state, s)
slot := hashRegionID(regionID, len(c.workers))
if err := c.workers[slot].sendEvent(ctx, sfEvent); err != nil {