Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9981
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
hicqu authored and ti-chi-bot committed Nov 1, 2023
1 parent 6eaec3d commit a0e7626
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 12 deletions.
81 changes: 72 additions & 9 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
tidbkv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -423,13 +425,11 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return s.dispatchRequest(ctx)
})
g.Go(func() error { return s.dispatchRequest(ctx) })

g.Go(func() error {
return s.requestRegionToStore(ctx, g)
})
g.Go(func() error { return s.requestRegionToStore(ctx, g) })

g.Go(func() error { return s.logSlowRegions(ctx) })

g.Go(func() error {
for {
Expand Down Expand Up @@ -689,13 +689,13 @@ func (s *eventFeedSession) requestRegionToStore(
state := newRegionFeedState(sri, requestID)
pendingRegions.setByRequestID(requestID, state)

log.Debug("start new request",
log.Info("start new request",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("addr", storeAddr),
zap.Any("request", req))
zap.Uint64("regionID", sri.verID.GetID()),
zap.String("addr", storeAddr))

err = stream.client.Send(req)

Expand Down Expand Up @@ -889,6 +889,13 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
switch eerr := errors.Cause(err).(type) {
case *eventError:
innerErr := eerr.err
log.Info("cdc region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Stringer("error", innerErr))

if notLeader := innerErr.GetNotLeader(); notLeader != nil {
metricFeedNotLeaderCounter.Inc()
s.client.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
Expand Down Expand Up @@ -1216,6 +1223,17 @@ func (s *eventFeedSession) sendRegionChangeEvents(
continue
}

switch x := event.Event.(type) {
case *cdcpb.Event_Error:
log.Info("event feed receives a region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", event.RegionId),
zap.Any("error", x.Error))
}

slot := worker.inputCalcSlot(event.RegionId)
statefulEvents[slot] = append(statefulEvents[slot], &regionStatefulEvent{
changeEvent: event,
Expand Down Expand Up @@ -1308,6 +1326,51 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
return
}

// logSlowRegions periodically reports regions that hold back the event feed.
//
// On every tick (10 seconds) it collects the locked-range attributes from
// s.rangeLock and logs:
//   - the slowest region, when it is initialized and its checkpoint lags the
//     PD clock by more than 2*resolveLockMinInterval;
//   - the slowest region, when it is not initialized and was created more
//     than 10 minutes ago;
//   - every range listed in attr.Holes, when that list is not empty.
//
// It only returns once ctx is done, and then returns ctx.Err().
func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
	const (
		checkInterval = 10 * time.Second
		initDeadline  = 10 * time.Minute
	)

	// fields builds the identifying fields shared by every message emitted
	// here, followed by the message-specific field.
	fields := func(extra zap.Field) []zap.Field {
		return []zap.Field{
			zap.String("namespace", s.changefeed.Namespace),
			zap.String("changefeed", s.changefeed.ID),
			zap.Int64("tableID", s.tableID),
			zap.String("tableName", s.tableName),
			extra,
		}
	}

	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()

	for {
		// Block until the next tick, or bail out on cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}

		now := s.client.pdClock.CurrentTime()
		attr := s.rangeLock.CollectLockedRangeAttrs(nil)

		switch {
		case attr.SlowestRegion.Initialized:
			// An initialized region is judged by how far its checkpoint
			// trails the current PD time.
			lag := now.Sub(oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs))
			if lag > 2*resolveLockMinInterval {
				log.Info("event feed finds a slow region",
					fields(zap.Any("slowRegion", attr.SlowestRegion))...)
			}
		case now.Sub(attr.SlowestRegion.Created) > initDeadline:
			// A region that is still uninitialized is judged by its age.
			log.Info("event feed initializes a region too slow",
				fields(zap.Any("slowRegion", attr.SlowestRegion))...)
		}

		if len(attr.Holes) == 0 {
			continue
		}

		// Render the holes as "[start,end), [start,end), ...".
		var rendered strings.Builder
		for i, hole := range attr.Holes {
			if i > 0 {
				rendered.WriteString(", ")
			}
			fmt.Fprintf(&rendered, "[%s,%s)", hole.StartKey, hole.EndKey)
		}
		log.Info("event feed holes exist",
			fields(zap.String("holes", rendered.String()))...)
	}
}

func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
Expand Down
9 changes: 9 additions & 0 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,16 +195,25 @@ func (w *regionWorker) checkShouldExit() error {
func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState) error {
state.setRegionInfoResolvedTs()
regionID := state.getRegionID()
isStale := state.isStale()
log.Info("single region event feed disconnected",
zap.String("namespace", w.session.client.changefeed.Namespace),
zap.String("changefeed", w.session.client.changefeed.ID),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.requestID),
zap.Stringer("span", &state.sri.span),
<<<<<<< HEAD
zap.Uint64("resolvedTs", state.sri.resolvedTs),
zap.Error(err))
// if state is already marked stopped, it must have been or would be processed by `onRegionFail`
if state.isStopped() {
=======
zap.Uint64("resolvedTs", state.sri.resolvedTs()),
zap.Bool("isStale", isStale),
zap.Error(err))
// if state is already marked stopped, it must have been or would be processed by `onRegionFail`
if isStale {
>>>>>>> 69c180af98 (kv-client(cdc): add more logs to help debug slow regions (#9981))
return w.checkShouldExit()
}
// We need to ensure when the error is handled, `isStopped` must be set. So set it before sending the error.
Expand Down
25 changes: 22 additions & 3 deletions pkg/txnutil/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,32 @@ func NewLockerResolver(

const scanLockLimit = 1024

func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint64) error {
// TODO test whether this function will kill active transaction
func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint64) (err error) {
var lockCount int = 0

log.Info("resolve lock starts",
zap.Uint64("regionID", regionID),
zap.Uint64("maxVersion", maxVersion),
zap.String("namespace", r.changefeed.Namespace),
zap.String("changefeed", r.changefeed.ID))

defer func() {
log.Info("resolve lock finishes",
zap.Uint64("regionID", regionID),
zap.Int("lockCount", lockCount),
zap.Uint64("maxVersion", maxVersion),
zap.String("namespace", r.changefeed.Namespace),
zap.String("changefeed", r.changefeed.ID),
zap.Error(err))
}()

// TODO test whether this function will kill active transaction
req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
MaxVersion: maxVersion,
Limit: scanLockLimit,
})

bo := tikv.NewGcResolveLockMaxBackoffer(ctx)
var lockCount int
var loc *tikv.KeyLocation
var key []byte
flushRegion := func() error {
Expand Down Expand Up @@ -131,12 +147,15 @@ func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint
}
bo = tikv.NewGcResolveLockMaxBackoffer(ctx)
}
<<<<<<< HEAD
log.Info("resolve lock successfully",
zap.Uint64("regionID", regionID),
zap.Int("lockCount", lockCount),
zap.Uint64("maxVersion", maxVersion),
zap.String("namespace", r.changefeed.Namespace),
zap.String("changefeed", r.changefeed.ID),
zap.Any("role", r.role))
=======
>>>>>>> 69c180af98 (kv-client(cdc): add more logs to help debug slow regions (#9981))
return nil
}

0 comments on commit a0e7626

Please sign in to comment.