Skip to content

Commit

Permalink
This is an automated cherry-pick of #9933
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
hicqu authored and ti-chi-bot committed Nov 1, 2023
1 parent 453aea8 commit 3483a3b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
15 changes: 13 additions & 2 deletions cdc/kv/regionlock/region_range_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,18 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs(

lastEnd := l.totalSpan.StartKey
l.rangeLock.Ascend(func(item *rangeLockEntry) bool {
<<<<<<< HEAD

Check failure on line 492 in cdc/kv/regionlock/region_range_lock.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected }

Check failure on line 492 in cdc/kv/regionlock/region_range_lock.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected }
action(item.regionID, &item.state)

r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, item.startKey) < 0
=======

Check failure on line 496 in cdc/kv/regionlock/region_range_lock.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ==, expected }

Check failure on line 496 in cdc/kv/regionlock/region_range_lock.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ==, expected }
if action != nil {
action(item.regionID, &item.state)
}
if spanz.EndCompare(lastEnd, item.startKey) < 0 {
r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: item.startKey})
}
>>>>>>> a71208a423 (kv-client(cdc): log slowest regions and region holes (#9933))

Check failure on line 503 in cdc/kv/regionlock/region_range_lock.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'

Check failure on line 503 in cdc/kv/regionlock/region_range_lock.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'
ckpt := item.state.CheckpointTs.Load()
if ckpt > r.FastestRegion.CheckpointTs {
r.FastestRegion.RegionID = item.regionID
Expand All @@ -508,13 +517,15 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs(
lastEnd = item.endKey
return true
})
r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0
if spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0 {
r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: l.totalSpan.EndKey})
}
return
}

// CollectedLockedRangeAttrs returns by `RegionRangeLock.CollectedLockedRangeAttrs`.
type CollectedLockedRangeAttrs struct {
HoleExists bool
Holes []tablepb.Span
FastestRegion LockedRangeAttrs
SlowestRegion LockedRangeAttrs
}
Expand Down
50 changes: 50 additions & 0 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package kv
import (
"context"
"encoding/binary"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -40,6 +42,7 @@ import (
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
kvclientv2 "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -246,6 +249,7 @@ func (s *SharedClient) Run(ctx context.Context) error {
g.Go(func() error { return s.requestRegionToStore(ctx, g) })
g.Go(func() error { return s.handleErrors(ctx) })
g.Go(func() error { return s.resolveLock(ctx) })
g.Go(func() error { return s.logSlowRegions(ctx) })

log.Info("event feed started",
zap.String("namespace", s.changefeed.Namespace),
Expand Down Expand Up @@ -689,6 +693,52 @@ func (s *SharedClient) resolveLock(ctx context.Context) error {
}
}

func (s *SharedClient) logSlowRegions(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}

currTime := s.pdClock.CurrentTime()
s.totalSpans.RLock()
for subscriptionID, rt := range s.totalSpans.v {
attr := rt.rangeLock.CollectLockedRangeAttrs(nil)
if attr.SlowestRegion.Initialized {
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs)
if currTime.Sub(ckptTime) > 2*resolveLockMinInterval {
log.Info("event feed finds a slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Any("slowRegion", attr.SlowestRegion))
}
} else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute {
log.Info("event feed initializes a region too slow",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Any("slowRegion", attr.SlowestRegion))
}
if len(attr.Holes) > 0 {
holes := make([]string, 0, len(attr.Holes))
for _, hole := range attr.Holes {
holes = append(holes, fmt.Sprintf("[%s,%s)", hole.StartKey, hole.EndKey))
}
log.Info("event feed holes exist",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.String("holes", strings.Join(holes, ", ")))
}
}
s.totalSpans.RUnlock()
}
}

func (s *SharedClient) newRequestedTable(
subID SubscriptionID, span tablepb.Span, startTs uint64,
eventCh chan<- MultiplexingEvent,
Expand Down

0 comments on commit 3483a3b

Please sign in to comment.