diff --git a/pkg/core/region.go b/pkg/core/region.go index be8f392f05e..be8fcddc179 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -751,19 +750,19 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { debug = func(msg string, fields ...zap.Field) { logRunner.RunTask( ctx.Context, + "DebugLog", func(_ context.Context) { d(msg, fields...) }, - ratelimit.WithTaskName("DebugLog"), ) } info = func(msg string, fields ...zap.Field) { logRunner.RunTask( ctx.Context, + "InfoLog", func(_ context.Context) { i(msg, fields...) }, - ratelimit.WithTaskName("InfoLog"), ) } } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index c36964a059c..d3691516868 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -594,10 +594,10 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c ctx.TaskRunner.RunTask( ctx, + ratelimit.HandleStatsAsync, func(_ context.Context) { cluster.HandleStatsAsync(c, region) }, - ratelimit.WithTaskName(ratelimit.HandleStatsAsync), ) tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil @@ -611,22 +611,22 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( ctx, + ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) } }, - ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync), ) } // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( ctx, + ratelimit.UpdateSubTree, func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithTaskName(ratelimit.UpdateSubTree), ) } return nil @@ -646,28 +646,28 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c } ctx.TaskRunner.RunTask( ctx, + ratelimit.UpdateSubTree, func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithTaskName(ratelimit.UpdateSubTree), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( ctx, + ratelimit.HandleOverlaps, func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, - ratelimit.WithTaskName(ratelimit.HandleOverlaps), ) } tracer.OnSaveCacheFinished() // handle region stats ctx.TaskRunner.RunTask( ctx, + ratelimit.CollectRegionStatsAsync, func(_ context.Context) { cluster.Collect(c, region, hasRegionStats) }, - ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync), ) tracer.OnCollectRegionStatsFinished() return nil diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index bfa1bf1865f..361b08c49b8 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -39,17 +39,18 @@ const initialCapacity = 100 // Runner is the interface for running tasks. type Runner interface { - RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error + RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error Start() Stop() } // Task is a task to be run. type Task struct { - Ctx context.Context - Opts *TaskOpts - f func(context.Context) + ctx context.Context submittedAt time.Time + opts *TaskOpts + f func(context.Context) + name string } // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. @@ -86,19 +87,11 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur } // TaskOpts is the options for RunTask. -type TaskOpts struct { - // TaskName is a human-readable name for the operation. TODO: metrics by name. - TaskName string -} +type TaskOpts struct{} // TaskOption configures TaskOp type TaskOption func(opts *TaskOpts) -// WithTaskName specify the task name. -func WithTaskName(name string) TaskOption { - return func(opts *TaskOpts) { opts.TaskName = name } -} - // Start starts the runner. func (cr *ConcurrentRunner) Start() { cr.stopChan = make(chan struct{}) @@ -114,9 +107,9 @@ func (cr *ConcurrentRunner) Start() { if err != nil { continue } - go cr.run(task.Ctx, task.f, token) + go cr.run(task.ctx, task.f, token) } else { - go cr.run(task.Ctx, task.f, nil) + go cr.run(task.ctx, task.f, nil) } case <-cr.stopChan: cr.pendingMu.Lock() @@ -156,7 +149,7 @@ func (cr *ConcurrentRunner) processPendingTasks() { select { case cr.taskChan <- task: cr.pendingTasks = cr.pendingTasks[1:] - cr.pendingTaskCount[task.Opts.TaskName]-- + cr.pendingTaskCount[task.name]-- return default: return @@ -171,15 +164,16 @@ func (cr *ConcurrentRunner) Stop() { } // RunTask runs the task asynchronously. -func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error { +func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { taskOpts := &TaskOpts{} for _, opt := range opts { opt(taskOpts) } task := &Task{ - Ctx: ctx, + ctx: ctx, + name: name, f: f, - Opts: taskOpts, + opts: taskOpts, } cr.processPendingTasks() @@ -197,7 +191,7 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context) } task.submittedAt = time.Now() cr.pendingTasks = append(cr.pendingTasks, task) - cr.pendingTaskCount[taskOpts.TaskName]++ + cr.pendingTaskCount[task.name]++ } return nil } @@ -211,7 +205,7 @@ func NewSyncRunner() *SyncRunner { } // RunTask runs the task synchronously. -func (*SyncRunner) RunTask(ctx context.Context, f func(context.Context), _ ...TaskOption) error { +func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error { f(ctx) return nil } diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index ccbf6ed59ed..0241536686b 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -35,11 +35,11 @@ func TestConcurrentRunner(t *testing.T) { wg.Add(1) err := runner.RunTask( context.Background(), + "test1", func(context.Context) { defer wg.Done() time.Sleep(100 * time.Millisecond) }, - WithTaskName("test1"), ) require.NoError(t, err) } @@ -55,11 +55,11 @@ func TestConcurrentRunner(t *testing.T) { wg.Add(1) err := runner.RunTask( context.Background(), + "test2", func(context.Context) { defer wg.Done() time.Sleep(100 * time.Millisecond) }, - WithTaskName("test2"), ) if err != nil { wg.Done() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 42a1e94849c..a8558051dfa 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1026,10 +1026,10 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.TaskRunner.RunTask( ctx.Context, + ratelimit.HandleStatsAsync, func(_ context.Context) { cluster.HandleStatsAsync(c, region) }, - ratelimit.WithTaskName(ratelimit.HandleStatsAsync), ) } tracer.OnAsyncHotStatsFinished() @@ -1047,22 +1047,22 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( ctx.Context, + ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) } }, - ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync), ) } // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( ctx, + ratelimit.UpdateSubTree, func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithTaskName(ratelimit.UpdateSubTree), ) } return nil @@ -1086,20 +1086,20 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } ctx.TaskRunner.RunTask( ctx, + ratelimit.UpdateSubTree, func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithTaskName(ratelimit.UpdateSubTree), ) tracer.OnUpdateSubTreeFinished() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.TaskRunner.RunTask( ctx.Context, + ratelimit.HandleOverlaps, func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, - ratelimit.WithTaskName(ratelimit.HandleOverlaps), ) } regionUpdateCacheEventCounter.Inc() @@ -1109,13 +1109,13 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // handle region stats ctx.TaskRunner.RunTask( ctx.Context, + ratelimit.CollectRegionStatsAsync, func(_ context.Context) { // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. cluster.Collect(c, region, hasRegionStats) }, - ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync), ) tracer.OnCollectRegionStatsFinished() @@ -1123,6 +1123,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if saveKV { ctx.TaskRunner.RunTask( ctx.Context, + ratelimit.SaveRegionToKV, func(_ context.Context) { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. @@ -1144,7 +1145,6 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } regionUpdateKVEventCounter.Inc() }, - ratelimit.WithTaskName(ratelimit.SaveRegionToKV), ) } }