Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv-client(cdc): add more logs to help debug slow regions (#9981) #9983

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 100 additions & 34 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
tidbkv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -271,7 +273,7 @@ func (c *CDCClient) EventFeed(
eventCh chan<- model.RegionFeedEvent,
) error {
s := newEventFeedSession(c, span, lockResolver, ts, eventCh)
return s.eventFeed(ctx, ts)
return s.eventFeed(ctx)
}

// RegionCount returns the number of captured regions.
Expand Down Expand Up @@ -365,7 +367,6 @@ type eventFeedSession struct {

type rangeRequestTask struct {
span tablepb.Span
ts uint64
}

func newEventFeedSession(
Expand All @@ -375,9 +376,10 @@ func newEventFeedSession(
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
id := allocID()
idStr := strconv.FormatUint(id, 10)
rangeLock := regionlock.NewRegionRangeLock(
totalSpan.StartKey, totalSpan.EndKey, startTs,
id, totalSpan.StartKey, totalSpan.EndKey, startTs,
client.changefeed.Namespace+"."+client.changefeed.ID)
return &eventFeedSession{
client: client,
Expand All @@ -390,7 +392,7 @@ func newEventFeedSession(
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
id: id,
id: idStr,
regionChSizeGauge: clientChannelSize.WithLabelValues("region"),
errChSizeGauge: clientChannelSize.WithLabelValues("err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
Expand All @@ -406,7 +408,7 @@ func newEventFeedSession(
}
}

func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
func (s *eventFeedSession) eventFeed(ctx context.Context) error {
s.requestRangeCh = chann.NewAutoDrainChann[rangeRequestTask]()
s.regionCh = chann.NewAutoDrainChann[singleRegionInfo]()
s.regionRouter = chann.NewAutoDrainChann[singleRegionInfo]()
Expand All @@ -423,13 +425,11 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return s.dispatchRequest(ctx)
})
g.Go(func() error { return s.dispatchRequest(ctx) })

g.Go(func() error {
return s.requestRegionToStore(ctx, g)
})
g.Go(func() error { return s.requestRegionToStore(ctx, g) })

g.Go(func() error { return s.logSlowRegions(ctx) })

g.Go(func() error {
for {
Expand All @@ -447,7 +447,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
// Besides the count or frequency of range request is limited,
// we use ephemeral goroutine instead of permanent goroutine.
g.Go(func() error {
return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
return s.divideAndSendEventFeedToRegions(ctx, task.span)
})
}
}
Expand All @@ -468,15 +468,15 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
}
})

s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan, ts: ts}
s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan}
s.rangeChSizeGauge.Inc()

log.Info("event feed started",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("startTs", ts),
zap.Uint64("startTs", s.startTs),
zap.Stringer("span", &s.totalSpan))

return g.Wait()
Expand All @@ -485,9 +485,9 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
// scheduleDivideRegionAndRequest schedules a range to be divided by regions,
// and these regions will be then scheduled to send ChangeData requests.
func (s *eventFeedSession) scheduleDivideRegionAndRequest(
ctx context.Context, span tablepb.Span, ts uint64,
ctx context.Context, span tablepb.Span,
) {
task := rangeRequestTask{span: span, ts: ts}
task := rangeRequestTask{span: span}
select {
case s.requestRangeCh.In() <- task:
s.rangeChSizeGauge.Inc()
Expand All @@ -501,7 +501,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
handleResult := func(res regionlock.LockRangeResult) {
switch res.Status {
case regionlock.LockRangeStatusSuccess:
sri.resolvedTs = res.CheckpointTs
sri.lockedRange = res.LockedRange
select {
case s.regionCh.In() <- sri:
s.regionChSizeGauge.Inc()
Expand All @@ -513,12 +513,11 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", &sri.span),
zap.Uint64("resolvedTs", sri.resolvedTs),
zap.Any("retrySpans", res.RetryRanges))
for _, r := range res.RetryRanges {
// This call is always blocking, otherwise if scheduling in a new
// goroutine, it won't block the caller of `schedulerRegionRequest`.
s.scheduleDivideRegionAndRequest(ctx, r, sri.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, r)
}
case regionlock.LockRangeStatusCancel:
return
Expand All @@ -529,11 +528,12 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single

res := s.rangeLock.LockRange(
ctx, sri.span.StartKey, sri.span.EndKey, sri.verID.GetID(), sri.verID.GetVer())

failpoint.Inject("kvClientMockRangeLock", func(val failpoint.Value) {
// short sleep to wait region has split
time.Sleep(time.Second)
s.rangeLock.UnlockRange(sri.span.StartKey, sri.span.EndKey,
sri.verID.GetID(), sri.verID.GetVer(), sri.resolvedTs)
sri.verID.GetID(), sri.verID.GetVer())
regionNum := val.(int)
retryRanges := make([]tablepb.Span, 0, regionNum)
start := []byte("a")
Expand Down Expand Up @@ -561,7 +561,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// CAUTION: Note that this should only be called in a context that the region has locked its range.
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) {
s.rangeLock.UnlockRange(errorInfo.span.StartKey, errorInfo.span.EndKey,
errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs)
errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs())
log.Info("region failed", zap.Stringer("span", &errorInfo.span),
zap.Any("regionId", errorInfo.verID.GetID()),
zap.Error(errorInfo.err))
Expand Down Expand Up @@ -611,7 +611,7 @@ func (s *eventFeedSession) requestRegionToStore(
RegionId: regionID,
RequestId: requestID,
RegionEpoch: regionEpoch,
CheckpointTs: sri.resolvedTs,
CheckpointTs: sri.resolvedTs(),
StartKey: sri.span.StartKey,
EndKey: sri.span.EndKey,
ExtraOp: extraOp,
Expand Down Expand Up @@ -689,13 +689,13 @@ func (s *eventFeedSession) requestRegionToStore(
state := newRegionFeedState(sri, requestID)
pendingRegions.setByRequestID(requestID, state)

log.Debug("start new request",
log.Info("start new request",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("addr", storeAddr),
zap.Any("request", req))
zap.Uint64("regionID", sri.verID.GetID()),
zap.String("addr", storeAddr))

err = stream.client.Send(req)

Expand Down Expand Up @@ -782,7 +782,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
Region: sri.verID.GetID(),
},
},
ResolvedTs: sri.resolvedTs,
ResolvedTs: sri.resolvedTs(),
},
}
select {
Expand All @@ -804,7 +804,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
zap.String("tableName", s.tableName),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", &sri.span),
zap.Uint64("resolvedTs", sri.resolvedTs))
zap.Uint64("resolvedTs", sri.resolvedTs()))
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
s.onRegionFail(ctx, errInfo)
continue
Expand All @@ -818,7 +818,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
// to region boundaries. When region merging happens, it's possible that it
// will produce some overlapping spans.
func (s *eventFeedSession) divideAndSendEventFeedToRegions(
ctx context.Context, span tablepb.Span, ts uint64,
ctx context.Context, span tablepb.Span,
) error {
limit := 20
nextSpan := span
Expand Down Expand Up @@ -870,7 +870,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
// the End key return by the PD API will be nil to represent the biggest key,
partialSpan = spanz.HackSpan(partialSpan)

sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, ts, nil)
sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, nil)
s.scheduleRegionRequest(ctx, sri)
// return if no more regions
if spanz.EndCompare(nextSpan.StartKey, span.EndKey) >= 0 {
Expand All @@ -889,17 +889,24 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
switch eerr := errors.Cause(err).(type) {
case *eventError:
innerErr := eerr.err
log.Info("cdc region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Stringer("error", innerErr))

if notLeader := innerErr.GetNotLeader(); notLeader != nil {
metricFeedNotLeaderCounter.Inc()
s.client.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
} else if innerErr.GetEpochNotMatch() != nil {
// TODO: If only confver is updated, we don't need to reload the region from region cache.
metricFeedEpochNotMatchCounter.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
} else if innerErr.GetRegionNotFound() != nil {
metricFeedRegionNotFoundCounter.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
} else if duplicatedRequest := innerErr.GetDuplicateRequest(); duplicatedRequest != nil {
metricFeedDuplicateRequestCounter.Inc()
Expand Down Expand Up @@ -929,7 +936,7 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
}
case *rpcCtxUnavailableErr:
metricFeedRPCCtxUnavailable.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
case *connectToStoreErr:
metricConnectToStoreErr.Inc()
Expand Down Expand Up @@ -1206,7 +1213,7 @@ func (s *eventFeedSession) sendRegionChangeEvents(
}
state.start()
worker.setRegionState(event.RegionId, state)
} else if state.isStopped() {
} else if state.isStale() {
log.Warn("drop event due to region feed stopped",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
Expand All @@ -1216,6 +1223,17 @@ func (s *eventFeedSession) sendRegionChangeEvents(
continue
}

switch x := event.Event.(type) {
case *cdcpb.Event_Error:
log.Info("event feed receives a region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", event.RegionId),
zap.Any("error", x.Error))
}

slot := worker.inputCalcSlot(event.RegionId)
statefulEvents[slot] = append(statefulEvents[slot], &regionStatefulEvent{
changeEvent: event,
Expand Down Expand Up @@ -1308,6 +1326,54 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
return
}

func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}

currTime, err := s.client.pdClock.CurrentTime()
if err != nil {
continue
}
attr := s.rangeLock.CollectLockedRangeAttrs(nil)
if attr.SlowestRegion.Initialized {
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs)
if currTime.Sub(ckptTime) > 20*time.Second {
log.Info("event feed finds a slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("slowRegion", attr.SlowestRegion))
}
} else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute {
log.Info("event feed initializes a region too slow",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("slowRegion", attr.SlowestRegion))
}
if len(attr.Holes) > 0 {
holes := make([]string, 0, len(attr.Holes))
for _, hole := range attr.Holes {
holes = append(holes, fmt.Sprintf("[%s,%s)", hole.StartKey, hole.EndKey))
}
log.Info("event feed holes exist",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("holes", strings.Join(holes, ", ")))
}
}
}

func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
Expand Down
Loading
Loading