use general task id
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Jun 3, 2024
1 parent 3772233 commit 2695c79
Showing 5 changed files with 27 additions and 29 deletions.
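In short, this commit generalizes the task identifier: Runner.RunTask now takes a string id instead of a regionID uint64, and each call site builds the key itself as fmt.Sprintf("%d-%s", regionID, taskName), so the runner no longer composes it internally. The sketch below illustrates the new call pattern; it is a minimal, self-contained approximation — the TaskOption stub and the submitRegionTask helper are illustrative and not part of this diff.

```go
package example

import "fmt"

// TaskOption is stubbed here only so the sketch compiles on its own;
// the real type lives in pkg/ratelimit.
type TaskOption func()

// Runner mirrors the interface after this commit (see pkg/ratelimit/runner.go below).
type Runner interface {
	RunTask(id, name string, f func(), opts ...TaskOption) error
	Start()
	Stop()
}

// submitRegionTask shows how call sites now derive the general task id
// themselves: "<regionID>-<taskName>", built with fmt.Sprintf at the caller.
func submitRegionTask(r Runner, regionID uint64, name string, f func()) error {
	return r.RunTask(fmt.Sprintf("%d-%s", regionID, name), name, f)
}
```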
4 changes: 2 additions & 2 deletions pkg/core/region.go
@@ -753,7 +753,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
if logRunner != nil {
debug = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
- region.GetID(),
+ fmt.Sprintf("%d-%s", region.GetID(), "DebugLog"),

"DebugLog",
func() {

d(msg, fields...)
@@ -762,7 +762,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
info = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
- region.GetID(),
+ fmt.Sprintf("%d-%s", region.GetID(), "InfoLog"),
"InfoLog",
func() {
i(msg, fields...)
11 changes: 6 additions & 5 deletions pkg/mcs/scheduling/server/cluster.go
@@ -2,6 +2,7 @@ package server

import (
"context"
+ "fmt"
"runtime"
"sync"
"sync/atomic"
@@ -613,7 +614,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync),

ratelimit.ObserveRegionStatsAsync,
func() {

if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -625,7 +626,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),

ratelimit.UpdateSubTree,
func() {

c.CheckAndPutSubTree(region)
@@ -649,7 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
return err
}
ctx.TaskRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
ratelimit.UpdateSubTree,
func() {
c.CheckAndPutSubTree(region)
@@ -658,7 +659,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps),
ratelimit.HandleOverlaps,
func() {
cluster.HandleOverlaps(c, overlaps)
@@ -668,7 +669,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync),
ratelimit.CollectRegionStatsAsync,
func() {
cluster.Collect(c, region, hasRegionStats)
20 changes: 8 additions & 12 deletions pkg/ratelimit/runner.go
@@ -17,7 +17,6 @@ package ratelimit
import (
"context"
"errors"
- "fmt"
"sync"
"time"

@@ -43,14 +42,14 @@ const (

// Runner is the interface for running tasks.
type Runner interface {
- RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error
+ RunTask(id, name string, f func(), opts ...TaskOption) error
Start()
Stop()
}

// Task is a task to be run.
type Task struct {
- regionID uint64
+ id string
submittedAt time.Time
f func()
name string
@@ -161,7 +160,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTaskCount[task.name]--
- delete(cr.pendingRegionTasks, fmt.Sprintf("%d-%s", task.regionID, task.name))
+ delete(cr.pendingRegionTasks, task.id)
default:
}
return
@@ -175,9 +174,9 @@ func (cr *ConcurrentRunner) Stop() {
}

// RunTask runs the task asynchronously.
- func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error {
+ func (cr *ConcurrentRunner) RunTask(id, name string, f func(), opts ...TaskOption) error {
task := &Task{
- regionID: regionID,
+ id: id,
name: name,
f: f,
submittedAt: time.Now(),
@@ -193,9 +192,8 @@ func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts
}()

pendingTaskNum := len(cr.pendingTasks)
- taskID := fmt.Sprintf("%d-%s", regionID, name)
if pendingTaskNum > 0 {
- if t, ok := cr.pendingRegionTasks[taskID]; ok {
+ if t, ok := cr.pendingRegionTasks[task.id]; ok {
t.f = f
t.submittedAt = time.Now()
return nil
@@ -207,15 +205,13 @@ func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts
return ErrMaxWaitingTasksExceeded
}
}
- // We use the max task number to limit the memory usage.
- // It occupies around 1.5GB memory when there is 20000000 pending task.
if pendingTaskNum > maxPendingTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
cr.pendingTasks = append(cr.pendingTasks, task)
- cr.pendingRegionTasks[taskID] = task
+ cr.pendingRegionTasks[task.id] = task
cr.pendingTaskCount[task.name]++
return nil
}
@@ -229,7 +225,7 @@ func NewSyncRunner() *SyncRunner {
}

// RunTask runs the task synchronously.
- func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error {
+ func (*SyncRunner) RunTask(_, _ string, f func(), _ ...TaskOption) error {
f()
return nil
}
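One consequence worth noting: RunTask already deduplicated pending work by the "<regionID>-<name>" key, and after this change that key is simply the caller-supplied task.id — a second submission with the same id while the first is still pending replaces the pending task's function rather than enqueuing another copy. The sketch below is a stand-alone illustration of that coalescing behavior, not code from this change; the map and submit helper are simplified stand-ins for ConcurrentRunner's internals.

```go
package example

import "fmt"

// task is a stripped-down stand-in for ratelimit.Task.
type task struct {
	f func()
}

// coalescingQueue mimics how ConcurrentRunner.RunTask uses pendingRegionTasks:
// pending tasks are deduplicated by their string id ("<regionID>-<name>").
type coalescingQueue struct {
	pending map[string]*task
}

func (q *coalescingQueue) submit(id string, f func()) {
	if t, ok := q.pending[id]; ok {
		// Same id already queued: keep one entry and let the latest function win.
		t.f = f
		return
	}
	q.pending[id] = &task{f: f}
}

func Example() {
	q := &coalescingQueue{pending: map[string]*task{}}
	id := fmt.Sprintf("%d-%s", uint64(1), "UpdateSubTree")
	q.submit(id, func() { fmt.Println("first heartbeat") })
	q.submit(id, func() { fmt.Println("second heartbeat") }) // coalesces with the pending entry
	fmt.Println(len(q.pending))                              // 1
}
```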
9 changes: 5 additions & 4 deletions pkg/ratelimit/runner_test.go
@@ -15,6 +15,7 @@
package ratelimit

import (
+ "fmt"
"sync"
"testing"
"time"
@@ -33,7 +34,7 @@ func TestConcurrentRunner(t *testing.T) {
time.Sleep(50 * time.Millisecond)
wg.Add(1)
err := runner.RunTask(
- uint64(i),
+ fmt.Sprintf("%d-%s", i, "test1"),
"test1",
func() {
defer wg.Done()
@@ -53,7 +54,7 @@ func TestConcurrentRunner(t *testing.T) {
for i := 0; i < 10; i++ {
wg.Add(1)
err := runner.RunTask(
- uint64(i),
+ fmt.Sprintf("%d-%s", i, "test2"),
"test2",
func() {
defer wg.Done()
@@ -84,8 +85,8 @@ func TestConcurrentRunner(t *testing.T) {
regionID = 4
}
err := runner.RunTask(
- regionID,
- "test2",
+ fmt.Sprintf("%d-%s", regionID, "test3"),
+ "test3",
func() {
time.Sleep(time.Second)
},
12 changes: 6 additions & 6 deletions server/cluster/cluster.go
@@ -1047,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.MiscRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync),
ratelimit.ObserveRegionStatsAsync,
func() {
if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1059,7 +1059,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
ratelimit.UpdateSubTree,
func() {
c.CheckAndPutSubTree(region)
@@ -1087,7 +1087,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
return err
}
ctx.TaskRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
ratelimit.UpdateSubTree,
func() {
c.CheckAndPutSubTree(region)
@@ -1098,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.MiscRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps),
ratelimit.HandleOverlaps,
func() {
cluster.HandleOverlaps(c, overlaps)
@@ -1111,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
tracer.OnSaveCacheFinished()
// handle region stats
ctx.MiscRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync),
ratelimit.CollectRegionStatsAsync,
func() {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
@@ -1125,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if c.storage != nil {
if saveKV {
ctx.MiscRunner.RunTask(
- regionID,
+ fmt.Sprintf("%d-%s", regionID, ratelimit.SaveRegionToKV),
ratelimit.SaveRegionToKV,
func() {
// If there are concurrent heartbeats from the same region, the last write will win even if
