Skip to content

Commit

Permalink
kvclient(cdc): remove some noicy logs in the module (#9811)
Browse files Browse the repository at this point in the history
ref #9660
  • Loading branch information
hicqu authored Sep 27, 2023
1 parent c601295 commit ca69c33
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 70 deletions.
13 changes: 5 additions & 8 deletions cdc/kv/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/tiflow/cdc/kv/regionlock"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
Expand Down Expand Up @@ -59,10 +58,9 @@ func (s singleRegionInfo) resolvedTs() uint64 {
}

type regionFeedState struct {
sri singleRegionInfo
requestID uint64
matcher *matcher
startFeedTime time.Time
sri singleRegionInfo
requestID uint64
matcher *matcher

// Transform: normal -> stopped -> removed.
// normal: the region is in replicating.
Expand All @@ -87,7 +85,6 @@ func newRegionFeedState(sri singleRegionInfo, requestID uint64) *regionFeedState
}

func (s *regionFeedState) start() {
s.startFeedTime = time.Now()
s.matcher = newMatcher()
}

Expand Down Expand Up @@ -157,8 +154,8 @@ func (s *regionFeedState) getRegionInfo() singleRegionInfo {
return s.sri
}

func (s *regionFeedState) getRegionMeta() (uint64, tablepb.Span, time.Time, string) {
return s.sri.verID.GetID(), s.sri.span, s.startFeedTime, s.sri.rpcCtx.Addr
func (s *regionFeedState) getRegionMeta() (uint64, tablepb.Span, string) {
return s.sri.verID.GetID(), s.sri.span, s.sri.rpcCtx.Addr
}

type syncRegionFeedStateMap struct {
Expand Down
13 changes: 2 additions & 11 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,18 +626,17 @@ func (w *regionWorker) handleEventEntry(
return false
}
}
return handleEventEntry(w.session.client.changefeed, x, w.session.startTs, state, w.metrics, emit)
return handleEventEntry(x, w.session.startTs, state, w.metrics, emit)
}

func handleEventEntry(
changefeed model.ChangeFeedID,
x *cdcpb.Event_Entries_,
startTs uint64,
state *regionFeedState,
metrics *regionWorkerMetrics,
emit func(assembled model.RegionFeedEvent) bool,
) error {
regionID, regionSpan, startTime, _ := state.getRegionMeta()
regionID, regionSpan, _ := state.getRegionMeta()
for _, entry := range x.Entries.GetEntries() {
// NOTE: from TiKV 7.0.0, entries are already filtered out in TiKV side.
// We can remove the check in future.
Expand All @@ -649,14 +648,6 @@ func handleEventEntry(
}
switch entry.Type {
case cdcpb.Event_INITIALIZED:
if time.Since(startTime) > 20*time.Second {
log.Warn("The time cost of initializing is too much",
zap.String("namespace", changefeed.Namespace),
zap.String("changefeed", changefeed.ID),
zap.Uint64("regionID", regionID),
zap.Duration("duration", time.Since(startTime)))
}

metrics.metricPullEventInitializedCounter.Inc()
state.setInitialized()
for _, cachedEvent := range state.matcher.matchCachedRow(true) {
Expand Down
32 changes: 14 additions & 18 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ func (s *SharedClient) setTableStopped(rt *requestedTable) {
log.Info("event feed starts to stop table",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.String("span", rt.span.String()))
zap.Any("subscriptionID", rt.subscriptionID))

// Set stopped to true so we can stop handling region events from the table.
// Then send a special singleRegionInfo to regionRouter to deregister the table
Expand All @@ -299,8 +298,7 @@ func (s *SharedClient) onTableDrained(rt *requestedTable) {
log.Info("event feed stop table is finished",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.String("span", rt.span.String()))
zap.Any("subscriptionID", rt.subscriptionID))

s.totalSpans.Lock()
defer s.totalSpans.Unlock()
Expand Down Expand Up @@ -353,22 +351,14 @@ func (s *SharedClient) requestRegionToStore(ctx context.Context, g *errgroup.Gro
// If lockedRange is nil it means it's a special task from stopping the table.
if sri.lockedRange == nil {
for _, rs := range s.requestedStores {
rs.broadcastRequest(sri)
s.broadcastRequest(rs, sri)
}
continue
}

storeID := sri.rpcCtx.Peer.StoreId
storeAddr := sri.rpcCtx.Addr
log.Debug("event feed will request a region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("table", sri.requestedTable.span),
zap.Any("subscriptionID", sri.requestedTable.subscriptionID),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("storeID", storeID),
zap.String("addr", storeAddr))
s.requestStore(ctx, g, storeID, storeAddr).appendRequest(sri)
s.appendRequest(s.requestStore(ctx, g, storeID, storeAddr), sri)
}
}

Expand Down Expand Up @@ -414,12 +404,20 @@ func (s *SharedClient) createRegionRequest(sri singleRegionInfo) *cdcpb.ChangeDa
}
}

func (r *requestedStore) appendRequest(sri singleRegionInfo) {
func (s *SharedClient) appendRequest(r *requestedStore, sri singleRegionInfo) {
offset := r.nextStream.Add(1) % uint32(len(r.streams))
log.Debug("event feed will request a region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("streamID", r.streams[offset].streamID),
zap.Any("subscriptionID", sri.requestedTable.subscriptionID),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("storeID", r.storeID),
zap.String("addr", r.storeAddr))
r.streams[offset].requests.In() <- sri
}

func (r *requestedStore) broadcastRequest(sri singleRegionInfo) {
func (s *SharedClient) broadcastRequest(r *requestedStore, sri singleRegionInfo) {
for _, stream := range r.streams {
stream.requests.In() <- sri
}
Expand Down Expand Up @@ -632,7 +630,6 @@ func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo)
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.requestedTable.subscriptionID),
zap.String("span", errInfo.requestedTable.span.String()),
zap.Error(err))
return err
}
Expand Down Expand Up @@ -741,7 +738,6 @@ func (r *requestedTable) updateStaleLocks(s *SharedClient, maxVersion uint64) {
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", r.subscriptionID),
zap.String("span", r.span.String()),
zap.Any("ranges", res))
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/shared_region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (w *sharedRegionWorker) handleEventEntry(ctx context.Context, x *cdcpb.Even
zap.String("changefeed", w.changefeed.ID),
zap.Any("subscriptionID", state.sri.requestedTable.subscriptionID),
zap.Int("rows", len(x.Entries.GetEntries())))
return handleEventEntry(w.changefeed, x, startTs, state, w.metrics, emit)
return handleEventEntry(x, startTs, state, w.metrics, emit)
}

