diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 21549e87798..5b78e727006 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -19,6 +19,7 @@ import ( "io" "math/rand" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -40,6 +41,7 @@ import ( "github.com/pingcap/tiflow/pkg/version" "github.com/prometheus/client_golang/prometheus" tidbkv "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -69,6 +71,8 @@ const ( // failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we // don't need to force reload region anymore. regionScheduleReload = false + + resolveLockMinInterval = 10 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -248,7 +252,8 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) return nil } if c.config.Debug.EnableKVConnectBackOff { - newStreamErr = retry.Do(ctx, streamFunc, retry.WithBackoffBaseDelay(500), + newStreamErr = retry.Do(ctx, streamFunc, + retry.WithBackoffBaseDelay(100), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError), ) @@ -268,7 +273,7 @@ func (c *CDCClient) EventFeed( eventCh chan<- model.RegionFeedEvent, ) error { s := newEventFeedSession(c, span, lockResolver, ts, eventCh) - return s.eventFeed(ctx, ts) + return s.eventFeed(ctx) } // RegionCount returns the number of captured regions. @@ -362,7 +367,6 @@ type eventFeedSession struct { type rangeRequestTask struct { span regionspan.ComparableSpan - ts uint64 } func newEventFeedSession( @@ -372,9 +376,10 @@ func newEventFeedSession( startTs uint64, eventCh chan<- model.RegionFeedEvent, ) *eventFeedSession { - id := strconv.FormatUint(allocID(), 10) + id := allocID() + idStr := strconv.FormatUint(id, 10) rangeLock := regionspan.NewRegionRangeLock( - totalSpan.Start, totalSpan.End, startTs, + id, totalSpan.Start, totalSpan.End, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) return &eventFeedSession{ client: client, @@ -387,7 +392,7 @@ func newEventFeedSession( eventCh: eventCh, rangeLock: rangeLock, lockResolver: lockResolver, - id: id, + id: idStr, regionChSizeGauge: clientChannelSize.WithLabelValues("region"), errChSizeGauge: clientChannelSize.WithLabelValues("err"), rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), @@ -403,7 +408,7 @@ func newEventFeedSession( } } -func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { +func (s *eventFeedSession) eventFeed(ctx context.Context) error { s.requestRangeCh = chann.NewDrainableChann[rangeRequestTask]() s.regionCh = chann.NewDrainableChann[singleRegionInfo]() s.regionRouter = chann.NewDrainableChann[singleRegionInfo]() @@ -420,13 +425,11 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - return s.dispatchRequest(ctx) - }) + g.Go(func() error { return s.dispatchRequest(ctx) }) - g.Go(func() error { - return s.requestRegionToStore(ctx, g) - }) + g.Go(func() error { return s.requestRegionToStore(ctx, g) }) + + g.Go(func() error { return s.logSlowRegions(ctx) }) g.Go(func() error { for { @@ -444,7 +447,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { // Besides the count or frequency of range request is limited, // we use ephemeral goroutine instead of permanent goroutine. g.Go(func() error { - return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts) + return s.divideAndSendEventFeedToRegions(ctx, task.span) }) } } @@ -465,7 +468,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { } }) - s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan, ts: ts} + s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan} s.rangeChSizeGauge.Inc() log.Info("event feed started", @@ -473,16 +476,18 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.Uint64("startTs", ts), + zap.Uint64("startTs", s.startTs), zap.Stringer("span", s.totalSpan)) return g.Wait() } -// scheduleDivideRegionAndRequest schedules a range to be divided by regions, and these regions will be then scheduled -// to send ChangeData requests. -func (s *eventFeedSession) scheduleDivideRegionAndRequest(ctx context.Context, span regionspan.ComparableSpan, ts uint64) { - task := rangeRequestTask{span: span, ts: ts} +// scheduleDivideRegionAndRequest schedules a range to be divided by regions, +// and these regions will be then scheduled to send ChangeData requests. +func (s *eventFeedSession) scheduleDivideRegionAndRequest( + ctx context.Context, span regionspan.ComparableSpan, +) { + task := rangeRequestTask{span: span} select { case s.requestRangeCh.In() <- task: s.rangeChSizeGauge.Inc() @@ -496,7 +501,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single handleResult := func(res regionspan.LockRangeResult) { switch res.Status { case regionspan.LockRangeStatusSuccess: - sri.resolvedTs = res.CheckpointTs + sri.lockedRange = res.LockedRange select { case s.regionCh.In() <- sri: s.regionChSizeGauge.Inc() @@ -508,12 +513,11 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single zap.String("changefeed", s.changefeed.ID), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", sri.span), - zap.Uint64("resolvedTs", sri.resolvedTs), zap.Any("retrySpans", res.RetryRanges)) for _, r := range res.RetryRanges { // This call is always blocking, otherwise if scheduling in a new // goroutine, it won't block the caller of `schedulerRegionRequest`. - s.scheduleDivideRegionAndRequest(ctx, r, sri.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, r) } case regionspan.LockRangeStatusCancel: return @@ -527,7 +531,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single // short sleep to wait region has split time.Sleep(time.Second) s.rangeLock.UnlockRange(sri.span.Start, sri.span.End, - sri.verID.GetID(), sri.verID.GetVer(), sri.resolvedTs) + sri.verID.GetID(), sri.verID.GetVer()) regionNum := val.(int) retryRanges := make([]regionspan.ComparableSpan, 0, regionNum) start := []byte("a") @@ -556,7 +560,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single // CAUTION: Note that this should only be called in a context that the region has locked its range. func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) { s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, - errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs) + errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs()) log.Info("region failed", zap.Stringer("span", &errorInfo.span), zap.Any("regionId", errorInfo.verID.GetID()), zap.Error(errorInfo.err)) @@ -606,7 +610,7 @@ func (s *eventFeedSession) requestRegionToStore( RegionId: regionID, RequestId: requestID, RegionEpoch: regionEpoch, - CheckpointTs: sri.resolvedTs, + CheckpointTs: sri.resolvedTs(), StartKey: sri.span.Start, EndKey: sri.span.End, ExtraOp: extraOp, @@ -684,13 +688,13 @@ func (s *eventFeedSession) requestRegionToStore( state := newRegionFeedState(sri, requestID) pendingRegions.setByRequestID(requestID, state) - log.Debug("start new request", + log.Info("start new request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.String("addr", storeAddr), - zap.Any("request", req)) + zap.Uint64("regionID", sri.verID.GetID()), + zap.String("addr", storeAddr)) err = stream.client.Send(req) @@ -777,7 +781,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { Region: sri.verID.GetID(), }, }, - ResolvedTs: sri.resolvedTs, + ResolvedTs: sri.resolvedTs(), }, } select { @@ -799,7 +803,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { zap.String("tableName", s.tableName), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", sri.span), - zap.Uint64("resolvedTs", sri.resolvedTs)) + zap.Uint64("resolvedTs", sri.resolvedTs())) errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID}) s.onRegionFail(ctx, errInfo) continue @@ -813,7 +817,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { // to region boundaries. When region merging happens, it's possible that it // will produce some overlapping spans. func (s *eventFeedSession) divideAndSendEventFeedToRegions( - ctx context.Context, span regionspan.ComparableSpan, ts uint64, + ctx context.Context, span regionspan.ComparableSpan, ) error { limit := 20 nextSpan := span @@ -826,7 +830,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( retryErr := retry.Do(ctx, func() error { bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) start := time.Now() - regions, err = s.client.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.Start, nextSpan.End, limit) + regions, err = s.client.regionCache.BatchLoadRegionsWithKeyRange( + bo, nextSpan.Start, nextSpan.End, limit) scanRegionsDuration.Observe(time.Since(start).Seconds()) if err != nil { return cerror.WrapError(cerror.ErrPDBatchLoadRegions, err) @@ -855,7 +860,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( for _, tiRegion := range regions { region := tiRegion.GetMeta() - partialSpan, err := regionspan.Intersect(s.totalSpan, regionspan.ComparableSpan{Start: region.StartKey, End: region.EndKey}) + partialSpan, err := regionspan.Intersect( + s.totalSpan, regionspan.ComparableSpan{Start: region.StartKey, End: region.EndKey}) if err != nil { return errors.Trace(err) } @@ -863,7 +869,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( // the End key return by the PD API will be nil to represent the biggest key, partialSpan = partialSpan.Hack() - sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, ts, nil) + sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, nil) s.scheduleRegionRequest(ctx, sri) // return if no more regions if regionspan.EndCompare(nextSpan.Start, span.End) >= 0 { @@ -882,17 +888,24 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI switch eerr := errors.Cause(err).(type) { case *eventError: innerErr := eerr.err + log.Info("cdc region error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Stringer("error", innerErr)) + if notLeader := innerErr.GetNotLeader(); notLeader != nil { metricFeedNotLeaderCounter.Inc() s.client.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) } else if innerErr.GetEpochNotMatch() != nil { // TODO: If only confver is updated, we don't need to reload the region from region cache. metricFeedEpochNotMatchCounter.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span) return nil } else if innerErr.GetRegionNotFound() != nil { metricFeedRegionNotFoundCounter.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span) return nil } else if duplicatedRequest := innerErr.GetDuplicateRequest(); duplicatedRequest != nil { metricFeedDuplicateRequestCounter.Inc() @@ -922,7 +935,7 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI } case *rpcCtxUnavailableErr: metricFeedRPCCtxUnavailable.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span) return nil case *connectToStoreErr: metricConnectToStoreErr.Inc() @@ -1199,7 +1212,7 @@ func (s *eventFeedSession) sendRegionChangeEvents( } state.start() worker.setRegionState(event.RegionId, state) - } else if state.isStopped() { + } else if state.isStale() { log.Warn("drop event due to region feed stopped", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -1209,6 +1222,17 @@ func (s *eventFeedSession) sendRegionChangeEvents( continue } + switch x := event.Event.(type) { + case *cdcpb.Event_Error: + log.Info("event feed receives a region error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("regionID", event.RegionId), + zap.Any("error", x.Error)) + } + slot := worker.inputCalcSlot(event.RegionId) statefulEvents[slot] = append(statefulEvents[slot], ®ionStatefulEvent{ changeEvent: event, @@ -1301,6 +1325,51 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can return } +func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + + currTime := s.client.pdClock.CurrentTime() + attr := s.rangeLock.CollectLockedRangeAttrs(nil) + if attr.SlowestRegion.Initialized { + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) + if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { + log.Info("event feed finds a slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) + } + } else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute { + log.Info("event feed initializes a region too slow", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) + } + if len(attr.Holes) > 0 { + holes := make([]string, 0, len(attr.Holes)) + for _, hole := range attr.Holes { + holes = append(holes, fmt.Sprintf("[%s,%s)", hole.Start, hole.End)) + } + log.Info("event feed holes exist", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("holes", strings.Join(holes, ", "))) + } + } +} + func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) { var opType model.OpType switch entry.GetOpType() { diff --git a/cdc/kv/region_state.go b/cdc/kv/region_state.go index be4eda058ac..cba72e1ec7f 100644 --- a/cdc/kv/region_state.go +++ b/cdc/kv/region_state.go @@ -16,8 +16,6 @@ package kv import ( "runtime" "sync" - "sync/atomic" - "time" "github.com/pingcap/tiflow/pkg/regionspan" "github.com/tikv/client-go/v2/tikv" @@ -26,68 +24,84 @@ import ( const ( minRegionStateBucket = 4 maxRegionStateBucket = 16 + + stateNormal uint32 = 0 + stateStopped uint32 = 1 + stateRemoved uint32 = 2 ) type singleRegionInfo struct { - verID tikv.RegionVerID - span regionspan.ComparableSpan - resolvedTs uint64 - rpcCtx *tikv.RPCContext + verID tikv.RegionVerID + span regionspan.ComparableSpan + rpcCtx *tikv.RPCContext + + lockedRange *regionspan.LockedRange } func newSingleRegionInfo( verID tikv.RegionVerID, span regionspan.ComparableSpan, - ts uint64, rpcCtx *tikv.RPCContext, ) singleRegionInfo { return singleRegionInfo{ - verID: verID, - span: span, - resolvedTs: ts, - rpcCtx: rpcCtx, + verID: verID, + span: span, + rpcCtx: rpcCtx, } } +func (s singleRegionInfo) resolvedTs() uint64 { + return s.lockedRange.CheckpointTs.Load() +} + type regionFeedState struct { sri singleRegionInfo requestID uint64 - stopped int32 - - initialized atomic.Bool - matcher *matcher - startFeedTime time.Time - lastResolvedTs uint64 + matcher *matcher + + // Transform: normal -> stopped -> removed. + // normal: the region is in replicating. + // stopped: some error happens. + // removed: the region is returned into the pending list, + // will be re-resolved and re-scheduled later. + state struct { + sync.RWMutex + v uint32 + } } func newRegionFeedState(sri singleRegionInfo, requestID uint64) *regionFeedState { return ®ionFeedState{ sri: sri, requestID: requestID, - stopped: 0, } } func (s *regionFeedState) start() { - s.startFeedTime = time.Now() - s.lastResolvedTs = s.sri.resolvedTs s.matcher = newMatcher() } +// mark regionFeedState as stopped. func (s *regionFeedState) markStopped() { - atomic.StoreInt32(&s.stopped, 1) + s.state.Lock() + defer s.state.Unlock() + if s.state.v == stateNormal { + s.state.v = stateStopped + } } -func (s *regionFeedState) isStopped() bool { - return atomic.LoadInt32(&s.stopped) > 0 +func (s *regionFeedState) isStale() bool { + s.state.RLock() + defer s.state.RUnlock() + return s.state.v == stateStopped || s.state.v == stateRemoved } func (s *regionFeedState) isInitialized() bool { - return s.initialized.Load() + return s.sri.lockedRange.Initialzied.Load() } func (s *regionFeedState) setInitialized() { - s.initialized.Store(true) + s.sri.lockedRange.Initialzied.Store(true) } func (s *regionFeedState) getRegionID() uint64 { @@ -95,31 +109,29 @@ func (s *regionFeedState) getRegionID() uint64 { } func (s *regionFeedState) getLastResolvedTs() uint64 { - return atomic.LoadUint64(&s.lastResolvedTs) + return s.sri.lockedRange.CheckpointTs.Load() } // updateResolvedTs update the resolved ts of the current region feed func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) { - if resolvedTs > s.getLastResolvedTs() { - atomic.StoreUint64(&s.lastResolvedTs, resolvedTs) - } -} - -// setRegionInfoResolvedTs is only called when the region disconnect, -// to update the `singleRegionInfo` which is reused by reconnect. -func (s *regionFeedState) setRegionInfoResolvedTs() { - if s.getLastResolvedTs() <= s.sri.resolvedTs { - return + state := s.sri.lockedRange + for { + last := state.CheckpointTs.Load() + if last > resolvedTs { + return + } + if state.CheckpointTs.CompareAndSwap(last, resolvedTs) { + break + } } - s.sri.resolvedTs = s.lastResolvedTs } func (s *regionFeedState) getRegionInfo() singleRegionInfo { return s.sri } -func (s *regionFeedState) getRegionMeta() (uint64, regionspan.ComparableSpan, time.Time, string) { - return s.sri.verID.GetID(), s.sri.span, s.startFeedTime, s.sri.rpcCtx.Addr +func (s *regionFeedState) getRegionMeta() (uint64, regionspan.ComparableSpan, string) { + return s.sri.verID.GetID(), s.sri.span, s.sri.rpcCtx.Addr } type syncRegionFeedStateMap struct { diff --git a/cdc/kv/region_state_bench_test.go b/cdc/kv/region_state_bench_test.go index 2282b304769..c1c0eb5e95a 100644 --- a/cdc/kv/region_state_bench_test.go +++ b/cdc/kv/region_state_bench_test.go @@ -40,9 +40,9 @@ func TestSyncRegionFeedStateMapConcurrentAccess(t *testing.T) { return default: } - m.setByRequestID(1, ®ionFeedState{}) - m.setByRequestID(2, ®ionFeedState{}) - m.setByRequestID(3, ®ionFeedState{}) + m.setByRequestID(1, ®ionFeedState{sri: singleRegionInfo{lockedRange: ®ionspan.LockedRange{}}}) + m.setByRequestID(2, ®ionFeedState{sri: singleRegionInfo{lockedRange: ®ionspan.LockedRange{}}}) + m.setByRequestID(3, ®ionFeedState{sri: singleRegionInfo{lockedRange: ®ionspan.LockedRange{}}}) } }() wg.Add(1) @@ -55,7 +55,7 @@ func TestSyncRegionFeedStateMapConcurrentAccess(t *testing.T) { default: } m.iter(func(requestID uint64, state *regionFeedState) bool { - _ = state.initialized.Load() + state.isInitialized() return true }) } @@ -119,7 +119,8 @@ func benchmarkGetRegionState(b *testing.B, bench func(b *testing.B, sm regionSta state := newRegionFeedState(newSingleRegionInfo( tikv.RegionVerID{}, regionspan.ToComparableSpan(span), - 0, &tikv.RPCContext{}), 0) + &tikv.RPCContext{}), 0) + state.sri.lockedRange = ®ionspan.LockedRange{} regionCount := []int{100, 1000, 10000, 20000, 40000, 80000, 160000, 320000} for _, count := range regionCount { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index c8cdc632530..8dd344676bd 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -61,19 +61,18 @@ const ( ) type regionWorkerMetrics struct { - // kv events related metrics - metricReceivedEventSize prometheus.Observer - metricDroppedEventSize prometheus.Observer + metricReceivedEventSize prometheus.Observer + metricDroppedEventSize prometheus.Observer + metricPullEventInitializedCounter prometheus.Counter + metricPullEventCommittedCounter prometheus.Counter metricPullEventPrewriteCounter prometheus.Counter metricPullEventCommitCounter prometheus.Counter - metricPullEventCommittedCounter prometheus.Counter metricPullEventRollbackCounter prometheus.Counter - metricSendEventResolvedCounter prometheus.Counter - metricSendEventCommitCounter prometheus.Counter - metricSendEventCommittedCounter prometheus.Counter - // TODO: add region runtime related metrics + metricSendEventResolvedCounter prometheus.Counter + metricSendEventCommitCounter prometheus.Counter + metricSendEventCommittedCounter prometheus.Counter } /* @@ -114,22 +113,22 @@ type regionWorker struct { inputPending int32 } -func newRegionWorker( - ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, -) *regionWorker { +func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { metrics := ®ionWorkerMetrics{} metrics.metricReceivedEventSize = eventSize.WithLabelValues("received") metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped") + metrics.metricPullEventInitializedCounter = pullEventCounter. WithLabelValues(cdcpb.Event_INITIALIZED.String(), changefeedID.Namespace, changefeedID.ID) metrics.metricPullEventCommittedCounter = pullEventCounter. WithLabelValues(cdcpb.Event_COMMITTED.String(), changefeedID.Namespace, changefeedID.ID) - metrics.metricPullEventCommitCounter = pullEventCounter. - WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.Namespace, changefeedID.ID) metrics.metricPullEventPrewriteCounter = pullEventCounter. WithLabelValues(cdcpb.Event_PREWRITE.String(), changefeedID.Namespace, changefeedID.ID) + metrics.metricPullEventCommitCounter = pullEventCounter. + WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.Namespace, changefeedID.ID) metrics.metricPullEventRollbackCounter = pullEventCounter. WithLabelValues(cdcpb.Event_ROLLBACK.String(), changefeedID.Namespace, changefeedID.ID) + metrics.metricSendEventResolvedCounter = sendEventCounter. WithLabelValues("native-resolved", changefeedID.Namespace, changefeedID.ID) metrics.metricSendEventCommitCounter = sendEventCounter. @@ -137,6 +136,12 @@ func newRegionWorker( metrics.metricSendEventCommittedCounter = sendEventCounter. WithLabelValues("committed", changefeedID.Namespace, changefeedID.ID) + return metrics +} + +func newRegionWorker( + ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, +) *regionWorker { return ®ionWorker{ parentCtx: ctx, session: s, @@ -148,7 +153,7 @@ func newRegionWorker( rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), storeAddr: addr, concurrency: s.client.config.KVClient.WorkerConcurrent, - metrics: metrics, + metrics: newRegionWorkerMetrics(changefeedID), inputPending: 0, } } @@ -193,21 +198,22 @@ func (w *regionWorker) checkShouldExit() error { } func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState) error { - state.setRegionInfoResolvedTs() regionID := state.getRegionID() + isStale := state.isStale() log.Info("single region event feed disconnected", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), zap.Uint64("regionID", regionID), zap.Uint64("requestID", state.requestID), zap.Stringer("span", state.sri.span), - zap.Uint64("resolvedTs", state.sri.resolvedTs), + zap.Uint64("resolvedTs", state.sri.resolvedTs()), + zap.Bool("isStale", isStale), zap.Error(err)) // if state is already marked stopped, it must have been or would be processed by `onRegionFail` - if state.isStopped() { + if isStale { return w.checkShouldExit() } - // We need to ensure when the error is handled, `isStopped` must be set. So set it before sending the error. + // We need to ensure when the error is handled, `isStale` must be set. So set it before sending the error. state.markStopped() w.delRegionState(regionID) failpoint.Inject("kvClientSingleFeedProcessDelay", nil) @@ -288,7 +294,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0) for _, rts := range expired { state, ok := w.getRegionState(rts.regionID) - if !ok || state.isStopped() { + if !ok || state.isStale() { // state is already deleted or stopped, just continue, // and don't need to push resolved ts back to heap. continue @@ -349,7 +355,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEvent) error { // event.state is nil when resolvedTsEvent is not nil - skipEvent := event.state != nil && event.state.isStopped() + skipEvent := event.state != nil && event.state.isStale() if skipEvent { return nil } @@ -609,52 +615,51 @@ func (w *regionWorker) handleEventEntry( x *cdcpb.Event_Entries_, state *regionFeedState, ) error { - regionID, regionSpan, startTime, _ := state.getRegionMeta() + emit := func(assembled model.RegionFeedEvent) bool { + select { + case w.outputCh <- assembled: + return true + case <-ctx.Done(): + return false + } + } + return handleEventEntry(x, w.session.startTs, state, w.metrics, emit) +} + +func handleEventEntry( + x *cdcpb.Event_Entries_, + startTs uint64, + state *regionFeedState, + metrics *regionWorkerMetrics, + emit func(assembled model.RegionFeedEvent) bool, +) error { + regionID, regionSpan, _ := state.getRegionMeta() for _, entry := range x.Entries.GetEntries() { // if a region with kv range [a, z), and we only want the get [b, c) from this region, // tikv will return all key events in the region, although specified [b, c) int the request. // we can make tikv only return the events about the keys in the specified range. comparableKey := regionspan.ToComparableKey(entry.GetKey()) - // key for initialized event is nil if entry.Type != cdcpb.Event_INITIALIZED && !regionspan.KeyInSpan(comparableKey, regionSpan) { - w.metrics.metricDroppedEventSize.Observe(float64(entry.Size())) + metrics.metricDroppedEventSize.Observe(float64(entry.Size())) continue } switch entry.Type { case cdcpb.Event_INITIALIZED: - if time.Since(startTime) > 20*time.Second { - log.Warn("The time cost of initializing is too much", - zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID), - zap.Duration("duration", time.Since(startTime)), - zap.Uint64("regionID", regionID)) - } - w.metrics.metricPullEventInitializedCounter.Inc() - + metrics.metricPullEventInitializedCounter.Inc() state.setInitialized() - // state is just initialized, so we know this must be true - cachedEvents := state.matcher.matchCachedRow(true) - for _, cachedEvent := range cachedEvents { + for _, cachedEvent := range state.matcher.matchCachedRow(true) { revent, err := assembleRowEvent(regionID, cachedEvent) if err != nil { return errors.Trace(err) } - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommitCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + if !emit(revent) { + return nil } + metrics.metricSendEventCommitCounter.Inc() } state.matcher.matchCachedRollbackRow(true) case cdcpb.Event_COMMITTED: - w.metrics.metricPullEventCommittedCounter.Inc() - revent, err := assembleRowEvent(regionID, entry) - if err != nil { - return errors.Trace(err) - } - resolvedTs := state.getLastResolvedTs() if entry.CommitTs <= resolvedTs { logPanic("The CommitTs must be greater than the resolvedTs", @@ -664,17 +669,21 @@ func (w *regionWorker) handleEventEntry( zap.Uint64("regionID", regionID)) return errUnreachable } - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommittedCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + + metrics.metricPullEventCommittedCounter.Inc() + revent, err := assembleRowEvent(regionID, entry) + if err != nil { + return errors.Trace(err) + } + if !emit(revent) { + return nil } + metrics.metricSendEventCommittedCounter.Inc() case cdcpb.Event_PREWRITE: - w.metrics.metricPullEventPrewriteCounter.Inc() + metrics.metricPullEventPrewriteCounter.Inc() state.matcher.putPrewriteRow(entry) case cdcpb.Event_COMMIT: - w.metrics.metricPullEventCommitCounter.Inc() + metrics.metricPullEventCommitCounter.Inc() // NOTE: matchRow should always be called even if the event is stale. if !state.matcher.matchRow(entry, state.isInitialized()) { if !state.isInitialized() { @@ -688,7 +697,7 @@ func (w *regionWorker) handleEventEntry( } // TiKV can send events with StartTs/CommitTs less than startTs. - isStaleEvent := entry.CommitTs <= w.session.startTs + isStaleEvent := entry.CommitTs <= startTs if isStaleEvent { continue } @@ -708,15 +717,12 @@ func (w *regionWorker) handleEventEntry( if err != nil { return errors.Trace(err) } - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommitCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + if !emit(revent) { + return nil } - w.metrics.metricSendEventCommitCounter.Inc() + metrics.metricSendEventCommitCounter.Inc() case cdcpb.Event_ROLLBACK: - w.metrics.metricPullEventRollbackCounter.Inc() + metrics.metricPullEventRollbackCounter.Inc() if !state.isInitialized() { state.matcher.cacheRollbackRow(entry) continue @@ -736,7 +742,7 @@ func (w *regionWorker) handleResolvedTs( regions := make([]uint64, 0, len(revents.regions)) for _, state := range revents.regions { - if state.isStopped() || !state.isInitialized() { + if state.isStale() || !state.isInitialized() { continue } regionID := state.getRegionID() @@ -771,7 +777,7 @@ func (w *regionWorker) handleResolvedTs( default: } for _, state := range revents.regions { - if state.isStopped() || !state.isInitialized() { + if state.isStale() || !state.isInitialized() { continue } state.updateResolvedTs(resolvedTs) @@ -797,7 +803,7 @@ func (w *regionWorker) evictAllRegions() { for _, states := range w.statesManager.states { deletes = deletes[:0] states.iter(func(regionID uint64, regionState *regionFeedState) bool { - if regionState.isStopped() { + if regionState.isStale() { return true } regionState.markStopped() @@ -811,7 +817,6 @@ func (w *regionWorker) evictAllRegions() { }) for _, del := range deletes { w.delRegionState(del.regionID) - del.regionState.setRegionInfoResolvedTs() // since the context used in region worker will be cancelled after // region worker exits, we must use the parent context to prevent // regionErrorInfo loss. diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index 29aea132fef..b88875aea6c 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -49,7 +49,11 @@ func TestRegionStateManagerThreadSafe(t *testing.T) { for i := 0; i < regionCount; i++ { regionID := uint64(1000 + i) regionIDs[i] = regionID - rsm.setState(regionID, ®ionFeedState{requestID: uint64(i + 1), lastResolvedTs: uint64(1000)}) + + state := ®ionFeedState{requestID: uint64(i + 1)} + state.sri.lockedRange = ®ionspan.LockedRange{} + state.updateResolvedTs(1000) + rsm.setState(regionID, state) } var wg sync.WaitGroup @@ -91,8 +95,8 @@ func TestRegionStateManagerThreadSafe(t *testing.T) { for _, regionID := range regionIDs { s, ok := rsm.getState(regionID) require.True(t, ok) - require.Greater(t, s.lastResolvedTs, uint64(1000)) - totalResolvedTs += s.lastResolvedTs + require.Greater(t, s.getLastResolvedTs(), uint64(1000)) + totalResolvedTs += s.getLastResolvedTs() } } @@ -152,7 +156,8 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { state := newRegionFeedState(newSingleRegionInfo( tikv.RegionVerID{}, regionspan.ToComparableSpan(span), - 0, &tikv.RPCContext{}), 0) + &tikv.RPCContext{}), 0) + state.sri.lockedRange = ®ionspan.LockedRange{} state.start() worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") require.Equal(t, 2, cap(worker.outputCh)) @@ -269,28 +274,30 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) { s1 := newRegionFeedState(singleRegionInfo{ verID: tikv.NewRegionVerID(1, 1, 1), }, 1) - s1.initialized.Store(true) - s1.lastResolvedTs = 9 + s1.sri.lockedRange = ®ionspan.LockedRange{} + s1.setInitialized() + s1.updateResolvedTs(9) s2 := newRegionFeedState(singleRegionInfo{ verID: tikv.NewRegionVerID(2, 2, 2), }, 2) - s2.initialized.Store(true) - s2.lastResolvedTs = 11 + s2.sri.lockedRange = ®ionspan.LockedRange{} + s2.setInitialized() + s2.updateResolvedTs(11) s3 := newRegionFeedState(singleRegionInfo{ verID: tikv.NewRegionVerID(3, 3, 3), }, 3) - s3.initialized.Store(false) - s3.lastResolvedTs = 8 + s3.sri.lockedRange = ®ionspan.LockedRange{} + s3.updateResolvedTs(8) err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 10, regions: []*regionFeedState{s1, s2, s3}, }) require.Nil(t, err) - require.Equal(t, uint64(10), s1.lastResolvedTs) - require.Equal(t, uint64(11), s2.lastResolvedTs) - require.Equal(t, uint64(8), s3.lastResolvedTs) + require.Equal(t, uint64(10), s1.getLastResolvedTs()) + require.Equal(t, uint64(11), s2.getLastResolvedTs()) + require.Equal(t, uint64(8), s3.getLastResolvedTs()) re := <-w.rtsUpdateCh require.Equal(t, uint64(10), re.resolvedTs) @@ -307,11 +314,14 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { ctx := context.Background() s := createFakeEventFeedSession() s.eventCh = make(chan model.RegionFeedEvent, 2) - s1 := newRegionFeedState(singleRegionInfo{ - verID: tikv.RegionVerID{}, - resolvedTs: 9, - rpcCtx: &tikv.RPCContext{}, - }, 0) + span := regionspan.Span{Start: []byte{}, End: regionspan.UpperBoundKey} + s1 := newRegionFeedState(newSingleRegionInfo( + tikv.RegionVerID{}, + regionspan.ToComparableSpan(span), + &tikv.RPCContext{}), + 0) + s1.sri.lockedRange = ®ionspan.LockedRange{} + s1.sri.lockedRange.CheckpointTs.Store(9) s1.start() w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") @@ -320,7 +330,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { regions: []*regionFeedState{s1}, }) require.Nil(t, err) - require.Equal(t, uint64(9), s1.lastResolvedTs) + require.Equal(t, uint64(9), s1.getLastResolvedTs()) timer := time.NewTimer(time.Second) select { diff --git a/pkg/regionspan/region_range_lock.go b/pkg/regionspan/region_range_lock.go index 4637286fdd7..a936de02c72 100644 --- a/pkg/regionspan/region_range_lock.go +++ b/pkg/regionspan/region_range_lock.go @@ -21,6 +21,7 @@ import ( "math" "sync" "sync/atomic" + "time" "github.com/google/btree" "github.com/pingcap/log" @@ -44,23 +45,23 @@ func rangeTsEntryLess(a, b *rangeTsEntry) bool { return bytes.Compare(a.startKey, b.startKey) < 0 } -// RangeTsMap represents a map from key range to a timestamp. It supports range set and calculating min value among a -// a specified range. -type RangeTsMap struct { +// rangeTsMap represents a map from key range to a timestamp. It supports +// range set and calculating min value among a specified range. +type rangeTsMap struct { m *btree.BTreeG[*rangeTsEntry] } -// NewRangeTsMap creates a RangeTsMap. -func NewRangeTsMap(startKey, endKey []byte, startTs uint64) *RangeTsMap { - m := &RangeTsMap{ - m: btree.NewG[*rangeTsEntry](16, rangeTsEntryLess), +// newRangeTsMap creates a RangeTsMap. +func newRangeTsMap(startKey, endKey []byte, startTs uint64) *rangeTsMap { + m := &rangeTsMap{ + m: btree.NewG(16, rangeTsEntryLess), } m.Set(startKey, endKey, startTs) return m } // Set sets the corresponding ts of the given range to the specified value. -func (m *RangeTsMap) Set(startKey, endKey []byte, ts uint64) { +func (m *rangeTsMap) Set(startKey, endKey []byte, ts uint64) { if _, ok := m.m.Get(rangeTsEntryWithKey(endKey)); !ok { // To calculate the minimal ts, the default value is math.MaxUint64 tailTs := uint64(math.MaxUint64) @@ -92,7 +93,7 @@ func (m *RangeTsMap) Set(startKey, endKey []byte, ts uint64) { } // GetMin gets the min ts value among the given range. endKey must be greater than startKey. -func (m *RangeTsMap) GetMin(startKey, endKey []byte) uint64 { +func (m *rangeTsMap) GetMin(startKey, endKey []byte) uint64 { var ts uint64 = math.MaxUint64 m.m.DescendLessOrEqual(rangeTsEntryWithKey(startKey), func(i *rangeTsEntry) bool { ts = i.ts @@ -115,6 +116,7 @@ type rangeLockEntry struct { regionID uint64 version uint64 waiters []chan<- interface{} + state LockedRange } func rangeLockEntryWithKey(key []byte) *rangeLockEntry { @@ -136,36 +138,36 @@ func (e *rangeLockEntry) String() string { len(e.waiters)) } -var currentID uint64 = 0 - -func allocID() uint64 { - return atomic.AddUint64(¤tID, 1) -} - // RegionRangeLock is specifically used for kv client to manage exclusive region ranges. Acquiring lock will be blocked // if part of its range is already locked. It also manages checkpoint ts of all ranges. The ranges are marked by a // version number, which should comes from the Region's Epoch version. The version is used to compare which range is // new and which is old if two ranges are overlapping. type RegionRangeLock struct { + // ID to identify different RegionRangeLock instances, so logs of different instances can be distinguished. + id uint64 + totalSpan ComparableSpan changefeedLogInfo string + mu sync.Mutex - rangeCheckpointTs *RangeTsMap + rangeCheckpointTs *rangeTsMap rangeLock *btree.BTreeG[*rangeLockEntry] regionIDLock map[uint64]*rangeLockEntry - // ID to identify different RegionRangeLock instances, so logs of different instances can be distinguished. - id uint64 + stopped bool + refCount uint64 } // NewRegionRangeLock creates a new RegionRangeLock. func NewRegionRangeLock( + id uint64, startKey, endKey []byte, startTs uint64, changefeedLogInfo string, ) *RegionRangeLock { return &RegionRangeLock{ + id: id, + totalSpan: ComparableSpan{Start: startKey, End: endKey}, changefeedLogInfo: changefeedLogInfo, - rangeCheckpointTs: NewRangeTsMap(startKey, endKey, startTs), + rangeCheckpointTs: newRangeTsMap(startKey, endKey, startTs), rangeLock: btree.NewG[*rangeLockEntry](16, rangeLockEntryLess), regionIDLock: make(map[uint64]*rangeLockEntry), - id: allocID(), } } @@ -207,6 +209,9 @@ func (l *RegionRangeLock) getOverlappedEntries(startKey, endKey []byte, regionID func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, version uint64) (LockRangeResult, []<-chan interface{}) { l.mu.Lock() defer l.mu.Unlock() + if l.stopped { + return LockRangeResult{Status: LockRangeStatusCancel}, nil + } overlappingEntries := l.getOverlappedEntries(startKey, endKey, regionID) @@ -218,6 +223,8 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio regionID: regionID, version: version, } + newEntry.state.CheckpointTs.Store(checkpointTs) + newEntry.state.Created = time.Now() l.rangeLock.ReplaceOrInsert(newEntry) l.regionIDLock[regionID] = newEntry @@ -229,9 +236,11 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey))) + l.refCount += 1 return LockRangeResult{ Status: LockRangeStatusSuccess, CheckpointTs: checkpointTs, + LockedRange: &newEntry.state, }, nil } @@ -305,7 +314,9 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio } // LockRange locks a range with specified version. -func (l *RegionRangeLock) LockRange(ctx context.Context, startKey, endKey []byte, regionID, version uint64) LockRangeResult { +func (l *RegionRangeLock) LockRange( + ctx context.Context, startKey, endKey []byte, regionID, version uint64, +) LockRangeResult { res, signalChs := l.tryLockRange(startKey, endKey, regionID, version) if res.Status != LockRangeStatusWait { @@ -334,22 +345,23 @@ func (l *RegionRangeLock) LockRange(ctx context.Context, startKey, endKey []byte } // UnlockRange unlocks a range and update checkpointTs of the range to specified value. -func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version uint64, checkpointTs uint64) { +// If it returns true it means it is stopped and all ranges are unlocked correctly. +func (l *RegionRangeLock) UnlockRange( + startKey, endKey []byte, regionID, version uint64, + checkpointTs ...uint64, +) (drained bool) { l.mu.Lock() defer l.mu.Unlock() entry, ok := l.rangeLock.Get(rangeLockEntryWithKey(startKey)) - if !ok { log.Panic("unlocking a not locked range", zap.String("changefeed", l.changefeedLogInfo), zap.Uint64("regionID", regionID), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), - zap.Uint64("version", version), - zap.Uint64("checkpointTs", checkpointTs)) + zap.Uint64("version", version)) } - if entry.regionID != regionID { log.Panic("unlocked a range but regionID mismatch", zap.String("changefeed", l.changefeedLogInfo), @@ -366,6 +378,8 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version zap.String("regionIDLockEntry", l.regionIDLock[regionID].String())) } delete(l.regionIDLock, regionID) + l.refCount -= 1 + drained = l.stopped && l.refCount == 0 if entry.version != version || !bytes.Equal(entry.endKey, endKey) { log.Panic("unlocking region doesn't match the locked region", @@ -374,7 +388,6 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), zap.Uint64("version", version), - zap.Uint64("checkpointTs", checkpointTs), zap.String("foundLockEntry", entry.String())) } @@ -382,17 +395,40 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version ch <- nil } - _, ok = l.rangeLock.Delete(entry) - if !ok { + if entry, ok = l.rangeLock.Delete(entry); !ok { panic("unreachable") } - l.rangeCheckpointTs.Set(startKey, endKey, checkpointTs) + + var newCheckpointTs uint64 + if len(checkpointTs) > 0 { + newCheckpointTs = checkpointTs[0] + } else { + newCheckpointTs = entry.state.CheckpointTs.Load() + } + + l.rangeCheckpointTs.Set(startKey, endKey, newCheckpointTs) log.Debug("unlocked range", zap.String("changefeed", l.changefeedLogInfo), zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID), - zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("checkpointTs", newCheckpointTs), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey))) + return +} + +// RefCount returns how many ranges are locked. +func (l *RegionRangeLock) RefCount() uint64 { + l.mu.Lock() + defer l.mu.Unlock() + return l.refCount +} + +// Stop stops the instance. +func (l *RegionRangeLock) Stop() (drained bool) { + l.mu.Lock() + defer l.mu.Unlock() + l.stopped = true + return l.stopped && l.refCount == 0 } const ( @@ -407,15 +443,82 @@ const ( ) // LockRangeResult represents the result of LockRange method of RegionRangeLock. -// If Status is LockRangeStatusSuccess, the CheckpointTs field will be the minimal checkpoint ts among the locked -// range. +// If Status is LockRangeStatusSuccess: +// - CheckpointTs will be the minimal checkpoint ts among the locked range; +// - LockedRange is for recording real-time state changes; +// // If Status is LockRangeStatusWait, it means the lock cannot be acquired immediately. WaitFn must be invoked to // continue waiting and acquiring the lock. +// // If Status is LockRangeStatusStale, it means the LockRange request is stale because there's already a overlapping // locked range, whose version is greater or equals to the requested one. type LockRangeResult struct { Status int CheckpointTs uint64 + LockedRange *LockedRange WaitFn func() LockRangeResult RetryRanges []ComparableSpan } + +// LockedRange is returned by `RegionRangeLock.LockRange`, which can be used to +// collect informations for the range. And collected informations can be accessed +// by iterating `RegionRangeLock`. +type LockedRange struct { + CheckpointTs atomic.Uint64 + Initialzied atomic.Bool + Created time.Time +} + +// CollectLockedRangeAttrs collects locked range attributes. +func (l *RegionRangeLock) CollectLockedRangeAttrs( + action func(regionID uint64, state *LockedRange), +) (r CollectedLockedRangeAttrs) { + l.mu.Lock() + defer l.mu.Unlock() + r.FastestRegion.CheckpointTs = 0 + r.SlowestRegion.CheckpointTs = math.MaxUint64 + + lastEnd := l.totalSpan.Start + l.rangeLock.Ascend(func(item *rangeLockEntry) bool { + if action != nil { + action(item.regionID, &item.state) + } + if EndCompare(lastEnd, item.startKey) < 0 { + r.Holes = append(r.Holes, ComparableSpan{Start: lastEnd, End: item.startKey}) + } + ckpt := item.state.CheckpointTs.Load() + if ckpt > r.FastestRegion.CheckpointTs { + r.FastestRegion.RegionID = item.regionID + r.FastestRegion.CheckpointTs = ckpt + r.FastestRegion.Initialized = item.state.Initialzied.Load() + r.FastestRegion.Created = item.state.Created + } + if ckpt < r.SlowestRegion.CheckpointTs { + r.SlowestRegion.RegionID = item.regionID + r.SlowestRegion.CheckpointTs = ckpt + r.SlowestRegion.Initialized = item.state.Initialzied.Load() + r.SlowestRegion.Created = item.state.Created + } + lastEnd = item.endKey + return true + }) + if EndCompare(lastEnd, l.totalSpan.End) < 0 { + r.Holes = append(r.Holes, ComparableSpan{Start: lastEnd, End: l.totalSpan.End}) + } + return +} + +// CollectedLockedRangeAttrs returns by `RegionRangeLock.CollectedLockedRangeAttrs`. +type CollectedLockedRangeAttrs struct { + Holes []ComparableSpan + FastestRegion LockedRangeAttrs + SlowestRegion LockedRangeAttrs +} + +// LockedRangeAttrs is like `LockedRange`, but only contains some read-only attributes. +type LockedRangeAttrs struct { + RegionID uint64 + CheckpointTs uint64 + Initialized bool + Created time.Time +} diff --git a/pkg/regionspan/region_range_lock_test.go b/pkg/regionspan/region_range_lock_test.go index e159b6d412b..13020899dfc 100644 --- a/pkg/regionspan/region_range_lock_test.go +++ b/pkg/regionspan/region_range_lock_test.go @@ -87,7 +87,7 @@ func TestRegionRangeLock(t *testing.T) { t.Parallel() ctx := context.TODO() - l := NewRegionRangeLock([]byte("a"), []byte("h"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("h"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "a", "e", 1, 1, math.MaxUint64) unlockRange(l, "a", "e", 1, 1, 100) @@ -104,7 +104,7 @@ func TestRegionRangeLock(t *testing.T) { func TestRegionRangeLockStale(t *testing.T) { t.Parallel() - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "") ctx := context.TODO() mustLockRangeSuccess(ctx, t, l, "c", "g", 1, 10, math.MaxUint64) mustLockRangeSuccess(ctx, t, l, "j", "n", 2, 8, math.MaxUint64) @@ -127,7 +127,7 @@ func TestRegionRangeLockLockingRegionID(t *testing.T) { t.Parallel() ctx := context.TODO() - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "c", "d", 1, 10, math.MaxUint64) mustLockRangeStale(ctx, t, l, "e", "f", 1, 5, "e", "f") @@ -163,7 +163,7 @@ func TestRegionRangeLockCanBeCancelled(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "g", "h", 1, 10, math.MaxUint64) wait := mustLockRangeWait(ctx, t, l, "g", "h", 1, 12) cancel() @@ -174,7 +174,7 @@ func TestRegionRangeLockCanBeCancelled(t *testing.T) { func TestRangeTsMap(t *testing.T) { t.Parallel() - m := NewRangeTsMap([]byte("a"), []byte("z"), math.MaxUint64) + m := newRangeTsMap([]byte("a"), []byte("z"), math.MaxUint64) mustGetMin := func(startKey, endKey string, expectedTs uint64) { ts := m.GetMin([]byte(startKey), []byte(endKey)) diff --git a/pkg/txnutil/lock_resolver.go b/pkg/txnutil/lock_resolver.go index c0b0aa49884..91e9e0e02c3 100644 --- a/pkg/txnutil/lock_resolver.go +++ b/pkg/txnutil/lock_resolver.go @@ -53,16 +53,32 @@ func NewLockerResolver( const scanLockLimit = 1024 -func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint64) error { - // TODO test whether this function will kill active transaction +func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint64) (err error) { + var lockCount int = 0 + + log.Info("resolve lock starts", + zap.Uint64("regionID", regionID), + zap.Uint64("maxVersion", maxVersion), + zap.String("namespace", r.changefeed.Namespace), + zap.String("changefeed", r.changefeed.ID)) + defer func() { + log.Info("resolve lock finishes", + zap.Uint64("regionID", regionID), + zap.Int("lockCount", lockCount), + zap.Uint64("maxVersion", maxVersion), + zap.String("namespace", r.changefeed.Namespace), + zap.String("changefeed", r.changefeed.ID), + zap.Error(err)) + }() + + // TODO test whether this function will kill active transaction req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{ MaxVersion: maxVersion, Limit: scanLockLimit, }) bo := tikv.NewGcResolveLockMaxBackoffer(ctx) - var lockCount int var loc *tikv.KeyLocation var key []byte flushRegion := func() error { @@ -131,12 +147,5 @@ func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint } bo = tikv.NewGcResolveLockMaxBackoffer(ctx) } - log.Info("resolve lock successfully", - zap.Uint64("regionID", regionID), - zap.Int("lockCount", lockCount), - zap.Uint64("maxVersion", maxVersion), - zap.String("namespace", r.changefeed.Namespace), - zap.String("changefeed", r.changefeed.ID), - zap.Any("role", r.role)) return nil }