Skip to content

Commit

Permalink
remove task opt
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 13, 2024
1 parent 204dcbb commit a7a879e
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 39 deletions.
5 changes: 2 additions & 3 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -751,19 +750,19 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
debug = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
"DebugLog",
func(_ context.Context) {
d(msg, fields...)
},
ratelimit.WithTaskName("DebugLog"),
)
}
info = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
"InfoLog",
func(_ context.Context) {
i(msg, fields...)
},
ratelimit.WithTaskName("InfoLog"),
)
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,10 +594,10 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c

ctx.TaskRunner.RunTask(
ctx,
ratelimit.HandleStatsAsync,
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
ratelimit.WithTaskName(ratelimit.HandleStatsAsync),
)
tracer.OnAsyncHotStatsFinished()
hasRegionStats := c.regionStats != nil
Expand All @@ -611,22 +611,22 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
},
ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync),
)
}
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
}
return nil
Expand All @@ -646,28 +646,28 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
ratelimit.HandleOverlaps,
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
ratelimit.WithTaskName(ratelimit.HandleOverlaps),
)
}
tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync),
)
tracer.OnCollectRegionStatsFinished()
return nil
Expand Down
36 changes: 15 additions & 21 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,18 @@ const initialCapacity = 100

// Runner is the interface for running tasks.
type Runner interface {
RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error
RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error
Start()
Stop()
}

// Task is a task to be run.
type Task struct {
Ctx context.Context
Opts *TaskOpts
f func(context.Context)
ctx context.Context
submittedAt time.Time
opts *TaskOpts
f func(context.Context)
name string
}

// ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
Expand Down Expand Up @@ -86,19 +87,11 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
}

// TaskOpts is the options for RunTask.
type TaskOpts struct {
// TaskName is a human-readable name for the operation. TODO: metrics by name.
TaskName string
}
type TaskOpts struct{}

// TaskOption configures TaskOp
type TaskOption func(opts *TaskOpts)

// WithTaskName specify the task name.
func WithTaskName(name string) TaskOption {
return func(opts *TaskOpts) { opts.TaskName = name }
}

// Start starts the runner.
func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
Expand All @@ -114,9 +107,9 @@ func (cr *ConcurrentRunner) Start() {
if err != nil {
continue
}
go cr.run(task.Ctx, task.f, token)
go cr.run(task.ctx, task.f, token)
} else {
go cr.run(task.Ctx, task.f, nil)
go cr.run(task.ctx, task.f, nil)
}
case <-cr.stopChan:
cr.pendingMu.Lock()
Expand Down Expand Up @@ -156,7 +149,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {
select {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTaskCount[task.Opts.TaskName]--
cr.pendingTaskCount[task.name]--
return
default:
return
Expand All @@ -171,15 +164,16 @@ func (cr *ConcurrentRunner) Stop() {
}

// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error {
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
taskOpts := &TaskOpts{}
for _, opt := range opts {
opt(taskOpts)
}
task := &Task{
Ctx: ctx,
ctx: ctx,
name: name,
f: f,
Opts: taskOpts,
opts: taskOpts,
}

cr.processPendingTasks()
Expand All @@ -197,7 +191,7 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context)
}
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
cr.pendingTaskCount[taskOpts.TaskName]++
cr.pendingTaskCount[task.name]++
}
return nil
}
Expand All @@ -211,7 +205,7 @@ func NewSyncRunner() *SyncRunner {
}

// RunTask runs the task synchronously.
func (*SyncRunner) RunTask(ctx context.Context, f func(context.Context), _ ...TaskOption) error {
func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error {
f(ctx)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ratelimit/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func TestConcurrentRunner(t *testing.T) {
wg.Add(1)
err := runner.RunTask(
context.Background(),
"test1",
func(context.Context) {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
WithTaskName("test1"),
)
require.NoError(t, err)
}
Expand All @@ -55,11 +55,11 @@ func TestConcurrentRunner(t *testing.T) {
wg.Add(1)
err := runner.RunTask(
context.Background(),
"test2",
func(context.Context) {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
WithTaskName("test2"),
)
if err != nil {
wg.Done()
Expand Down
14 changes: 7 additions & 7 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,10 +1026,10 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.HandleStatsAsync,
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
ratelimit.WithTaskName(ratelimit.HandleStatsAsync),
)
}
tracer.OnAsyncHotStatsFinished()
Expand All @@ -1047,22 +1047,22 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
},
ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync),
)
}
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
}
return nil
Expand All @@ -1086,20 +1086,20 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
tracer.OnUpdateSubTreeFinished()

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.HandleOverlaps,
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
ratelimit.WithTaskName(ratelimit.HandleOverlaps),
)
}
regionUpdateCacheEventCounter.Inc()
Expand All @@ -1109,20 +1109,21 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// handle region stats
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(c, region, hasRegionStats)
},
ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync),
)

tracer.OnCollectRegionStatsFinished()
if c.storage != nil {
if saveKV {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.SaveRegionToKV,
func(_ context.Context) {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
Expand All @@ -1144,7 +1145,6 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
}
regionUpdateKVEventCounter.Inc()
},
ratelimit.WithTaskName(ratelimit.SaveRegionToKV),
)
}
}
Expand Down

0 comments on commit a7a879e

Please sign in to comment.