diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 0ba2f18c28a..9c169817cc3 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -69,6 +69,8 @@ const (
 	// failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we
 	// don't need to force reload region anymore.
 	regionScheduleReload = false
+
+	scanRegionsConcurrency = 1024
 )
 
 // time interval to force kv client to terminate gRPC stream and reconnect
@@ -424,6 +426,8 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
 	})
 
 	g.Go(func() error {
+		g, ctx := errgroup.WithContext(ctx)
+		g.SetLimit(scanRegionsConcurrency)
 		for {
 			select {
 			case <-ctx.Done():
@@ -997,8 +1001,7 @@ func (s *eventFeedSession) receiveFromStream(
 
 	// always create a new region worker, because `receiveFromStream` is ensured
 	// to call exactly once from outer code logic
-	worker := newRegionWorker(s.changefeed, s, addr)
-
+	worker := newRegionWorker(s.changefeed, s, addr, pendingRegions)
 	defer worker.evictAllRegions()
 
 	g.Go(func() error {
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 0f1e5eb63b9..08fd2a3c956 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -112,10 +112,13 @@ type regionWorker struct {
 
 	// how many pending input events
 	inputPending int32
+
+	pendingRegions *syncRegionFeedStateMap
 }
 
 func newRegionWorker(
 	changefeedID model.ChangeFeedID, s *eventFeedSession, addr string,
+	pendingRegions *syncRegionFeedStateMap,
 ) *regionWorker {
 	metrics := &regionWorkerMetrics{}
 	metrics.metricReceivedEventSize = eventSize.WithLabelValues("received")
@@ -149,6 +152,8 @@ func newRegionWorker(
 		concurrency:  s.client.config.WorkerConcurrent,
 		metrics:      metrics,
 		inputPending: 0,
+
+		pendingRegions: pendingRegions,
 	}
 }
 
@@ -184,7 +189,7 @@ func (w *regionWorker) checkShouldExit() error {
 	empty := w.checkRegionStateEmpty()
 	// If there is no region maintained by this region worker, exit it and
 	// cancel the gRPC stream.
-	if empty {
+	if empty && w.pendingRegions.len() == 0 {
 		w.cancelStream(time.Duration(0))
 		return cerror.ErrRegionWorkerExit.GenWithStackByArgs()
 	}
diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go
index fef317d4c0d..c8fcebe4a18 100644
--- a/cdc/kv/region_worker_test.go
+++ b/cdc/kv/region_worker_test.go
@@ -154,7 +154,7 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) {
 		regionspan.ToComparableSpan(span),
 		0, &tikv.RPCContext{}), 0)
 	state.start()
-	worker := newRegionWorker(model.ChangeFeedID{}, s, "")
+	worker := newRegionWorker(model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())
 	require.Equal(t, 2, cap(worker.outputCh))
 
 	// Receive prewrite2 with empty value.
@@ -313,7 +313,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) {
 		rpcCtx: &tikv.RPCContext{},
 	}, 0)
 	s1.start()
-	w := newRegionWorker(model.ChangeFeedID{}, s, "")
+	w := newRegionWorker(model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())
 
 	err := w.handleResolvedTs(ctx, &resolvedTsEvent{
 		resolvedTs: 5,
diff --git a/cdc/processor/sinkmanager/tasks.go b/cdc/processor/sinkmanager/tasks.go
index c6b6ea8757b..a736b6fca50 100644
--- a/cdc/processor/sinkmanager/tasks.go
+++ b/cdc/processor/sinkmanager/tasks.go
@@ -33,8 +33,8 @@ var (
 	maxUpdateIntervalSize = defaultMaxUpdateIntervalSize
 
 	// Sink manager schedules table tasks based on lag. Limit the max task range
-	// can be helpful to reduce changefeed latency.
-	maxTaskRange = 5 * time.Second
+	// can be helpful to reduce changefeed latency for large initial data.
+	maxTaskRange = 30 * time.Minute
 )
 
 // Used to record the progress of the table.
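
Note (illustrative, not part of the diff): the `eventFeed` change bounds concurrent region scans with `errgroup.SetLimit`, so at most `scanRegionsConcurrency` scan goroutines run at once while any returned error still cancels the shared context. A minimal sketch of that pattern follows; `scanRegion` and `regionCount` are hypothetical stand-ins for the real per-region work.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

const scanRegionsConcurrency = 1024

// scanRegion is a placeholder for the per-region scan work the kv client
// would dispatch; it only exists to make the sketch runnable.
func scanRegion(ctx context.Context, id int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		fmt.Println("scanning region", id)
		return nil
	}
}

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	// At most scanRegionsConcurrency goroutines run concurrently; further
	// g.Go calls block until a slot frees up.
	g.SetLimit(scanRegionsConcurrency)

	const regionCount = 4096 // hypothetical number of regions to scan
	for i := 0; i < regionCount; i++ {
		id := i
		g.Go(func() error { return scanRegion(ctx, id) })
	}
	if err := g.Wait(); err != nil {
		fmt.Println("scan failed:", err)
	}
}
```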