diff --git a/cdc/kv/client.go b/cdc/kv/client.go index e656570ced2..9c169817cc3 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -69,6 +69,8 @@ const ( // failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we // don't need to force reload region anymore. regionScheduleReload = false + + scanRegionsConcurrency = 1024 ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -424,6 +426,8 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { }) g.Go(func() error { + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(scanRegionsConcurrency) for { select { case <-ctx.Done(): diff --git a/cdc/processor/sinkmanager/tasks.go b/cdc/processor/sinkmanager/tasks.go index c6b6ea8757b..a736b6fca50 100644 --- a/cdc/processor/sinkmanager/tasks.go +++ b/cdc/processor/sinkmanager/tasks.go @@ -33,8 +33,8 @@ var ( maxUpdateIntervalSize = defaultMaxUpdateIntervalSize // Sink manager schedules table tasks based on lag. Limit the max task range - // can be helpful to reduce changefeed latency. - maxTaskRange = 5 * time.Second + // can be helpful to reduce changefeed latency for large initial data. + maxTaskRange = 30 * time.Minute ) // Used to record the progress of the table.