diff --git a/cdc/kv/client.go b/cdc/kv/client.go index f70ec5e0e97..ea30d0f2ee2 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -19,6 +19,7 @@ import ( "io" "math/rand" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -42,6 +43,7 @@ import ( "github.com/pingcap/tiflow/pkg/version" "github.com/prometheus/client_golang/prometheus" tidbkv "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -422,13 +424,11 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - return s.dispatchRequest(ctx) - }) + g.Go(func() error { return s.dispatchRequest(ctx) }) - g.Go(func() error { - return s.requestRegionToStore(ctx, g) - }) + g.Go(func() error { return s.requestRegionToStore(ctx, g) }) + + g.Go(func() error { return s.logSlowRegions(ctx) }) g.Go(func() error { for { @@ -688,13 +688,13 @@ func (s *eventFeedSession) requestRegionToStore( state := newRegionFeedState(sri, requestID) pendingRegions.setByRequestID(requestID, state) - log.Debug("start new request", + log.Info("start new request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.String("addr", storeAddr), - zap.Any("request", req)) + zap.Uint64("regionID", sri.verID.GetID()), + zap.String("addr", storeAddr)) err = stream.client.Send(req) @@ -888,6 +888,13 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI switch eerr := errors.Cause(err).(type) { case *eventError: innerErr := eerr.err + log.Info("cdc region error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Stringer("error", innerErr)) + if notLeader := innerErr.GetNotLeader(); notLeader != nil { metricFeedNotLeaderCounter.Inc() s.client.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) @@ -1215,6 +1222,17 @@ func (s *eventFeedSession) sendRegionChangeEvents( continue } + switch x := event.Event.(type) { + case *cdcpb.Event_Error: + log.Info("event feed receives a region error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("regionID", event.RegionId), + zap.Any("error", x.Error)) + } + slot := worker.inputCalcSlot(event.RegionId) statefulEvents[slot] = append(statefulEvents[slot], ®ionStatefulEvent{ changeEvent: event, @@ -1307,6 +1325,51 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can return } +func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + + currTime := s.client.pdClock.CurrentTime() + attr := s.rangeLock.CollectLockedRangeAttrs(nil) + if attr.SlowestRegion.Initialized { + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) + if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { + log.Info("event feed finds a slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) + } + } else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute { + log.Info("event feed initializes a region too slow", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) + } + if len(attr.Holes) > 0 { + holes := make([]string, 0, len(attr.Holes)) + for _, hole := range attr.Holes { + holes = append(holes, fmt.Sprintf("[%s,%s)", hole.StartKey, hole.EndKey)) + } + log.Info("event feed holes exist", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("holes", strings.Join(holes, ", "))) + } + } +} + func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) { var opType model.OpType switch entry.GetOpType() { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 7a8d55227dd..b38639221bb 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -204,6 +204,7 @@ func (w *regionWorker) checkShouldExit() error { func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState) error { regionID := state.getRegionID() + isStale := state.isStale() log.Info("single region event feed disconnected", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), @@ -211,9 +212,10 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState zap.Uint64("requestID", state.requestID), zap.Stringer("span", &state.sri.span), zap.Uint64("resolvedTs", state.sri.resolvedTs()), + zap.Bool("isStale", isStale), zap.Error(err)) // if state is already marked stopped, it must have been or would be processed by `onRegionFail` - if state.isStale() { + if isStale { return w.checkShouldExit() } // We need to ensure when the error is handled, `isStale` must be set. So set it before sending the error. diff --git a/cdc/kv/regionlock/region_range_lock.go b/cdc/kv/regionlock/region_range_lock.go index ac876dd5545..3a4e4b7703c 100644 --- a/cdc/kv/regionlock/region_range_lock.go +++ b/cdc/kv/regionlock/region_range_lock.go @@ -489,9 +489,12 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs( lastEnd := l.totalSpan.StartKey l.rangeLock.Ascend(func(item *rangeLockEntry) bool { - action(item.regionID, &item.state) - - r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, item.startKey) < 0 + if action != nil { + action(item.regionID, &item.state) + } + if spanz.EndCompare(lastEnd, item.startKey) < 0 { + r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: item.startKey}) + } ckpt := item.state.CheckpointTs.Load() if ckpt > r.FastestRegion.CheckpointTs { r.FastestRegion.RegionID = item.regionID @@ -508,13 +511,15 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs( lastEnd = item.endKey return true }) - r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0 + if spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0 { + r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: l.totalSpan.EndKey}) + } return } // CollectedLockedRangeAttrs returns by `RegionRangeLock.CollectedLockedRangeAttrs`. type CollectedLockedRangeAttrs struct { - HoleExists bool + Holes []tablepb.Span FastestRegion LockedRangeAttrs SlowestRegion LockedRangeAttrs } diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index d61d5a23a88..69dfb2c4aa6 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -16,6 +16,8 @@ package kv import ( "context" "encoding/binary" + "fmt" + "strings" "sync" "sync/atomic" "time" @@ -40,6 +42,7 @@ import ( "github.com/pingcap/tiflow/pkg/version" "github.com/prometheus/client_golang/prometheus" kvclientv2 "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -246,6 +249,7 @@ func (s *SharedClient) Run(ctx context.Context) error { g.Go(func() error { return s.requestRegionToStore(ctx, g) }) g.Go(func() error { return s.handleErrors(ctx) }) g.Go(func() error { return s.resolveLock(ctx) }) + g.Go(func() error { return s.logSlowRegions(ctx) }) log.Info("event feed started", zap.String("namespace", s.changefeed.Namespace), @@ -689,6 +693,52 @@ func (s *SharedClient) resolveLock(ctx context.Context) error { } } +func (s *SharedClient) logSlowRegions(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + + currTime := s.pdClock.CurrentTime() + s.totalSpans.RLock() + for subscriptionID, rt := range s.totalSpans.v { + attr := rt.rangeLock.CollectLockedRangeAttrs(nil) + if attr.SlowestRegion.Initialized { + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) + if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { + log.Info("event feed finds a slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Any("slowRegion", attr.SlowestRegion)) + } + } else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute { + log.Info("event feed initializes a region too slow", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Any("slowRegion", attr.SlowestRegion)) + } + if len(attr.Holes) > 0 { + holes := make([]string, 0, len(attr.Holes)) + for _, hole := range attr.Holes { + holes = append(holes, fmt.Sprintf("[%s,%s)", hole.StartKey, hole.EndKey)) + } + log.Info("event feed holes exist", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.String("holes", strings.Join(holes, ", "))) + } + } + s.totalSpans.RUnlock() + } +} + func (s *SharedClient) newRequestedTable( subID SubscriptionID, span tablepb.Span, startTs uint64, eventCh chan<- MultiplexingEvent, diff --git a/pkg/txnutil/lock_resolver.go b/pkg/txnutil/lock_resolver.go index 9826bbcc1dd..dfdbf4f9482 100644 --- a/pkg/txnutil/lock_resolver.go +++ b/pkg/txnutil/lock_resolver.go @@ -50,16 +50,32 @@ func NewLockerResolver( const scanLockLimit = 1024 -func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint64) error { - // TODO test whether this function will kill active transaction +func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint64) (err error) { + var lockCount int = 0 + + log.Info("resolve lock starts", + zap.Uint64("regionID", regionID), + zap.Uint64("maxVersion", maxVersion), + zap.String("namespace", r.changefeed.Namespace), + zap.String("changefeed", r.changefeed.ID)) + defer func() { + log.Info("resolve lock finishes", + zap.Uint64("regionID", regionID), + zap.Int("lockCount", lockCount), + zap.Uint64("maxVersion", maxVersion), + zap.String("namespace", r.changefeed.Namespace), + zap.String("changefeed", r.changefeed.ID), + zap.Error(err)) + }() + + // TODO test whether this function will kill active transaction req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{ MaxVersion: maxVersion, Limit: scanLockLimit, }) bo := tikv.NewGcResolveLockMaxBackoffer(ctx) - var lockCount int var loc *tikv.KeyLocation var key []byte flushRegion := func() error { @@ -128,11 +144,5 @@ func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint } bo = tikv.NewGcResolveLockMaxBackoffer(ctx) } - log.Info("resolve lock successfully", - zap.Uint64("regionID", regionID), - zap.Int("lockCount", lockCount), - zap.Uint64("maxVersion", maxVersion), - zap.String("namespace", r.changefeed.Namespace), - zap.String("changefeed", r.changefeed.ID)) return nil }