diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 781c12d8fb8..dc8822a0e63 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1326,7 +1326,7 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can } func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { - ticker := time.NewTicker(10 * time.Second) + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { @@ -1335,12 +1335,18 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { case <-ticker.C: } - currTime := s.client.pdClock.CurrentTime() attr := s.rangeLock.CollectLockedRangeAttrs(nil) + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) + currTime := s.client.pdClock.CurrentTime() + log.Info("event feed starts to check locked regions", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName)) + if attr.SlowestRegion.Initialized { - ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { - log.Info("event feed finds a slow region", + log.Info("event feed finds a initialized slow region", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), @@ -1354,6 +1360,13 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.Any("slowRegion", attr.SlowestRegion)) + } else if currTime.Sub(ckptTime) > 10*time.Minute { + log.Info("event feed finds a uninitialized slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) } if len(attr.Holes) > 0 { holes := make([]string, 0, len(attr.Holes)) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 33ef73d9444..46a18e547f1 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -32,6 +32,7 @@ type Frontier interface { Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) Frontier() uint64 String() string + Entries(fn func(key []byte, ts uint64)) } // spanFrontier tracks the minimum timestamp of a set of spans. diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index 70f8903915b..ca1daefb377 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -15,6 +15,7 @@ package frontier import ( "bytes" + "math" "math/rand" "sort" "testing" @@ -393,3 +394,46 @@ func TestMinMaxWithRegionSplitMerge(t *testing.T) { f.Forward(8, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, 5) require.Equal(t, uint64(5), f.Frontier()) } + +func TestFrontierEntries(t *testing.T) { + t.Parallel() + + ab := regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")} + bc := regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")} + cd := regionspan.ComparableSpan{Start: []byte("c"), End: []byte("d")} + de := regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")} + ef := regionspan.ComparableSpan{Start: []byte("e"), End: []byte("f")} + af := regionspan.ComparableSpan{Start: []byte("a"), End: []byte("f")} + f := NewFrontier(0, c, af) + + var slowestTs uint64 = math.MaxUint64 + var slowestRange regionspan.ComparableSpan + getSlowestRange := func() { + slowestTs = math.MaxUint64 + slowestRange = regionspan.ComparableSpan{} + f.Entries(func(key []byte, ts uint64) { + if ts < slowestTs { + slowestTs = ts + slowestRange.Start = key + slowestRange.End = nil + } else if slowestTs != math.MaxUint64 && len(slowestRange.End) == 0 { + slowestRange.End = key + } + }) + } + + getSlowestRange() + require.Equal(t, uint64(0), slowestTs) + require.Equal(t, []byte("a"), slowestRange.Start) + require.Equal(t, []byte("f"), slowestRange.End) + + f.Forward(1, ab, 100) + f.Forward(2, bc, 200) + f.Forward(3, cd, 300) + f.Forward(4, de, 400) + f.Forward(5, ef, 500) + getSlowestRange() + require.Equal(t, uint64(100), slowestTs) + require.Equal(t, []byte("a"), slowestRange.Start) + require.Equal(t, []byte("b"), slowestRange.End) +} diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index c740b2ef952..a89559f5445 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -15,6 +15,7 @@ package puller import ( "context" + "math" "sync/atomic" "time" @@ -171,6 +172,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { }() lastResolvedTs := p.checkpointTs + lastAdvancedTime := time.Now() + lastLogSlowRangeTime := time.Now() g.Go(func() error { metricsTicker := time.NewTicker(15 * time.Second) defer metricsTicker.Stop() @@ -265,10 +268,38 @@ func (p *pullerImpl) Run(ctx context.Context) error { zap.Duration("duration", time.Since(start)), zap.Strings("spans", spans)) } - if !initialized || resolvedTs == lastResolvedTs { + if !initialized { + continue + } + if resolvedTs <= lastResolvedTs { + if time.Since(lastAdvancedTime) > 30*time.Second && time.Since(lastLogSlowRangeTime) > 30*time.Second { + var slowestTs uint64 = math.MaxUint64 + slowestRange := regionspan.ComparableSpan{} + rangeFilled := true + p.tsTracker.Entries(func(key []byte, ts uint64) { + if ts < slowestTs { + slowestTs = ts + slowestRange.Start = key + rangeFilled = false + } else if !rangeFilled { + slowestRange.End = key + rangeFilled = true + } + }) + log.Info("table puller has been stucked", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("resolvedTs", resolvedTs), + zap.Uint64("slowestRangeTs", slowestTs), + zap.Stringer("range", &slowestRange)) + lastLogSlowRangeTime = time.Now() + } continue } lastResolvedTs = resolvedTs + lastAdvancedTime = time.Now() err := output(&model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved, RegionID: e.RegionID}) if err != nil { return errors.Trace(err)