From 06c6ae27c6af930ea303ab45356fa7d52e818dca Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 8 Nov 2023 14:01:23 +0800 Subject: [PATCH 1/3] init Signed-off-by: qupeng --- cdc/kv/client.go | 32 +++--- cdc/kv/region_state.go | 116 ++++++++++++------- cdc/kv/region_worker.go | 29 ++--- cdc/kv/regionlock/region_range_lock.go | 148 +++++++++++++++++++++---- 4 files changed, 228 insertions(+), 97 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 8805713e234..a5e322b42c8 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -375,9 +375,9 @@ func newEventFeedSession( startTs uint64, eventCh chan<- model.RegionFeedEvent, ) *eventFeedSession { - id := strconv.FormatUint(allocID(), 10) + id := allocID() rangeLock := regionlock.NewRegionRangeLock( - totalSpan.StartKey, totalSpan.EndKey, startTs, + id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) return &eventFeedSession{ client: client, @@ -390,7 +390,7 @@ func newEventFeedSession( eventCh: eventCh, rangeLock: rangeLock, lockResolver: lockResolver, - id: id, + id: strconv.FormatUint(id, 10), regionChSizeGauge: clientChannelSize.WithLabelValues("region"), errChSizeGauge: clientChannelSize.WithLabelValues("err"), rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), @@ -501,7 +501,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single handleResult := func(res regionlock.LockRangeResult) { switch res.Status { case regionlock.LockRangeStatusSuccess: - sri.resolvedTs = res.CheckpointTs + sri.lockedRange = res.LockedRange select { case s.regionCh.In() <- sri: s.regionChSizeGauge.Inc() @@ -513,12 +513,12 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single zap.String("changefeed", s.changefeed.ID), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", &sri.span), - zap.Uint64("resolvedTs", sri.resolvedTs), + zap.Uint64("resolvedTs", sri.resolvedTs()), zap.Any("retrySpans", res.RetryRanges)) for _, r := range res.RetryRanges { // This call is always blocking, otherwise if scheduling in a new // goroutine, it won't block the caller of `schedulerRegionRequest`. - s.scheduleDivideRegionAndRequest(ctx, r, sri.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, r, sri.resolvedTs()) } case regionlock.LockRangeStatusCancel: return @@ -533,7 +533,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single // short sleep to wait region has split time.Sleep(time.Second) s.rangeLock.UnlockRange(sri.span.StartKey, sri.span.EndKey, - sri.verID.GetID(), sri.verID.GetVer(), sri.resolvedTs) + sri.verID.GetID(), sri.verID.GetVer(), sri.resolvedTs()) regionNum := val.(int) retryRanges := make([]tablepb.Span, 0, regionNum) start := []byte("a") @@ -561,7 +561,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single // CAUTION: Note that this should only be called in a context that the region has locked its range. 
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) { s.rangeLock.UnlockRange(errorInfo.span.StartKey, errorInfo.span.EndKey, - errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs) + errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs()) log.Info("region failed", zap.Stringer("span", &errorInfo.span), zap.Any("regionId", errorInfo.verID.GetID()), zap.Error(errorInfo.err)) @@ -611,7 +611,7 @@ func (s *eventFeedSession) requestRegionToStore( RegionId: regionID, RequestId: requestID, RegionEpoch: regionEpoch, - CheckpointTs: sri.resolvedTs, + CheckpointTs: sri.resolvedTs(), StartKey: sri.span.StartKey, EndKey: sri.span.EndKey, ExtraOp: extraOp, @@ -782,7 +782,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { Region: sri.verID.GetID(), }, }, - ResolvedTs: sri.resolvedTs, + ResolvedTs: sri.resolvedTs(), }, } select { @@ -804,7 +804,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { zap.String("tableName", s.tableName), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", &sri.span), - zap.Uint64("resolvedTs", sri.resolvedTs)) + zap.Uint64("resolvedTs", sri.resolvedTs())) errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID}) s.onRegionFail(ctx, errInfo) continue @@ -870,7 +870,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( // the End key return by the PD API will be nil to represent the biggest key, partialSpan = spanz.HackSpan(partialSpan) - sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, ts, nil) + sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, nil) s.scheduleRegionRequest(ctx, sri) // return if no more regions if spanz.EndCompare(nextSpan.StartKey, span.EndKey) >= 0 { @@ -895,11 +895,11 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI } else if innerErr.GetEpochNotMatch() != nil { // TODO: If only confver is updated, we don't need to reload the region from region cache. 
metricFeedEpochNotMatchCounter.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs()) return nil } else if innerErr.GetRegionNotFound() != nil { metricFeedRegionNotFoundCounter.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs()) return nil } else if duplicatedRequest := innerErr.GetDuplicateRequest(); duplicatedRequest != nil { metricFeedDuplicateRequestCounter.Inc() @@ -929,7 +929,7 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI } case *rpcCtxUnavailableErr: metricFeedRPCCtxUnavailable.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs()) return nil case *connectToStoreErr: metricConnectToStoreErr.Inc() @@ -1206,7 +1206,7 @@ func (s *eventFeedSession) sendRegionChangeEvents( } state.start() worker.setRegionState(event.RegionId, state) - } else if state.isStopped() { + } else if state.isStale() { log.Warn("drop event due to region feed stopped", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), diff --git a/cdc/kv/region_state.go b/cdc/kv/region_state.go index 52113f6c970..65518d24ca6 100644 --- a/cdc/kv/region_state.go +++ b/cdc/kv/region_state.go @@ -16,9 +16,8 @@ package kv import ( "runtime" "sync" - "sync/atomic" - "time" + "github.com/pingcap/tiflow/cdc/kv/regionlock" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/tikv/client-go/v2/tikv" ) @@ -26,68 +25,107 @@ import ( const ( minRegionStateBucket = 4 maxRegionStateBucket = 16 + + stateNormal uint32 = 0 + stateStopped uint32 = 1 + stateRemoved uint32 = 2 ) type singleRegionInfo struct { - verID tikv.RegionVerID - span tablepb.Span - resolvedTs uint64 - rpcCtx *tikv.RPCContext + verID tikv.RegionVerID + span tablepb.Span + rpcCtx *tikv.RPCContext + + lockedRange *regionlock.LockedRange } func newSingleRegionInfo( verID tikv.RegionVerID, span tablepb.Span, - ts uint64, rpcCtx *tikv.RPCContext, ) singleRegionInfo { return singleRegionInfo{ - verID: verID, - span: span, - resolvedTs: ts, - rpcCtx: rpcCtx, + verID: verID, + span: span, + rpcCtx: rpcCtx, } } +func (s singleRegionInfo) resolvedTs() uint64 { + return s.lockedRange.CheckpointTs.Load() +} + type regionFeedState struct { sri singleRegionInfo requestID uint64 - stopped int32 - - initialized atomic.Bool - matcher *matcher - startFeedTime time.Time - lastResolvedTs uint64 + matcher *matcher + + // Transform: normal -> stopped -> removed. + // normal: the region is in replicating. + // stopped: some error happens. + // removed: the region is returned into the pending list, + // will be re-resolved and re-scheduled later. + state struct { + sync.RWMutex + v uint32 + // All region errors should be handled in region workers. + // `err` is used to retrieve errors generated outside. + err error + } } func newRegionFeedState(sri singleRegionInfo, requestID uint64) *regionFeedState { return ®ionFeedState{ sri: sri, requestID: requestID, - stopped: 0, } } func (s *regionFeedState) start() { - s.startFeedTime = time.Now() - s.lastResolvedTs = s.sri.resolvedTs s.matcher = newMatcher() } -func (s *regionFeedState) markStopped() { - atomic.StoreInt32(&s.stopped, 1) +// mark regionFeedState as stopped with the given error if possible. 
+func (s *regionFeedState) markStopped(err error) { + s.state.Lock() + defer s.state.Unlock() + if s.state.v == stateNormal { + s.state.v = stateStopped + s.state.err = err + } +} + +// mark regionFeedState as removed if possible. +func (s *regionFeedState) markRemoved() (changed bool) { + s.state.Lock() + defer s.state.Unlock() + if s.state.v == stateStopped { + s.state.v = stateRemoved + changed = true + } + return +} + +func (s *regionFeedState) isStale() bool { + s.state.RLock() + defer s.state.RUnlock() + return s.state.v == stateStopped || s.state.v == stateRemoved } -func (s *regionFeedState) isStopped() bool { - return atomic.LoadInt32(&s.stopped) > 0 +func (s *regionFeedState) takeError() (err error) { + s.state.Lock() + defer s.state.Unlock() + err = s.state.err + s.state.err = nil + return } func (s *regionFeedState) isInitialized() bool { - return s.initialized.Load() + return s.sri.lockedRange.Initialzied.Load() } func (s *regionFeedState) setInitialized() { - s.initialized.Store(true) + s.sri.lockedRange.Initialzied.Store(true) } func (s *regionFeedState) getRegionID() uint64 { @@ -95,31 +133,29 @@ func (s *regionFeedState) getRegionID() uint64 { } func (s *regionFeedState) getLastResolvedTs() uint64 { - return atomic.LoadUint64(&s.lastResolvedTs) + return s.sri.lockedRange.CheckpointTs.Load() } // updateResolvedTs update the resolved ts of the current region feed func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) { - if resolvedTs > s.getLastResolvedTs() { - atomic.StoreUint64(&s.lastResolvedTs, resolvedTs) - } -} - -// setRegionInfoResolvedTs is only called when the region disconnect, -// to update the `singleRegionInfo` which is reused by reconnect. -func (s *regionFeedState) setRegionInfoResolvedTs() { - if s.getLastResolvedTs() <= s.sri.resolvedTs { - return + state := s.sri.lockedRange + for { + last := state.CheckpointTs.Load() + if last > resolvedTs { + return + } + if state.CheckpointTs.CompareAndSwap(last, resolvedTs) { + break + } } - s.sri.resolvedTs = s.lastResolvedTs } func (s *regionFeedState) getRegionInfo() singleRegionInfo { return s.sri } -func (s *regionFeedState) getRegionMeta() (uint64, tablepb.Span, time.Time, string) { - return s.sri.verID.GetID(), s.sri.span, s.startFeedTime, s.sri.rpcCtx.Addr +func (s *regionFeedState) getRegionMeta() (uint64, tablepb.Span, string) { + return s.sri.verID.GetID(), s.sri.span, s.sri.rpcCtx.Addr } type syncRegionFeedStateMap struct { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index d41d55f8355..db386c6dfc0 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -193,7 +193,6 @@ func (w *regionWorker) checkShouldExit() error { } func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState) error { - state.setRegionInfoResolvedTs() regionID := state.getRegionID() log.Info("single region event feed disconnected", zap.String("namespace", w.session.client.changefeed.Namespace), @@ -201,14 +200,14 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState zap.Uint64("regionID", regionID), zap.Uint64("requestID", state.requestID), zap.Stringer("span", &state.sri.span), - zap.Uint64("resolvedTs", state.sri.resolvedTs), + zap.Uint64("resolvedTs", state.sri.resolvedTs()), zap.Error(err)) // if state is already marked stopped, it must have been or would be processed by `onRegionFail` - if state.isStopped() { + if state.isStale() { return w.checkShouldExit() } // We need to ensure when the error is handled, `isStopped` must be set. 
So set it before sending the error. - state.markStopped() + state.markStopped(err) w.delRegionState(regionID) failpoint.Inject("kvClientSingleFeedProcessDelay", nil) @@ -295,7 +294,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0) for _, rts := range expired { state, ok := w.getRegionState(rts.regionID) - if !ok || state.isStopped() { + if !ok || state.isStale() { // state is already deleted or stopped, just continue, // and don't need to push resolved ts back to heap. continue @@ -356,7 +355,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEvent) error { // event.state is nil when resolvedTsEvent is not nil - skipEvent := event.state != nil && event.state.isStopped() + skipEvent := event.state != nil && event.state.isStale() if skipEvent { return nil } @@ -616,7 +615,7 @@ func (w *regionWorker) handleEventEntry( x *cdcpb.Event_Entries_, state *regionFeedState, ) error { - regionID, regionSpan, startTime, _ := state.getRegionMeta() + regionID, regionSpan, _ := state.getRegionMeta() for _, entry := range x.Entries.GetEntries() { // if a region with kv range [a, z), and we only want the get [b, c) from this region, // tikv will return all key events in the region, although specified [b, c) int the request. @@ -630,13 +629,6 @@ func (w *regionWorker) handleEventEntry( } switch entry.Type { case cdcpb.Event_INITIALIZED: - if time.Since(startTime) > 20*time.Second { - log.Warn("The time cost of initializing is too much", - zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID), - zap.Duration("duration", time.Since(startTime)), - zap.Uint64("regionID", regionID)) - } w.metrics.metricPullEventInitializedCounter.Inc() state.setInitialized() @@ -743,7 +735,7 @@ func (w *regionWorker) handleResolvedTs( regions := make([]uint64, 0, len(revents.regions)) for _, state := range revents.regions { - if state.isStopped() || !state.isInitialized() { + if state.isStale() || !state.isInitialized() { continue } regionID := state.getRegionID() @@ -778,7 +770,7 @@ func (w *regionWorker) handleResolvedTs( default: } for _, state := range revents.regions { - if state.isStopped() || !state.isInitialized() { + if state.isStale() || !state.isInitialized() { continue } state.updateResolvedTs(resolvedTs) @@ -804,10 +796,10 @@ func (w *regionWorker) evictAllRegions() { for _, states := range w.statesManager.states { deletes = deletes[:0] states.iter(func(regionID uint64, regionState *regionFeedState) bool { - if regionState.isStopped() { + if regionState.isStale() { return true } - regionState.markStopped() + regionState.markStopped(nil) deletes = append(deletes, struct { regionID uint64 regionState *regionFeedState @@ -818,7 +810,6 @@ func (w *regionWorker) evictAllRegions() { }) for _, del := range deletes { w.delRegionState(del.regionID) - del.regionState.setRegionInfoResolvedTs() // since the context used in region worker will be cancelled after // region worker exits, we must use the parent context to prevent // regionErrorInfo loss. 
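A note on the region_state.go changes above: patch 1 drops the old per-region fields (`stopped int32`, `initialized atomic.Bool`, `lastResolvedTs uint64`) in favour of two cooperating pieces — a mutex-guarded lifecycle (stateNormal -> stateStopped -> stateRemoved) and a shared `regionlock.LockedRange` whose checkpoint ts only ever moves forward. The Go program below is a minimal, self-contained sketch of those two patterns; the names and guard conditions are simplified stand-ins, not the exact tiflow identifiers.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	stateNormal uint32 = iota
	stateStopped
	stateRemoved
)

// lockedRange mirrors the role of regionlock.LockedRange: shared, lock-free
// counters that both the range lock and the region worker can read.
type lockedRange struct {
	checkpointTs atomic.Uint64
	initialized  atomic.Bool
}

// advanceCheckpoint only moves the checkpoint forward, like
// regionFeedState.updateResolvedTs in the patch.
func (r *lockedRange) advanceCheckpoint(ts uint64) {
	for {
		last := r.checkpointTs.Load()
		if last >= ts {
			return
		}
		if r.checkpointTs.CompareAndSwap(last, ts) {
			return
		}
	}
}

// regionState is the mutex-guarded lifecycle: a region is stopped at most
// once (keeping the first error), and can only be removed after it stopped.
type regionState struct {
	mu    sync.RWMutex
	state uint32
	err   error
}

func (s *regionState) markStopped(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state == stateNormal {
		s.state = stateStopped
		s.err = err
	}
}

func (s *regionState) markRemoved() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state == stateStopped {
		s.state = stateRemoved
		return true
	}
	return false
}

func main() {
	r := &lockedRange{}
	r.advanceCheckpoint(100)
	r.advanceCheckpoint(90)            // ignored: the checkpoint never regresses
	fmt.Println(r.checkpointTs.Load()) // 100

	s := &regionState{}
	s.markStopped(fmt.Errorf("epoch not match"))
	s.markStopped(nil)           // no-op: already stopped
	fmt.Println(s.markRemoved()) // true
}
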
diff --git a/cdc/kv/regionlock/region_range_lock.go b/cdc/kv/regionlock/region_range_lock.go index 1fa5762173b..3c375f843df 100644 --- a/cdc/kv/regionlock/region_range_lock.go +++ b/cdc/kv/regionlock/region_range_lock.go @@ -21,10 +21,12 @@ import ( "math" "sync" "sync/atomic" + "time" "github.com/google/btree" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) @@ -116,6 +118,7 @@ type rangeLockEntry struct { regionID uint64 version uint64 waiters []chan<- interface{} + state LockedRange } func rangeLockEntryWithKey(key []byte) *rangeLockEntry { @@ -137,36 +140,36 @@ func (e *rangeLockEntry) String() string { len(e.waiters)) } -var currentID uint64 = 0 - -func allocID() uint64 { - return atomic.AddUint64(¤tID, 1) -} - // RegionRangeLock is specifically used for kv client to manage exclusive region ranges. Acquiring lock will be blocked // if part of its range is already locked. It also manages checkpoint ts of all ranges. The ranges are marked by a // version number, which should comes from the Region's Epoch version. The version is used to compare which range is // new and which is old if two ranges are overlapping. type RegionRangeLock struct { + // ID to identify different RegionRangeLock instances, so logs of different instances can be distinguished. + id uint64 + totalSpan tablepb.Span changefeedLogInfo string + mu sync.Mutex rangeCheckpointTs *rangeTsMap rangeLock *btree.BTreeG[*rangeLockEntry] regionIDLock map[uint64]*rangeLockEntry - // ID to identify different RegionRangeLock instances, so logs of different instances can be distinguished. - id uint64 + stopped bool + refCount uint64 } // NewRegionRangeLock creates a new RegionRangeLock. func NewRegionRangeLock( + id uint64, startKey, endKey []byte, startTs uint64, changefeedLogInfo string, ) *RegionRangeLock { return &RegionRangeLock{ + id: id, + totalSpan: tablepb.Span{StartKey: startKey, EndKey: endKey}, changefeedLogInfo: changefeedLogInfo, rangeCheckpointTs: newRangeTsMap(startKey, endKey, startTs), rangeLock: btree.NewG(16, rangeLockEntryLess), regionIDLock: make(map[uint64]*rangeLockEntry), - id: allocID(), } } @@ -208,6 +211,9 @@ func (l *RegionRangeLock) getOverlappedEntries(startKey, endKey []byte, regionID func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, version uint64) (LockRangeResult, []<-chan interface{}) { l.mu.Lock() defer l.mu.Unlock() + if l.stopped { + return LockRangeResult{Status: LockRangeStatusCancel}, nil + } overlappingEntries := l.getOverlappedEntries(startKey, endKey, regionID) @@ -219,6 +225,8 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio regionID: regionID, version: version, } + newEntry.state.CheckpointTs.Store(checkpointTs) + newEntry.state.Created = time.Now() l.rangeLock.ReplaceOrInsert(newEntry) l.regionIDLock[regionID] = newEntry @@ -230,9 +238,11 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey))) + l.refCount += 1 return LockRangeResult{ Status: LockRangeStatusSuccess, CheckpointTs: checkpointTs, + LockedRange: &newEntry.state, }, nil } @@ -308,7 +318,9 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio } // LockRange locks a range with specified version. 
-func (l *RegionRangeLock) LockRange(ctx context.Context, startKey, endKey []byte, regionID, version uint64) LockRangeResult { +func (l *RegionRangeLock) LockRange( + ctx context.Context, startKey, endKey []byte, regionID, version uint64, +) LockRangeResult { res, signalChs := l.tryLockRange(startKey, endKey, regionID, version) if res.Status != LockRangeStatusWait { @@ -337,22 +349,23 @@ func (l *RegionRangeLock) LockRange(ctx context.Context, startKey, endKey []byte } // UnlockRange unlocks a range and update checkpointTs of the range to specified value. -func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version uint64, checkpointTs uint64) { +// If it returns true it means it is stopped and all ranges are unlocked correctly. +func (l *RegionRangeLock) UnlockRange( + startKey, endKey []byte, regionID, version uint64, + checkpointTs ...uint64, +) (drained bool) { l.mu.Lock() defer l.mu.Unlock() entry, ok := l.rangeLock.Get(rangeLockEntryWithKey(startKey)) - if !ok { log.Panic("unlocking a not locked range", zap.String("changefeed", l.changefeedLogInfo), zap.Uint64("regionID", regionID), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), - zap.Uint64("version", version), - zap.Uint64("checkpointTs", checkpointTs)) + zap.Uint64("version", version)) } - if entry.regionID != regionID { log.Panic("unlocked a range but regionID mismatch", zap.String("changefeed", l.changefeedLogInfo), @@ -369,6 +382,8 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version zap.String("regionIDLockEntry", l.regionIDLock[regionID].String())) } delete(l.regionIDLock, regionID) + l.refCount -= 1 + drained = l.stopped && l.refCount == 0 if entry.version != version || !bytes.Equal(entry.endKey, endKey) { log.Panic("unlocking region doesn't match the locked region", @@ -377,7 +392,6 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), zap.Uint64("version", version), - zap.Uint64("checkpointTs", checkpointTs), zap.String("foundLockEntry", entry.String())) } @@ -385,17 +399,40 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version ch <- nil } - _, ok = l.rangeLock.Delete(entry) - if !ok { + if entry, ok = l.rangeLock.Delete(entry); !ok { panic("unreachable") } - l.rangeCheckpointTs.Set(startKey, endKey, checkpointTs) + + var newCheckpointTs uint64 + if len(checkpointTs) > 0 { + newCheckpointTs = checkpointTs[0] + } else { + newCheckpointTs = entry.state.CheckpointTs.Load() + } + + l.rangeCheckpointTs.Set(startKey, endKey, newCheckpointTs) log.Debug("unlocked range", zap.String("changefeed", l.changefeedLogInfo), zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID), - zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("checkpointTs", newCheckpointTs), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey))) + return +} + +// RefCount returns how many ranges are locked. +func (l *RegionRangeLock) RefCount() uint64 { + l.mu.Lock() + defer l.mu.Unlock() + return l.refCount +} + +// Stop stops the instance. +func (l *RegionRangeLock) Stop() (drained bool) { + l.mu.Lock() + defer l.mu.Unlock() + l.stopped = true + return l.stopped && l.refCount == 0 } const ( @@ -410,15 +447,82 @@ const ( ) // LockRangeResult represents the result of LockRange method of RegionRangeLock. 
-// If Status is LockRangeStatusSuccess, the CheckpointTs field will be the minimal checkpoint ts among the locked -// range. +// If Status is LockRangeStatusSuccess: +// - CheckpointTs will be the minimal checkpoint ts among the locked range; +// - LockedRange is for recording real-time state changes; +// // If Status is LockRangeStatusWait, it means the lock cannot be acquired immediately. WaitFn must be invoked to // continue waiting and acquiring the lock. +// // If Status is LockRangeStatusStale, it means the LockRange request is stale because there's already a overlapping // locked range, whose version is greater or equals to the requested one. type LockRangeResult struct { Status int CheckpointTs uint64 + LockedRange *LockedRange WaitFn func() LockRangeResult RetryRanges []tablepb.Span } + +// LockedRange is returned by `RegionRangeLock.LockRange`, which can be used to +// collect informations for the range. And collected informations can be accessed +// by iterating `RegionRangeLock`. +type LockedRange struct { + CheckpointTs atomic.Uint64 + Initialzied atomic.Bool + Created time.Time +} + +// CollectLockedRangeAttrs collects locked range attributes. +func (l *RegionRangeLock) CollectLockedRangeAttrs( + action func(regionID uint64, state *LockedRange), +) (r CollectedLockedRangeAttrs) { + l.mu.Lock() + defer l.mu.Unlock() + r.FastestRegion.CheckpointTs = 0 + r.SlowestRegion.CheckpointTs = math.MaxUint64 + + lastEnd := l.totalSpan.StartKey + l.rangeLock.Ascend(func(item *rangeLockEntry) bool { + if action != nil { + action(item.regionID, &item.state) + } + if spanz.EndCompare(lastEnd, item.startKey) < 0 { + r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: item.startKey}) + } + ckpt := item.state.CheckpointTs.Load() + if ckpt > r.FastestRegion.CheckpointTs { + r.FastestRegion.RegionID = item.regionID + r.FastestRegion.CheckpointTs = ckpt + r.FastestRegion.Initialized = item.state.Initialzied.Load() + r.FastestRegion.Created = item.state.Created + } + if ckpt < r.SlowestRegion.CheckpointTs { + r.SlowestRegion.RegionID = item.regionID + r.SlowestRegion.CheckpointTs = ckpt + r.SlowestRegion.Initialized = item.state.Initialzied.Load() + r.SlowestRegion.Created = item.state.Created + } + lastEnd = item.endKey + return true + }) + if spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0 { + r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: l.totalSpan.EndKey}) + } + return +} + +// CollectedLockedRangeAttrs returns by `RegionRangeLock.CollectedLockedRangeAttrs`. +type CollectedLockedRangeAttrs struct { + Holes []tablepb.Span + FastestRegion LockedRangeAttrs + SlowestRegion LockedRangeAttrs +} + +// LockedRangeAttrs is like `LockedRange`, but only contains some read-only attributes. 
+type LockedRangeAttrs struct { + RegionID uint64 + CheckpointTs uint64 + Initialized bool + Created time.Time +} From 92161676d6a6d2aac43c6f1dc1f34f8e01a09afe Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 8 Nov 2023 15:09:16 +0800 Subject: [PATCH 2/3] pick more to 7.1 Signed-off-by: qupeng --- cdc/kv/client.go | 116 +++++++++++++++---- cdc/kv/region_state_bench_test.go | 12 +- cdc/kv/region_worker.go | 121 +++++++++++--------- cdc/kv/region_worker_test.go | 40 ++++--- cdc/kv/regionlock/region_range_lock_test.go | 8 +- 5 files changed, 194 insertions(+), 103 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index a5e322b42c8..a419c787ecd 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -19,6 +19,7 @@ import ( "io" "math/rand" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -42,6 +43,7 @@ import ( "github.com/pingcap/tiflow/pkg/version" "github.com/prometheus/client_golang/prometheus" tidbkv "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -271,7 +273,7 @@ func (c *CDCClient) EventFeed( eventCh chan<- model.RegionFeedEvent, ) error { s := newEventFeedSession(c, span, lockResolver, ts, eventCh) - return s.eventFeed(ctx, ts) + return s.eventFeed(ctx) } // RegionCount returns the number of captured regions. @@ -365,7 +367,6 @@ type eventFeedSession struct { type rangeRequestTask struct { span tablepb.Span - ts uint64 } func newEventFeedSession( @@ -376,6 +377,7 @@ func newEventFeedSession( eventCh chan<- model.RegionFeedEvent, ) *eventFeedSession { id := allocID() + idStr := strconv.FormatUint(id, 10) rangeLock := regionlock.NewRegionRangeLock( id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) @@ -390,7 +392,7 @@ func newEventFeedSession( eventCh: eventCh, rangeLock: rangeLock, lockResolver: lockResolver, - id: strconv.FormatUint(id, 10), + id: idStr, regionChSizeGauge: clientChannelSize.WithLabelValues("region"), errChSizeGauge: clientChannelSize.WithLabelValues("err"), rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), @@ -406,7 +408,7 @@ func newEventFeedSession( } } -func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { +func (s *eventFeedSession) eventFeed(ctx context.Context) error { s.requestRangeCh = chann.NewAutoDrainChann[rangeRequestTask]() s.regionCh = chann.NewAutoDrainChann[singleRegionInfo]() s.regionRouter = chann.NewAutoDrainChann[singleRegionInfo]() @@ -423,13 +425,11 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - return s.dispatchRequest(ctx) - }) + g.Go(func() error { return s.dispatchRequest(ctx) }) - g.Go(func() error { - return s.requestRegionToStore(ctx, g) - }) + g.Go(func() error { return s.requestRegionToStore(ctx, g) }) + + g.Go(func() error { return s.logSlowRegions(ctx) }) g.Go(func() error { for { @@ -447,7 +447,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { // Besides the count or frequency of range request is limited, // we use ephemeral goroutine instead of permanent goroutine. 
g.Go(func() error { - return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts) + return s.divideAndSendEventFeedToRegions(ctx, task.span) }) } } @@ -468,7 +468,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { } }) - s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan, ts: ts} + s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan} s.rangeChSizeGauge.Inc() log.Info("event feed started", @@ -476,7 +476,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.Uint64("startTs", ts), + zap.Uint64("startTs", s.startTs), zap.Stringer("span", &s.totalSpan)) return g.Wait() @@ -485,9 +485,9 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { // scheduleDivideRegionAndRequest schedules a range to be divided by regions, // and these regions will be then scheduled to send ChangeData requests. func (s *eventFeedSession) scheduleDivideRegionAndRequest( - ctx context.Context, span tablepb.Span, ts uint64, + ctx context.Context, span tablepb.Span, ) { - task := rangeRequestTask{span: span, ts: ts} + task := rangeRequestTask{span: span} select { case s.requestRangeCh.In() <- task: s.rangeChSizeGauge.Inc() @@ -513,12 +513,11 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single zap.String("changefeed", s.changefeed.ID), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", &sri.span), - zap.Uint64("resolvedTs", sri.resolvedTs()), zap.Any("retrySpans", res.RetryRanges)) for _, r := range res.RetryRanges { // This call is always blocking, otherwise if scheduling in a new // goroutine, it won't block the caller of `schedulerRegionRequest`. - s.scheduleDivideRegionAndRequest(ctx, r, sri.resolvedTs()) + s.scheduleDivideRegionAndRequest(ctx, r) } case regionlock.LockRangeStatusCancel: return @@ -529,11 +528,12 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single res := s.rangeLock.LockRange( ctx, sri.span.StartKey, sri.span.EndKey, sri.verID.GetID(), sri.verID.GetVer()) + failpoint.Inject("kvClientMockRangeLock", func(val failpoint.Value) { // short sleep to wait region has split time.Sleep(time.Second) s.rangeLock.UnlockRange(sri.span.StartKey, sri.span.EndKey, - sri.verID.GetID(), sri.verID.GetVer(), sri.resolvedTs()) + sri.verID.GetID(), sri.verID.GetVer()) regionNum := val.(int) retryRanges := make([]tablepb.Span, 0, regionNum) start := []byte("a") @@ -689,13 +689,13 @@ func (s *eventFeedSession) requestRegionToStore( state := newRegionFeedState(sri, requestID) pendingRegions.setByRequestID(requestID, state) - log.Debug("start new request", + log.Info("start new request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.String("addr", storeAddr), - zap.Any("request", req)) + zap.Uint64("regionID", sri.verID.GetID()), + zap.String("addr", storeAddr)) err = stream.client.Send(req) @@ -818,7 +818,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { // to region boundaries. When region merging happens, it's possible that it // will produce some overlapping spans. 
func (s *eventFeedSession) divideAndSendEventFeedToRegions( - ctx context.Context, span tablepb.Span, ts uint64, + ctx context.Context, span tablepb.Span, ) error { limit := 20 nextSpan := span @@ -889,17 +889,24 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI switch eerr := errors.Cause(err).(type) { case *eventError: innerErr := eerr.err + log.Info("cdc region error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Stringer("error", innerErr)) + if notLeader := innerErr.GetNotLeader(); notLeader != nil { metricFeedNotLeaderCounter.Inc() s.client.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) } else if innerErr.GetEpochNotMatch() != nil { // TODO: If only confver is updated, we don't need to reload the region from region cache. metricFeedEpochNotMatchCounter.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs()) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span) return nil } else if innerErr.GetRegionNotFound() != nil { metricFeedRegionNotFoundCounter.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs()) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span) return nil } else if duplicatedRequest := innerErr.GetDuplicateRequest(); duplicatedRequest != nil { metricFeedDuplicateRequestCounter.Inc() @@ -929,7 +936,7 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI } case *rpcCtxUnavailableErr: metricFeedRPCCtxUnavailable.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs()) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span) return nil case *connectToStoreErr: metricConnectToStoreErr.Inc() @@ -1216,6 +1223,17 @@ func (s *eventFeedSession) sendRegionChangeEvents( continue } + switch x := event.Event.(type) { + case *cdcpb.Event_Error: + log.Info("event feed receives a region error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("regionID", event.RegionId), + zap.Any("error", x.Error)) + } + slot := worker.inputCalcSlot(event.RegionId) statefulEvents[slot] = append(statefulEvents[slot], ®ionStatefulEvent{ changeEvent: event, @@ -1308,6 +1326,54 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can return } +func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + + currTime, err := s.client.pdClock.CurrentTime() + if err != nil { + continue + } + attr := s.rangeLock.CollectLockedRangeAttrs(nil) + if attr.SlowestRegion.Initialized { + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) + if currTime.Sub(ckptTime) > 20*time.Second { + log.Info("event feed finds a slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) + } + } else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute { + log.Info("event feed initializes a region too slow", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + 
zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) + } + if len(attr.Holes) > 0 { + holes := make([]string, 0, len(attr.Holes)) + for _, hole := range attr.Holes { + holes = append(holes, fmt.Sprintf("[%s,%s)", hole.StartKey, hole.EndKey)) + } + log.Info("event feed holes exist", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("holes", strings.Join(holes, ", "))) + } + } +} + func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) { var opType model.OpType switch entry.GetOpType() { diff --git a/cdc/kv/region_state_bench_test.go b/cdc/kv/region_state_bench_test.go index 5b9fe592080..250f265d28c 100644 --- a/cdc/kv/region_state_bench_test.go +++ b/cdc/kv/region_state_bench_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/pingcap/tiflow/cdc/kv/regionlock" "github.com/pingcap/tiflow/pkg/spanz" "github.com/tikv/client-go/v2/tikv" ) @@ -40,9 +41,9 @@ func TestSyncRegionFeedStateMapConcurrentAccess(t *testing.T) { return default: } - m.setByRequestID(1, ®ionFeedState{}) - m.setByRequestID(2, ®ionFeedState{}) - m.setByRequestID(3, ®ionFeedState{}) + m.setByRequestID(1, ®ionFeedState{sri: singleRegionInfo{lockedRange: ®ionlock.LockedRange{}}}) + m.setByRequestID(2, ®ionFeedState{sri: singleRegionInfo{lockedRange: ®ionlock.LockedRange{}}}) + m.setByRequestID(3, ®ionFeedState{sri: singleRegionInfo{lockedRange: ®ionlock.LockedRange{}}}) } }() wg.Add(1) @@ -55,7 +56,7 @@ func TestSyncRegionFeedStateMapConcurrentAccess(t *testing.T) { default: } m.iter(func(requestID uint64, state *regionFeedState) bool { - _ = state.initialized.Load() + state.isInitialized() return true }) } @@ -118,7 +119,8 @@ func benchmarkGetRegionState(b *testing.B, bench func(b *testing.B, sm regionSta state := newRegionFeedState(newSingleRegionInfo( tikv.RegionVerID{}, spanz.ToSpan([]byte{}, spanz.UpperBoundKey), - 0, &tikv.RPCContext{}), 0) + &tikv.RPCContext{}), 0) + state.sri.lockedRange = ®ionlock.LockedRange{} regionCount := []int{100, 1000, 10000, 20000, 40000, 80000, 160000, 320000} for _, count := range regionCount { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index db386c6dfc0..bdb2d788d9b 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -61,19 +61,18 @@ const ( ) type regionWorkerMetrics struct { - // kv events related metrics - metricReceivedEventSize prometheus.Observer - metricDroppedEventSize prometheus.Observer + metricReceivedEventSize prometheus.Observer + metricDroppedEventSize prometheus.Observer + metricPullEventInitializedCounter prometheus.Counter + metricPullEventCommittedCounter prometheus.Counter metricPullEventPrewriteCounter prometheus.Counter metricPullEventCommitCounter prometheus.Counter - metricPullEventCommittedCounter prometheus.Counter metricPullEventRollbackCounter prometheus.Counter - metricSendEventResolvedCounter prometheus.Counter - metricSendEventCommitCounter prometheus.Counter - metricSendEventCommittedCounter prometheus.Counter - // TODO: add region runtime related metrics + metricSendEventResolvedCounter prometheus.Counter + metricSendEventCommitCounter prometheus.Counter + metricSendEventCommittedCounter prometheus.Counter } /* @@ -114,22 +113,22 @@ type regionWorker struct { inputPending int32 } -func newRegionWorker( - ctx context.Context, changefeedID model.ChangeFeedID, s 
*eventFeedSession, addr string, -) *regionWorker { +func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { metrics := ®ionWorkerMetrics{} metrics.metricReceivedEventSize = eventSize.WithLabelValues("received") metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped") + metrics.metricPullEventInitializedCounter = pullEventCounter. WithLabelValues(cdcpb.Event_INITIALIZED.String(), changefeedID.Namespace, changefeedID.ID) metrics.metricPullEventCommittedCounter = pullEventCounter. WithLabelValues(cdcpb.Event_COMMITTED.String(), changefeedID.Namespace, changefeedID.ID) - metrics.metricPullEventCommitCounter = pullEventCounter. - WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.Namespace, changefeedID.ID) metrics.metricPullEventPrewriteCounter = pullEventCounter. WithLabelValues(cdcpb.Event_PREWRITE.String(), changefeedID.Namespace, changefeedID.ID) + metrics.metricPullEventCommitCounter = pullEventCounter. + WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.Namespace, changefeedID.ID) metrics.metricPullEventRollbackCounter = pullEventCounter. WithLabelValues(cdcpb.Event_ROLLBACK.String(), changefeedID.Namespace, changefeedID.ID) + metrics.metricSendEventResolvedCounter = sendEventCounter. WithLabelValues("native-resolved", changefeedID.Namespace, changefeedID.ID) metrics.metricSendEventCommitCounter = sendEventCounter. @@ -137,6 +136,12 @@ func newRegionWorker( metrics.metricSendEventCommittedCounter = sendEventCounter. WithLabelValues("committed", changefeedID.Namespace, changefeedID.ID) + return metrics +} + +func newRegionWorker( + ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, +) *regionWorker { return ®ionWorker{ parentCtx: ctx, session: s, @@ -148,7 +153,7 @@ func newRegionWorker( rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), storeAddr: addr, concurrency: s.client.config.KVClient.WorkerConcurrent, - metrics: metrics, + metrics: newRegionWorkerMetrics(changefeedID), inputPending: 0, } } @@ -194,6 +199,7 @@ func (w *regionWorker) checkShouldExit() error { func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState) error { regionID := state.getRegionID() + isStale := state.isStale() log.Info("single region event feed disconnected", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), @@ -201,13 +207,14 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState zap.Uint64("requestID", state.requestID), zap.Stringer("span", &state.sri.span), zap.Uint64("resolvedTs", state.sri.resolvedTs()), + zap.Bool("isStale", isStale), zap.Error(err)) // if state is already marked stopped, it must have been or would be processed by `onRegionFail` - if state.isStale() { + if isStale { return w.checkShouldExit() } - // We need to ensure when the error is handled, `isStopped` must be set. So set it before sending the error. - state.markStopped(err) + // We need to ensure when the error is handled, `isStale` must be set. So set it before sending the error. 
+ state.markStopped(nil) w.delRegionState(regionID) failpoint.Inject("kvClientSingleFeedProcessDelay", nil) @@ -335,7 +342,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { zap.Uint64("resolvedTs", lastResolvedTs), ) } - err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion) + err := w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion) if err != nil { log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), @@ -614,46 +621,51 @@ func (w *regionWorker) handleEventEntry( ctx context.Context, x *cdcpb.Event_Entries_, state *regionFeedState, +) error { + emit := func(assembled model.RegionFeedEvent) bool { + select { + case w.outputCh <- assembled: + return true + case <-ctx.Done(): + return false + } + } + return handleEventEntry(x, w.session.startTs, state, w.metrics, emit) +} + +func handleEventEntry( + x *cdcpb.Event_Entries_, + startTs uint64, + state *regionFeedState, + metrics *regionWorkerMetrics, + emit func(assembled model.RegionFeedEvent) bool, ) error { regionID, regionSpan, _ := state.getRegionMeta() for _, entry := range x.Entries.GetEntries() { - // if a region with kv range [a, z), and we only want the get [b, c) from this region, - // tikv will return all key events in the region, although specified [b, c) int the request. - // we can make tikv only return the events about the keys in the specified range. + // NOTE: from TiKV 7.0.0, entries are already filtered out in TiKV side. + // We can remove the check in future. comparableKey := spanz.ToComparableKey(entry.GetKey()) - // key for initialized event is nil if entry.Type != cdcpb.Event_INITIALIZED && !spanz.KeyInSpan(comparableKey, regionSpan) { - w.metrics.metricDroppedEventSize.Observe(float64(entry.Size())) + metrics.metricDroppedEventSize.Observe(float64(entry.Size())) continue } switch entry.Type { case cdcpb.Event_INITIALIZED: - w.metrics.metricPullEventInitializedCounter.Inc() - + metrics.metricPullEventInitializedCounter.Inc() state.setInitialized() - // state is just initialized, so we know this must be true - cachedEvents := state.matcher.matchCachedRow(true) - for _, cachedEvent := range cachedEvents { + for _, cachedEvent := range state.matcher.matchCachedRow(true) { revent, err := assembleRowEvent(regionID, cachedEvent) if err != nil { return errors.Trace(err) } - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommitCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + if !emit(revent) { + return nil } + metrics.metricSendEventCommitCounter.Inc() } state.matcher.matchCachedRollbackRow(true) case cdcpb.Event_COMMITTED: - w.metrics.metricPullEventCommittedCounter.Inc() - revent, err := assembleRowEvent(regionID, entry) - if err != nil { - return errors.Trace(err) - } - resolvedTs := state.getLastResolvedTs() if entry.CommitTs <= resolvedTs { logPanic("The CommitTs must be greater than the resolvedTs", @@ -663,17 +675,21 @@ func (w *regionWorker) handleEventEntry( zap.Uint64("regionID", regionID)) return errUnreachable } - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommittedCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + + metrics.metricPullEventCommittedCounter.Inc() + revent, err := assembleRowEvent(regionID, entry) + if err != nil { + return errors.Trace(err) + } + if !emit(revent) { + return nil } + metrics.metricSendEventCommittedCounter.Inc() case cdcpb.Event_PREWRITE: - w.metrics.metricPullEventPrewriteCounter.Inc() + metrics.metricPullEventPrewriteCounter.Inc() 
state.matcher.putPrewriteRow(entry) case cdcpb.Event_COMMIT: - w.metrics.metricPullEventCommitCounter.Inc() + metrics.metricPullEventCommitCounter.Inc() // NOTE: matchRow should always be called even if the event is stale. if !state.matcher.matchRow(entry, state.isInitialized()) { if !state.isInitialized() { @@ -687,7 +703,7 @@ func (w *regionWorker) handleEventEntry( } // TiKV can send events with StartTs/CommitTs less than startTs. - isStaleEvent := entry.CommitTs <= w.session.startTs + isStaleEvent := entry.CommitTs <= startTs if isStaleEvent { continue } @@ -707,15 +723,12 @@ func (w *regionWorker) handleEventEntry( if err != nil { return errors.Trace(err) } - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommitCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + if !emit(revent) { + return nil } - w.metrics.metricSendEventCommitCounter.Inc() + metrics.metricSendEventCommitCounter.Inc() case cdcpb.Event_ROLLBACK: - w.metrics.metricPullEventRollbackCounter.Inc() + metrics.metricPullEventRollbackCounter.Inc() if !state.isInitialized() { state.matcher.cacheRollbackRow(entry) continue diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index 1bbc4f9a876..9438d518651 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/tiflow/cdc/kv/regionlock" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/spanz" @@ -49,7 +50,11 @@ func TestRegionStateManagerThreadSafe(t *testing.T) { for i := 0; i < regionCount; i++ { regionID := uint64(1000 + i) regionIDs[i] = regionID - rsm.setState(regionID, ®ionFeedState{requestID: uint64(i + 1), lastResolvedTs: uint64(1000)}) + + state := ®ionFeedState{requestID: uint64(i + 1)} + state.sri.lockedRange = ®ionlock.LockedRange{} + state.updateResolvedTs(1000) + rsm.setState(regionID, state) } var wg sync.WaitGroup @@ -91,8 +96,8 @@ func TestRegionStateManagerThreadSafe(t *testing.T) { for _, regionID := range regionIDs { s, ok := rsm.getState(regionID) require.True(t, ok) - require.Greater(t, s.lastResolvedTs, uint64(1000)) - totalResolvedTs += s.lastResolvedTs + require.Greater(t, s.getLastResolvedTs(), uint64(1000)) + totalResolvedTs += s.getLastResolvedTs() } } @@ -151,7 +156,8 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { state := newRegionFeedState(newSingleRegionInfo( tikv.RegionVerID{}, spanz.ToSpan([]byte{}, spanz.UpperBoundKey), - 0, &tikv.RPCContext{}), 0) + &tikv.RPCContext{}), 0) + state.sri.lockedRange = ®ionlock.LockedRange{} state.start() worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") require.Equal(t, 2, cap(worker.outputCh)) @@ -268,28 +274,30 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) { s1 := newRegionFeedState(singleRegionInfo{ verID: tikv.NewRegionVerID(1, 1, 1), }, 1) - s1.initialized.Store(true) - s1.lastResolvedTs = 9 + s1.sri.lockedRange = ®ionlock.LockedRange{} + s1.setInitialized() + s1.updateResolvedTs(9) s2 := newRegionFeedState(singleRegionInfo{ verID: tikv.NewRegionVerID(2, 2, 2), }, 2) - s2.initialized.Store(true) - s2.lastResolvedTs = 11 + s2.sri.lockedRange = ®ionlock.LockedRange{} + s2.setInitialized() + s2.updateResolvedTs(11) s3 := newRegionFeedState(singleRegionInfo{ verID: tikv.NewRegionVerID(3, 3, 3), }, 3) - s3.initialized.Store(false) - s3.lastResolvedTs = 8 + s3.sri.lockedRange = ®ionlock.LockedRange{} + s3.updateResolvedTs(8) err := 
w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 10, regions: []*regionFeedState{s1, s2, s3}, }) require.Nil(t, err) - require.Equal(t, uint64(10), s1.lastResolvedTs) - require.Equal(t, uint64(11), s2.lastResolvedTs) - require.Equal(t, uint64(8), s3.lastResolvedTs) + require.Equal(t, uint64(10), s1.getLastResolvedTs()) + require.Equal(t, uint64(11), s2.getLastResolvedTs()) + require.Equal(t, uint64(8), s3.getLastResolvedTs()) re := <-w.rtsUpdateCh require.Equal(t, uint64(10), re.resolvedTs) @@ -309,8 +317,10 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { s1 := newRegionFeedState(newSingleRegionInfo( tikv.RegionVerID{}, spanz.ToSpan([]byte{}, spanz.UpperBoundKey), - 9, &tikv.RPCContext{}), + &tikv.RPCContext{}), 0) + s1.sri.lockedRange = ®ionlock.LockedRange{} + s1.sri.lockedRange.CheckpointTs.Store(9) s1.start() w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") @@ -319,7 +329,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { regions: []*regionFeedState{s1}, }) require.Nil(t, err) - require.Equal(t, uint64(9), s1.lastResolvedTs) + require.Equal(t, uint64(9), s1.getLastResolvedTs()) timer := time.NewTimer(time.Second) select { diff --git a/cdc/kv/regionlock/region_range_lock_test.go b/cdc/kv/regionlock/region_range_lock_test.go index 8b1a5690190..af887248164 100644 --- a/cdc/kv/regionlock/region_range_lock_test.go +++ b/cdc/kv/regionlock/region_range_lock_test.go @@ -90,7 +90,7 @@ func TestRegionRangeLock(t *testing.T) { t.Parallel() ctx := context.TODO() - l := NewRegionRangeLock([]byte("a"), []byte("h"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("h"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "a", "e", 1, 1, math.MaxUint64) unlockRange(l, "a", "e", 1, 1, 100) @@ -107,7 +107,7 @@ func TestRegionRangeLock(t *testing.T) { func TestRegionRangeLockStale(t *testing.T) { t.Parallel() - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "") ctx := context.TODO() mustLockRangeSuccess(ctx, t, l, "c", "g", 1, 10, math.MaxUint64) mustLockRangeSuccess(ctx, t, l, "j", "n", 2, 8, math.MaxUint64) @@ -130,7 +130,7 @@ func TestRegionRangeLockLockingRegionID(t *testing.T) { t.Parallel() ctx := context.TODO() - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "c", "d", 1, 10, math.MaxUint64) mustLockRangeStale(ctx, t, l, "e", "f", 1, 5, "e", "f") @@ -166,7 +166,7 @@ func TestRegionRangeLockCanBeCancelled(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "g", "h", 1, 10, math.MaxUint64) wait := mustLockRangeWait(ctx, t, l, "g", "h", 1, 12) cancel() From cc105f60c92db6805a7253db7ba5bd2ce499667f Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 16 Nov 2023 13:36:16 +0800 Subject: [PATCH 3/3] fix conflicts Signed-off-by: qupeng --- cdc/kv/region_state.go | 25 +------------------------ cdc/kv/region_worker.go | 4 ++-- 2 files changed, 3 insertions(+), 26 deletions(-) diff --git a/cdc/kv/region_state.go b/cdc/kv/region_state.go index 65518d24ca6..520c6366e8a 100644 --- a/cdc/kv/region_state.go +++ b/cdc/kv/region_state.go @@ -68,9 +68,6 @@ type regionFeedState struct { state struct { sync.RWMutex v uint32 - // 
All region errors should be handled in region workers. - // `err` is used to retrieve errors generated outside. - err error } } @@ -86,40 +83,20 @@ func (s *regionFeedState) start() { } // mark regionFeedState as stopped with the given error if possible. -func (s *regionFeedState) markStopped(err error) { +func (s *regionFeedState) markStopped() { s.state.Lock() defer s.state.Unlock() if s.state.v == stateNormal { s.state.v = stateStopped - s.state.err = err } } -// mark regionFeedState as removed if possible. -func (s *regionFeedState) markRemoved() (changed bool) { - s.state.Lock() - defer s.state.Unlock() - if s.state.v == stateStopped { - s.state.v = stateRemoved - changed = true - } - return -} - func (s *regionFeedState) isStale() bool { s.state.RLock() defer s.state.RUnlock() return s.state.v == stateStopped || s.state.v == stateRemoved } -func (s *regionFeedState) takeError() (err error) { - s.state.Lock() - defer s.state.Unlock() - err = s.state.err - s.state.err = nil - return -} - func (s *regionFeedState) isInitialized() bool { return s.sri.lockedRange.Initialzied.Load() } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index bdb2d788d9b..5a3771d65cb 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -214,7 +214,7 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState return w.checkShouldExit() } // We need to ensure when the error is handled, `isStale` must be set. So set it before sending the error. - state.markStopped(nil) + state.markStopped() w.delRegionState(regionID) failpoint.Inject("kvClientSingleFeedProcessDelay", nil) @@ -812,7 +812,7 @@ func (w *regionWorker) evictAllRegions() { if regionState.isStale() { return true } - regionState.markStopped(nil) + regionState.markStopped() deletes = append(deletes, struct { regionID uint64 regionState *regionFeedState
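
The series also ends region lifetimes through the range lock itself: patch 1 gives RegionRangeLock a `stopped` flag plus a `refCount`, so `Stop()` and `UnlockRange()` can both report whether every locked range has been released. The sketch below models only that stop/drain handshake under simplified assumptions (ranges reduced to a counter; no keys, versions, or checkpoint ts); it is illustrative, not the regionlock API.

package main

import (
	"fmt"
	"sync"
)

// rangeLock is a toy stand-in for RegionRangeLock that keeps only the
// stop/drain bookkeeping added by the patch.
type rangeLock struct {
	mu       sync.Mutex
	stopped  bool
	refCount uint64
}

// lock admits a new range only while the lock is running, mirroring
// tryLockRange returning LockRangeStatusCancel once the lock is stopped.
func (l *rangeLock) lock() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.stopped {
		return false
	}
	l.refCount++
	return true
}

// unlock releases one range and reports whether the lock is both stopped
// and fully drained, like the `drained` return value of UnlockRange.
func (l *rangeLock) unlock() (drained bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.refCount--
	return l.stopped && l.refCount == 0
}

// stop marks the lock stopped; it is drained immediately only if no
// ranges are currently held.
func (l *rangeLock) stop() (drained bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.stopped = true
	return l.refCount == 0
}

func main() {
	l := &rangeLock{}
	l.lock()
	l.lock()
	fmt.Println(l.stop())   // false: two ranges are still held
	fmt.Println(l.unlock()) // false
	fmt.Println(l.unlock()) // true: stopped and the last range just unlocked
	fmt.Println(l.lock())   // false: no new locks after stop
}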