func (w *sharedRegionWorker) handleResolvedTs(ctx context.Context, batch resolvedTsBatch) {
Expand Down
53 changes: 21 additions & 32 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
log.Panic("preFetchForConnecting should be nil",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Uint64("storeID", r.storeID),
zap.String("addr", r.storeAddr),
zap.Uint64("streamID", stream.streamID))
zap.String("addr", r.storeAddr))
}
for {
select {
Expand Down Expand Up @@ -129,27 +129,27 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
log.Info("event feed check store version fails",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Uint64("streamID", s.streamID),
zap.Error(err))
return isCanceled()
}

log.Info("event feed going to create grpc stream",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Uint64("streamID", s.streamID))
zap.String("addr", rs.storeAddr))

defer func() {
log.Info("event feed grpc stream exits",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Uint64("streamID", s.streamID),
zap.Bool("canceled", canceled))
if s.multiplexing != nil {
s.multiplexing = nil
Expand All @@ -164,9 +164,9 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
log.Warn("event feed create grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Uint64("streamID", s.streamID),
zap.Error(err))
return isCanceled()
}
Expand All @@ -179,9 +179,9 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
log.Info("event feed stream multiplexing is not supported, will fallback",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Uint64("streamID", s.streamID))
zap.String("addr", rs.storeAddr))
cc.Release()

