Skip to content

Commit

Permalink
puller(cdc): debug frontier (#10202) (#10269) (#10321)
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored Dec 18, 2023
1 parent 64d0522 commit 6b082ea
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 5 deletions.
21 changes: 17 additions & 4 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
}

func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
Expand All @@ -1335,12 +1335,18 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
case <-ticker.C:
}

currTime := s.client.pdClock.CurrentTime()
attr := s.rangeLock.CollectLockedRangeAttrs(nil)
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs)
currTime := s.client.pdClock.CurrentTime()
log.Info("event feed starts to check locked regions",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName))

if attr.SlowestRegion.Initialized {
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs)
if currTime.Sub(ckptTime) > 2*resolveLockMinInterval {
log.Info("event feed finds a slow region",
log.Info("event feed finds a initialized slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
Expand All @@ -1354,6 +1360,13 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("slowRegion", attr.SlowestRegion))
} else if currTime.Sub(ckptTime) > 10*time.Minute {
log.Info("event feed finds a uninitialized slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("slowRegion", attr.SlowestRegion))
}
if len(attr.Holes) > 0 {
holes := make([]string, 0, len(attr.Holes))
Expand Down
1 change: 1 addition & 0 deletions cdc/puller/frontier/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Frontier interface {
Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64)
Frontier() uint64
String() string
Entries(fn func(key []byte, ts uint64))
}

// spanFrontier tracks the minimum timestamp of a set of spans.
Expand Down
44 changes: 44 additions & 0 deletions cdc/puller/frontier/frontier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package frontier

import (
"bytes"
"math"
"math/rand"
"sort"
"testing"
Expand Down Expand Up @@ -393,3 +394,46 @@ func TestMinMaxWithRegionSplitMerge(t *testing.T) {
f.Forward(8, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, 5)
require.Equal(t, uint64(5), f.Frontier())
}

func TestFrontierEntries(t *testing.T) {
t.Parallel()

ab := regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}
bc := regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}
cd := regionspan.ComparableSpan{Start: []byte("c"), End: []byte("d")}
de := regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}
ef := regionspan.ComparableSpan{Start: []byte("e"), End: []byte("f")}
af := regionspan.ComparableSpan{Start: []byte("a"), End: []byte("f")}
f := NewFrontier(0, c, af)

var slowestTs uint64 = math.MaxUint64
var slowestRange regionspan.ComparableSpan
getSlowestRange := func() {
slowestTs = math.MaxUint64
slowestRange = regionspan.ComparableSpan{}
f.Entries(func(key []byte, ts uint64) {
if ts < slowestTs {
slowestTs = ts
slowestRange.Start = key
slowestRange.End = nil
} else if slowestTs != math.MaxUint64 && len(slowestRange.End) == 0 {
slowestRange.End = key
}
})
}

getSlowestRange()
require.Equal(t, uint64(0), slowestTs)
require.Equal(t, []byte("a"), slowestRange.Start)
require.Equal(t, []byte("f"), slowestRange.End)

f.Forward(1, ab, 100)
f.Forward(2, bc, 200)
f.Forward(3, cd, 300)
f.Forward(4, de, 400)
f.Forward(5, ef, 500)
getSlowestRange()
require.Equal(t, uint64(100), slowestTs)
require.Equal(t, []byte("a"), slowestRange.Start)
require.Equal(t, []byte("b"), slowestRange.End)
}
33 changes: 32 additions & 1 deletion cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package puller

import (
"context"
"math"
"sync/atomic"
"time"

Expand Down Expand Up @@ -171,6 +172,8 @@ func (p *pullerImpl) Run(ctx context.Context) error {
}()

lastResolvedTs := p.checkpointTs
lastAdvancedTime := time.Now()
lastLogSlowRangeTime := time.Now()
g.Go(func() error {
metricsTicker := time.NewTicker(15 * time.Second)
defer metricsTicker.Stop()
Expand Down Expand Up @@ -265,10 +268,38 @@ func (p *pullerImpl) Run(ctx context.Context) error {
zap.Duration("duration", time.Since(start)),
zap.Strings("spans", spans))
}
if !initialized || resolvedTs == lastResolvedTs {
if !initialized {
continue
}
if resolvedTs <= lastResolvedTs {
if time.Since(lastAdvancedTime) > 30*time.Second && time.Since(lastLogSlowRangeTime) > 30*time.Second {
var slowestTs uint64 = math.MaxUint64
slowestRange := regionspan.ComparableSpan{}
rangeFilled := true
p.tsTracker.Entries(func(key []byte, ts uint64) {
if ts < slowestTs {
slowestTs = ts
slowestRange.Start = key
rangeFilled = false
} else if !rangeFilled {
slowestRange.End = key
rangeFilled = true
}
})
log.Info("table puller has been stucked",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID),
zap.Int64("tableID", p.tableID),
zap.String("tableName", p.tableName),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("slowestRangeTs", slowestTs),
zap.Stringer("range", &slowestRange))
lastLogSlowRangeTime = time.Now()
}
continue
}
lastResolvedTs = resolvedTs
lastAdvancedTime = time.Now()
err := output(&model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved, RegionID: e.RegionID})
if err != nil {
return errors.Trace(err)
Expand Down

0 comments on commit 6b082ea

Please sign in to comment.