Skip to content

Commit

Permalink
address
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Apr 22, 2024
1 parent 3007e31 commit 10de156
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 47 deletions.
18 changes: 18 additions & 0 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,24 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
}

// RegionHeartbeatStageName is the name of the stage of the region heartbeat.
const (
HandleStatsAsync = "HandleStatsAsync"
ObserveRegionStatsAsync = "ObserveRegionStatsAsync"
UpdateSubTree = "UpdateSubTree"
HandleOverlaps = "HandleOverlaps"
CollectRegionStatsAsync = "CollectRegionStatsAsync"
SaveRegionToKV = "SaveRegionToKV"
)

// ExtraTaskOpts returns the task options for the task.
func ExtraTaskOpts(ctx *MetaProcessContext, name string) ratelimit.TaskOpts {
return ratelimit.TaskOpts{
TaskName: name,
Limit: ctx.Limiter,
}
}

// RWLockStats is a read-write lock with statistics.
type RWLockStats struct {
syncutil.RWMutex
Expand Down
25 changes: 5 additions & 20 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c

ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "HandleStatsAsync",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
Expand All @@ -610,10 +607,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "ObserveRegionStatsAsync",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync),
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
Expand All @@ -638,21 +632,15 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "UpdateSubTree",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.UpdateSubTree),
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "HandleOverlaps",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.HandleOverlaps),
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
Expand All @@ -662,10 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "CollectRegionStatsAsync",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync),
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
Expand Down
8 changes: 5 additions & 3 deletions pkg/ratelimit/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,31 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

const nameStr = "runner_name"

var (
RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_max_waiting_duration_seconds",
Help: "The duration of tasks waiting in the runner.",
}, []string{"name"})
}, []string{nameStr})

RunnerTaskPendingTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{"name"})
}, []string{nameStr})
RunnerTaskFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_failed_tasks_total",
Help: "The number of failed tasks in the runner.",
}, []string{"name"})
}, []string{nameStr})
)

func init() {
Expand Down
30 changes: 6 additions & 24 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,10 +1015,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "HandleStatsAsync",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
Expand All @@ -1039,10 +1036,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "ObserveRegionStatsAsync",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync),
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
Expand Down Expand Up @@ -1071,10 +1065,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "UpdateSubTree",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.UpdateSubTree),
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
Expand All @@ -1084,10 +1075,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "HandleOverlaps",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.HandleOverlaps),
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
Expand All @@ -1100,10 +1088,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// handle region stats
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "CollectRegionStatsAsync",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync),
func(_ context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
Expand All @@ -1117,10 +1102,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if saveKV {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "SaveRegionToKV",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.SaveRegionToKV),
func(_ context.Context) {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
Expand Down

0 comments on commit 10de156

Please sign in to comment.