s.tableExclusives = make(chan tableExclusive, 8)
Expand Down Expand Up @@ -217,9 +217,9 @@ func (s *requestedStream) receive(
log.Info("event feed receive from grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Uint64("streamID", s.streamID),
zap.String("code", grpcstatus.Code(err).String()),
zap.Error(err))
if sharedconn.StatusIsEOF(grpcstatus.Convert(err)) {
Expand All @@ -242,35 +242,35 @@ func (s *requestedStream) receive(
}

func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *requestedStore) (err error) {
doSend := func(cc *sharedconn.ConnAndClient, req *cdcpb.ChangeDataRequest) error {
doSend := func(cc *sharedconn.ConnAndClient, req *cdcpb.ChangeDataRequest, subscriptionID SubscriptionID) error {
if err := cc.Client().Send(req); err != nil {
log.Warn("event feed send request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Any("subscriptionID", SubscriptionID(req.RequestId)),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Uint64("streamID", s.streamID),
zap.Error(err))
return errors.Trace(err)
}
log.Debug("event feed send request to grpc stream success",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Any("subscriptionID", SubscriptionID(req.RequestId)),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Uint64("streamID", s.streamID))
zap.String("addr", rs.storeAddr))
return nil
}

fetchMoreReq := func() (singleRegionInfo, error) {
waitReqTicker := time.NewTicker(60 * time.Second)
defer waitReqTicker.Stop()
var sri singleRegionInfo
for {
var sri singleRegionInfo
select {
case <-ctx.Done():
return sri, ctx.Err()
Expand Down Expand Up @@ -320,19 +320,19 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
log.Debug("event feed gets a singleRegionInfo",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Uint64("streamID", s.streamID))
zap.String("addr", rs.storeAddr))
// It means it's a special task for stopping the table.
if sri.lockedRange == nil {
if s.multiplexing != nil {
req := &cdcpb.ChangeDataRequest{
RequestId: uint64(subscriptionID),
Request: &cdcpb.ChangeDataRequest_Deregister_{},
}
if err = doSend(s.multiplexing, req); err != nil {
if err = doSend(s.multiplexing, req, subscriptionID); err != nil {
return err
}
} else if cc := tableExclusives[subscriptionID]; cc != nil {
Expand Down Expand Up @@ -373,7 +373,7 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
} else if cc, err = getTableExclusiveConn(subscriptionID); err != nil {
return err
}
if err = doSend(cc, c.createRegionRequest(sri)); err != nil {
if err = doSend(cc, c.createRegionRequest(sri), subscriptionID); err != nil {
return err
}
}
Expand Down Expand Up @@ -465,10 +465,6 @@ func (s *requestedStream) sendRegionChangeEvents(
subscriptionID = tableSubID
}
if state := s.getState(subscriptionID, regionID); state != nil {
log.Debug("event feed get an Event",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Any("subscriptionID", subscriptionID))
sfEvent := newEventItem(event, state, s)
slot := hashRegionID(regionID, len(c.workers))
if err := c.workers[slot].sendEvent(ctx, sfEvent); err != nil {
Expand All @@ -490,13 +486,6 @@ func (s *requestedStream) sendResolvedTs(
subscriptionID = tableSubID
}
sfEvents := make([]statefulEvent, len(c.workers))
log.Debug("event feed get a ResolvedTs",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("ResolvedTs", resolvedTs.Ts),
zap.Int("regionCount", len(resolvedTs.Regions)))

for _, regionID := range resolvedTs.Regions {
slot := hashRegionID(regionID, len(c.workers))
if sfEvents[slot].stream == nil {
Expand Down

0 comments on commit ca69c33

Please sign in to comment.