Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Dec 12, 2023
1 parent bd1562a commit 253be08
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 23 deletions.
7 changes: 3 additions & 4 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,8 +860,7 @@ var (
)

const (
loadRegionRetryInterval time.Duration = 100 * time.Millisecond
resolveLockMinInterval time.Duration = 10 * time.Second
serverIsBusyBackoffInterval time.Duration = 200 * time.Millisecond
invalidSubscriptionID SubscriptionID = SubscriptionID(0)
loadRegionRetryInterval time.Duration = 100 * time.Millisecond
resolveLockMinInterval time.Duration = 10 * time.Second
invalidSubscriptionID SubscriptionID = SubscriptionID(0)
)
19 changes: 0 additions & 19 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ type requestedStream struct {

// tableExclusives means one GRPC stream is exclusive by one table.
tableExclusives chan tableExclusive

// Time of the last `ServerIsBusy` error is reported.
lastKvIsBusy atomic.Int64
}

type tableExclusive struct {
Expand Down Expand Up @@ -254,13 +251,6 @@ func (s *requestedStream) receive(

func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *requestedStore) (err error) {
doSend := func(cc *sharedconn.ConnAndClient, req *cdcpb.ChangeDataRequest, subscriptionID SubscriptionID) error {
lastKvIsBusy := time.Since(time.UnixMilli(s.lastKvIsBusy.Load()))
if lastKvIsBusy < serverIsBusyBackoffInterval {
if err := util.Hang(ctx, serverIsBusyBackoffInterval - lastKvIsBusy); err != nil {
return err
}
}

if err := cc.Client().Send(req); err != nil {
log.Warn("event feed send request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
Expand Down Expand Up @@ -497,15 +487,6 @@ func (s *requestedStream) sendRegionChangeEvents(
zap.Uint64("regionID", event.RegionId),
zap.Bool("stateIsNil", state == nil),
zap.Any("error", x.Error))
if x.Error.GetServerIsBusy() != nil {
for {
prev := s.lastKvIsBusy.Load()
next := time.Now().UnixMilli()
if prev >= next || s.lastKvIsBusy.CompareAndSwap(prev, next) {
break
}
}
}
}

if state != nil {
Expand Down

0 comments on commit 253be08

Please sign in to comment.