From a3c5950c30786a0c4d0d44bd30155117cf55f1cd Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 7 May 2024 11:32:37 +0800 Subject: [PATCH] *: split into multiple runner for heartbeat (#8130) ref tikv/pd#7897 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/core/context.go | 3 +- pkg/core/region.go | 37 ++------- pkg/mcs/scheduling/server/cluster.go | 40 +++++---- pkg/ratelimit/runner.go | 118 ++++++++++++++++----------- pkg/ratelimit/runner_test.go | 36 ++++---- server/cluster/cluster.go | 45 +++++----- server/cluster/cluster_worker.go | 13 +-- 7 files changed, 151 insertions(+), 141 deletions(-) diff --git a/pkg/core/context.go b/pkg/core/context.go index ab149378b1d..a0f51e55680 100644 --- a/pkg/core/context.go +++ b/pkg/core/context.go @@ -25,7 +25,7 @@ type MetaProcessContext struct { context.Context Tracer RegionHeartbeatProcessTracer TaskRunner ratelimit.Runner - Limiter *ratelimit.ConcurrencyLimiter + LogRunner ratelimit.Runner } // NewMetaProcessContext creates a new MetaProcessContext. @@ -35,6 +35,7 @@ func ContextTODO() *MetaProcessContext { Context: context.TODO(), Tracer: NewNoopHeartbeatProcessTracer(), TaskRunner: ratelimit.NewSyncRunner(), + LogRunner: ratelimit.NewSyncRunner(), // Limit default is nil } } diff --git a/pkg/core/region.go b/pkg/core/region.go index b3a9e24428c..be8f392f05e 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -744,33 +744,26 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { - taskRunner := ctx.TaskRunner - limiter := ctx.Limiter + logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i - if taskRunner != nil { + if logRunner != nil { debug = func(msg string, fields ...zap.Field) { - taskRunner.RunTask( + logRunner.RunTask( ctx.Context, - ratelimit.TaskOpts{ - TaskName: "Log", - Limit: limiter, - }, func(_ context.Context) { d(msg, fields...) }, + ratelimit.WithTaskName("DebugLog"), ) } info = func(msg string, fields ...zap.Field) { - taskRunner.RunTask( + logRunner.RunTask( ctx.Context, - ratelimit.TaskOpts{ - TaskName: "Log", - Limit: limiter, - }, func(_ context.Context) { i(msg, fields...) }, + ratelimit.WithTaskName("InfoLog"), ) } } @@ -873,24 +866,6 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } } -// RegionHeartbeatStageName is the name of the stage of the region heartbeat. -const ( - HandleStatsAsync = "HandleStatsAsync" - ObserveRegionStatsAsync = "ObserveRegionStatsAsync" - UpdateSubTree = "UpdateSubTree" - HandleOverlaps = "HandleOverlaps" - CollectRegionStatsAsync = "CollectRegionStatsAsync" - SaveRegionToKV = "SaveRegionToKV" -) - -// ExtraTaskOpts returns the task options for the task. -func ExtraTaskOpts(ctx *MetaProcessContext, name string) ratelimit.TaskOpts { - return ratelimit.TaskOpts{ - TaskName: name, - Limit: ctx.Limiter, - } -} - // RWLockStats is a read-write lock with statistics. 
type RWLockStats struct { syncutil.RWMutex diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 6e6df8e3775..2a5302b34dc 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -54,8 +54,8 @@ type Cluster struct { clusterID uint64 running atomic.Bool - taskRunner ratelimit.Runner - hbConcurrencyLimiter *ratelimit.ConcurrencyLimiter + heartbeatRunnner ratelimit.Runner + logRunner ratelimit.Runner } const ( @@ -64,7 +64,8 @@ const ( collectWaitTime = time.Minute // heartbeat relative const - hbConcurrentRunner = "heartbeat-concurrent-task-runner" + heartbeatTaskRunner = "heartbeat-task-runner" + logTaskRunner = "log-task-runner" ) var syncRunner = ratelimit.NewSyncRunner() @@ -92,8 +93,8 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, clusterID: clusterID, checkMembershipCh: checkMembershipCh, - taskRunner: ratelimit.NewConcurrentRunner(hbConcurrentRunner, time.Minute), - hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)), + heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) @@ -530,7 +531,8 @@ func (c *Cluster) StartBackgroundJobs() { go c.runUpdateStoreStats() go c.runCoordinator() go c.runMetricsCollectionJob() - c.taskRunner.Start() + c.heartbeatRunnner.Start() + c.logRunner.Start() c.running.Store(true) } @@ -541,7 +543,8 @@ func (c *Cluster) StopBackgroundJobs() { } c.running.Store(false) c.coordinator.Stop() - c.taskRunner.Stop() + c.heartbeatRunnner.Stop() + c.logRunner.Stop() c.cancel() c.wg.Wait() } @@ -557,16 +560,17 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } - var runner ratelimit.Runner - runner = syncRunner + var taskRunner, logRunner ratelimit.Runner + taskRunner, logRunner = syncRunner, syncRunner if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - runner = c.taskRunner + taskRunner = c.heartbeatRunnner + logRunner = c.logRunner } ctx := &core.MetaProcessContext{ Context: c.ctx, - Limiter: c.hbConcurrencyLimiter, Tracer: tracer, - TaskRunner: runner, + TaskRunner: taskRunner, + LogRunner: logRunner, } tracer.Begin() if err := c.processRegionHeartbeat(ctx, region); err != nil { @@ -590,10 +594,10 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c ctx.TaskRunner.RunTask( ctx, - core.ExtraTaskOpts(ctx, core.HandleStatsAsync), func(_ context.Context) { cluster.HandleStatsAsync(c, region) }, + ratelimit.WithTaskName(ratelimit.HandleStatsAsync), ) tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil @@ -607,22 +611,22 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( ctx, - core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync), func(_ context.Context) { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) } }, + 
ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync), ) } // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( ctx, - core.ExtraTaskOpts(ctx, core.UpdateSubTree), func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithTaskName(ratelimit.UpdateSubTree), ) } return nil @@ -642,28 +646,28 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c } ctx.TaskRunner.RunTask( ctx, - core.ExtraTaskOpts(ctx, core.UpdateSubTree), func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithTaskName(ratelimit.UpdateSubTree), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( ctx, - core.ExtraTaskOpts(ctx, core.HandleOverlaps), func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, + ratelimit.WithTaskName(ratelimit.HandleOverlaps), ) } tracer.OnSaveCacheFinished() // handle region stats ctx.TaskRunner.RunTask( ctx, - core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync), func(_ context.Context) { cluster.Collect(c, region, hasRegionStats) }, + ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync), ) tracer.OnCollectRegionStatsFinished() return nil diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index c4f2d5bc5ac..44ee54971f5 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -25,11 +25,21 @@ import ( "go.uber.org/zap" ) +// RegionHeartbeatStageName is the name of the stage of the region heartbeat. +const ( + HandleStatsAsync = "HandleStatsAsync" + ObserveRegionStatsAsync = "ObserveRegionStatsAsync" + UpdateSubTree = "UpdateSubTree" + HandleOverlaps = "HandleOverlaps" + CollectRegionStatsAsync = "CollectRegionStatsAsync" + SaveRegionToKV = "SaveRegionToKV" +) + const initialCapacity = 100 // Runner is the interface for running tasks. type Runner interface { - RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error + RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error Start() Stop() } @@ -37,7 +47,7 @@ type Runner interface { // Task is a task to be run. type Task struct { Ctx context.Context - Opts TaskOpts + Opts *TaskOpts f func(context.Context) submittedAt time.Time } @@ -48,6 +58,7 @@ var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") // ConcurrentRunner is a simple task runner that limits the number of concurrent tasks. type ConcurrentRunner struct { name string + limiter *ConcurrencyLimiter maxPendingDuration time.Duration taskChan chan *Task pendingTasks []*Task @@ -59,9 +70,10 @@ type ConcurrentRunner struct { } // NewConcurrentRunner creates a new ConcurrentRunner. -func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *ConcurrentRunner { +func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner { s := &ConcurrentRunner{ name: name, + limiter: limiter, maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), @@ -75,63 +87,70 @@ func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *Concurr type TaskOpts struct { // TaskName is a human-readable name for the operation. TODO: metrics by name. TaskName string - Limit *ConcurrencyLimiter +} + +// TaskOption configures TaskOp +type TaskOption func(opts *TaskOpts) + +// WithTaskName specify the task name. +func WithTaskName(name string) TaskOption { + return func(opts *TaskOpts) { opts.TaskName = name } } // Start starts the runner. 
-func (s *ConcurrentRunner) Start() { - s.stopChan = make(chan struct{}) - s.wg.Add(1) +func (cr *ConcurrentRunner) Start() { + cr.stopChan = make(chan struct{}) + cr.wg.Add(1) ticker := time.NewTicker(5 * time.Second) go func() { - defer s.wg.Done() + defer cr.wg.Done() for { select { - case task := <-s.taskChan: - if task.Opts.Limit != nil { - token, err := task.Opts.Limit.Acquire(context.Background()) + case task := <-cr.taskChan: + if cr.limiter != nil { + token, err := cr.limiter.Acquire(context.Background()) if err != nil { continue } - go s.run(task.Ctx, task.f, token) + go cr.run(task.Ctx, task.f, token) } else { - go s.run(task.Ctx, task.f, nil) + go cr.run(task.Ctx, task.f, nil) } - case <-s.stopChan: - s.pendingMu.Lock() - s.pendingTasks = make([]*Task, 0, initialCapacity) - s.pendingMu.Unlock() - log.Info("stopping async task runner", zap.String("name", s.name)) + case <-cr.stopChan: + cr.pendingMu.Lock() + cr.pendingTasks = make([]*Task, 0, initialCapacity) + cr.pendingMu.Unlock() + log.Info("stopping async task runner", zap.String("name", cr.name)) return case <-ticker.C: maxDuration := time.Duration(0) - s.pendingMu.Lock() - if len(s.pendingTasks) > 0 { - maxDuration = time.Since(s.pendingTasks[0].submittedAt) + cr.pendingMu.Lock() + if len(cr.pendingTasks) > 0 { + maxDuration = time.Since(cr.pendingTasks[0].submittedAt) } - s.pendingMu.Unlock() - s.maxWaitingDuration.Set(maxDuration.Seconds()) + cr.pendingMu.Unlock() + cr.maxWaitingDuration.Set(maxDuration.Seconds()) } } }() } -func (s *ConcurrentRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) { +func (cr *ConcurrentRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) { task(ctx) if token != nil { token.Release() - s.processPendingTasks() + cr.processPendingTasks() } } -func (s *ConcurrentRunner) processPendingTasks() { - s.pendingMu.Lock() - defer s.pendingMu.Unlock() - for len(s.pendingTasks) > 0 { - task := s.pendingTasks[0] +func (cr *ConcurrentRunner) processPendingTasks() { + cr.pendingMu.Lock() + defer cr.pendingMu.Unlock() + for len(cr.pendingTasks) > 0 { + task := cr.pendingTasks[0] select { - case s.taskChan <- task: - s.pendingTasks = s.pendingTasks[1:] + case cr.taskChan <- task: + cr.pendingTasks = cr.pendingTasks[1:] return default: return @@ -140,33 +159,38 @@ func (s *ConcurrentRunner) processPendingTasks() { } // Stop stops the runner. -func (s *ConcurrentRunner) Stop() { - close(s.stopChan) - s.wg.Wait() +func (cr *ConcurrentRunner) Stop() { + close(cr.stopChan) + cr.wg.Wait() } // RunTask runs the task asynchronously. 
-func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error { +func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error { + taskOpts := &TaskOpts{} + for _, opt := range opts { + opt(taskOpts) + } task := &Task{ Ctx: ctx, - Opts: opt, f: f, + Opts: taskOpts, } - s.processPendingTasks() + + cr.processPendingTasks() select { - case s.taskChan <- task: + case cr.taskChan <- task: default: - s.pendingMu.Lock() - defer s.pendingMu.Unlock() - if len(s.pendingTasks) > 0 { - maxWait := time.Since(s.pendingTasks[0].submittedAt) - if maxWait > s.maxPendingDuration { - s.failedTaskCount.Inc() + cr.pendingMu.Lock() + defer cr.pendingMu.Unlock() + if len(cr.pendingTasks) > 0 { + maxWait := time.Since(cr.pendingTasks[0].submittedAt) + if maxWait > cr.maxPendingDuration { + cr.failedTaskCount.Inc() return ErrMaxWaitingTasksExceeded } } task.submittedAt = time.Now() - s.pendingTasks = append(s.pendingTasks, task) + cr.pendingTasks = append(cr.pendingTasks, task) } return nil } @@ -180,7 +204,7 @@ func NewSyncRunner() *SyncRunner { } // RunTask runs the task synchronously. -func (*SyncRunner) RunTask(ctx context.Context, _ TaskOpts, f func(context.Context)) error { +func (*SyncRunner) RunTask(ctx context.Context, f func(context.Context), _ ...TaskOption) error { f(ctx) return nil } diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index 507f8cf4ee8..ccbf6ed59ed 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -25,8 +25,7 @@ import ( func TestConcurrentRunner(t *testing.T) { t.Run("RunTask", func(t *testing.T) { - limiter := NewConcurrencyLimiter(1) - runner := NewConcurrentRunner("test", time.Second) + runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Second) runner.Start() defer runner.Stop() @@ -34,33 +33,34 @@ func TestConcurrentRunner(t *testing.T) { for i := 0; i < 10; i++ { time.Sleep(50 * time.Millisecond) wg.Add(1) - err := runner.RunTask(context.Background(), TaskOpts{ - TaskName: "test1", - Limit: limiter, - }, func(context.Context) { - defer wg.Done() - time.Sleep(100 * time.Millisecond) - }) + err := runner.RunTask( + context.Background(), + func(context.Context) { + defer wg.Done() + time.Sleep(100 * time.Millisecond) + }, + WithTaskName("test1"), + ) require.NoError(t, err) } wg.Wait() }) t.Run("MaxPendingDuration", func(t *testing.T) { - limiter := NewConcurrencyLimiter(1) - runner := NewConcurrentRunner("test", 2*time.Millisecond) + runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), 2*time.Millisecond) runner.Start() defer runner.Stop() var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) - err := runner.RunTask(context.Background(), TaskOpts{ - TaskName: "test2", - Limit: limiter, - }, func(context.Context) { - defer wg.Done() - time.Sleep(100 * time.Millisecond) - }) + err := runner.RunTask( + context.Background(), + func(context.Context) { + defer wg.Done() + time.Sleep(100 * time.Millisecond) + }, + WithTaskName("test2"), + ) if err != nil { wg.Done() // task 0 running diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index ae3284e2694..8889fdf87b6 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -107,7 +107,8 @@ const ( minSnapshotDurationSec = 5 // heartbeat relative const - hbConcurrentRunner = "heartbeat-async-task-runner" + heartbeatTaskRunner = "heartbeat-async-task-runner" + logTaskRunner = "log-async-task-runner" ) // Server is the interface for cluster. 
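For reference, here is a minimal caller-side sketch (not part of the patch) of the refactored pkg/ratelimit API above: the ConcurrencyLimiter is now owned by the runner itself rather than passed inside each task's options, and per-task settings such as the task name are supplied as variadic TaskOption values. All identifiers used below come from the patch; only the wiring is illustrative.

package main

import (
	"context"
	"runtime"
	"time"

	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	// The limiter is attached to the runner once, instead of being carried
	// inside every TaskOpts as it was before this patch.
	runner := ratelimit.NewConcurrentRunner(
		"heartbeat-task-runner",
		ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)),
		time.Minute, // maxPendingDuration: how long the oldest pending task may wait
	)
	runner.Start()
	defer runner.Stop()

	// Per-task options replace the old TaskOpts struct argument.
	err := runner.RunTask(
		context.Background(),
		func(_ context.Context) {
			// task body; runs asynchronously once a concurrency token is acquired
		},
		ratelimit.WithTaskName(ratelimit.UpdateSubTree),
	)
	if err != nil {
		// ErrMaxWaitingTasksExceeded is returned when the oldest pending task
		// has already waited longer than maxPendingDuration.
	}
}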
@@ -172,8 +173,8 @@ type RaftCluster struct { independentServices sync.Map hbstreams *hbstream.HeartbeatStreams - taskRunner ratelimit.Runner - hbConcurrencyLimiter *ratelimit.ConcurrencyLimiter + heartbeatRunnner ratelimit.Runner + logRunner ratelimit.Runner } // Status saves some state information. @@ -190,15 +191,15 @@ type Status struct { func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, httpClient *http.Client) *RaftCluster { return &RaftCluster{ - serverCtx: ctx, - clusterID: clusterID, - regionSyncer: regionSyncer, - httpClient: httpClient, - etcdClient: etcdClient, - core: basicCluster, - storage: storage, - taskRunner: ratelimit.NewConcurrentRunner(hbConcurrentRunner, time.Minute), - hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)), + serverCtx: ctx, + clusterID: clusterID, + regionSyncer: regionSyncer, + httpClient: httpClient, + etcdClient: etcdClient, + core: basicCluster, + storage: storage, + heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } } @@ -356,7 +357,8 @@ func (c *RaftCluster) Start(s Server) error { go c.startGCTuner() c.running = true - c.taskRunner.Start() + c.heartbeatRunnner.Start() + c.logRunner.Start() return nil } @@ -750,7 +752,8 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } - c.taskRunner.Stop() + c.heartbeatRunnner.Stop() + c.logRunner.Stop() c.Unlock() c.wg.Wait() @@ -1015,10 +1018,10 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.TaskRunner.RunTask( ctx.Context, - core.ExtraTaskOpts(ctx, core.HandleStatsAsync), func(_ context.Context) { cluster.HandleStatsAsync(c, region) }, + ratelimit.WithTaskName(ratelimit.HandleStatsAsync), ) } tracer.OnAsyncHotStatsFinished() @@ -1036,22 +1039,22 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( ctx.Context, - core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync), func(_ context.Context) { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) } }, + ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync), ) } // region is not updated to the subtree. 
if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( ctx, - core.ExtraTaskOpts(ctx, core.UpdateSubTree), func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithTaskName(ratelimit.UpdateSubTree), ) } return nil @@ -1075,20 +1078,20 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } ctx.TaskRunner.RunTask( ctx, - core.ExtraTaskOpts(ctx, core.UpdateSubTree), func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithTaskName(ratelimit.UpdateSubTree), ) tracer.OnUpdateSubTreeFinished() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.TaskRunner.RunTask( ctx.Context, - core.ExtraTaskOpts(ctx, core.HandleOverlaps), func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, + ratelimit.WithTaskName(ratelimit.HandleOverlaps), ) } regionUpdateCacheEventCounter.Inc() @@ -1098,13 +1101,13 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // handle region stats ctx.TaskRunner.RunTask( ctx.Context, - core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync), func(_ context.Context) { // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. cluster.Collect(c, region, hasRegionStats) }, + ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync), ) tracer.OnCollectRegionStatsFinished() @@ -1112,7 +1115,6 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if saveKV { ctx.TaskRunner.RunTask( ctx.Context, - core.ExtraTaskOpts(ctx, core.SaveRegionToKV), func(_ context.Context) { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. @@ -1134,6 +1136,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } regionUpdateKVEventCounter.Inc() }, + ratelimit.WithTaskName(ratelimit.SaveRegionToKV), ) } } diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 5c2bb950297..14a4d0c71a1 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -39,16 +39,19 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } - var runner ratelimit.Runner - runner = syncRunner + + var taskRunner, logRunner ratelimit.Runner + taskRunner, logRunner = syncRunner, syncRunner if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - runner = c.taskRunner + taskRunner = c.heartbeatRunnner + logRunner = c.logRunner } + ctx := &core.MetaProcessContext{ Context: c.ctx, - Limiter: c.hbConcurrencyLimiter, Tracer: tracer, - TaskRunner: runner, + TaskRunner: taskRunner, + LogRunner: logRunner, } tracer.Begin() if err := c.processRegionHeartbeat(ctx, region); err != nil {
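As a closing illustration of the overall change, the hypothetical helper below (buildHeartbeatContext and the package name are invented for this sketch; the tracer selection is simplified) shows how both HandleRegionHeartbeat paths now populate MetaProcessContext: the heartbeat runner carries the heavy per-region tasks, the log runner carries only the asynchronous debug/info logging from region.go, and both fall back to the synchronous runner when EnableHeartbeatConcurrentRunner is off.

package sketch

import (
	"context"

	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/ratelimit"
)

// buildHeartbeatContext is an illustrative helper, not part of this patch.
func buildHeartbeatContext(parent context.Context, concurrentEnabled bool,
	heartbeatRunner, logRunner ratelimit.Runner) *core.MetaProcessContext {
	// Default to the synchronous runner, mirroring the behaviour when the
	// concurrent runner feature is disabled.
	var taskRunner, lgRunner ratelimit.Runner = ratelimit.NewSyncRunner(), ratelimit.NewSyncRunner()
	if concurrentEnabled {
		taskRunner, lgRunner = heartbeatRunner, logRunner
	}
	return &core.MetaProcessContext{
		Context:    parent,
		Tracer:     core.NewNoopHeartbeatProcessTracer(),
		TaskRunner: taskRunner, // stats, subtree updates, overlaps, KV saves
		LogRunner:  lgRunner,   // only the async debug/info logging in region.go
	}
}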