From 3483a3b80cafc56a0aa3109d60106f321520b2cb Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 20 Oct 2023 16:44:01 +0800 Subject: [PATCH] This is an automated cherry-pick of #9933 Signed-off-by: ti-chi-bot --- cdc/kv/regionlock/region_range_lock.go | 15 ++++++-- cdc/kv/shared_client.go | 50 ++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/cdc/kv/regionlock/region_range_lock.go b/cdc/kv/regionlock/region_range_lock.go index ac876dd5545..f9d272446e3 100644 --- a/cdc/kv/regionlock/region_range_lock.go +++ b/cdc/kv/regionlock/region_range_lock.go @@ -489,9 +489,18 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs( lastEnd := l.totalSpan.StartKey l.rangeLock.Ascend(func(item *rangeLockEntry) bool { +<<<<<<< HEAD action(item.regionID, &item.state) r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, item.startKey) < 0 +======= + if action != nil { + action(item.regionID, &item.state) + } + if spanz.EndCompare(lastEnd, item.startKey) < 0 { + r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: item.startKey}) + } +>>>>>>> a71208a423 (kv-client(cdc): log slowest regions and region holes (#9933)) ckpt := item.state.CheckpointTs.Load() if ckpt > r.FastestRegion.CheckpointTs { r.FastestRegion.RegionID = item.regionID @@ -508,13 +517,15 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs( lastEnd = item.endKey return true }) - r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0 + if spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0 { + r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: l.totalSpan.EndKey}) + } return } // CollectedLockedRangeAttrs returns by `RegionRangeLock.CollectedLockedRangeAttrs`. type CollectedLockedRangeAttrs struct { - HoleExists bool + Holes []tablepb.Span FastestRegion LockedRangeAttrs SlowestRegion LockedRangeAttrs } diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index d61d5a23a88..69dfb2c4aa6 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -16,6 +16,8 @@ package kv import ( "context" "encoding/binary" + "fmt" + "strings" "sync" "sync/atomic" "time" @@ -40,6 +42,7 @@ import ( "github.com/pingcap/tiflow/pkg/version" "github.com/prometheus/client_golang/prometheus" kvclientv2 "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -246,6 +249,7 @@ func (s *SharedClient) Run(ctx context.Context) error { g.Go(func() error { return s.requestRegionToStore(ctx, g) }) g.Go(func() error { return s.handleErrors(ctx) }) g.Go(func() error { return s.resolveLock(ctx) }) + g.Go(func() error { return s.logSlowRegions(ctx) }) log.Info("event feed started", zap.String("namespace", s.changefeed.Namespace), @@ -689,6 +693,52 @@ func (s *SharedClient) resolveLock(ctx context.Context) error { } } +func (s *SharedClient) logSlowRegions(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + + currTime := s.pdClock.CurrentTime() + s.totalSpans.RLock() + for subscriptionID, rt := range s.totalSpans.v { + attr := rt.rangeLock.CollectLockedRangeAttrs(nil) + if attr.SlowestRegion.Initialized { + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) + if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { + log.Info("event feed finds a slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Any("slowRegion", attr.SlowestRegion)) + } + } else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute { + log.Info("event feed initializes a region too slow", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Any("slowRegion", attr.SlowestRegion)) + } + if len(attr.Holes) > 0 { + holes := make([]string, 0, len(attr.Holes)) + for _, hole := range attr.Holes { + holes = append(holes, fmt.Sprintf("[%s,%s)", hole.StartKey, hole.EndKey)) + } + log.Info("event feed holes exist", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.String("holes", strings.Join(holes, ", "))) + } + } + s.totalSpans.RUnlock() + } +} + func (s *SharedClient) newRequestedTable( subID SubscriptionID, span tablepb.Span, startTs uint64, eventCh chan<- MultiplexingEvent,