diff --git a/pkg/core/region.go b/pkg/core/region.go
index 9e012890ee7..f8b6cbe964e 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -753,7 +753,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 		if logRunner != nil {
 			debug = func(msg string, fields ...zap.Field) {
 				logRunner.RunTask(
-					region.GetID(),
+					fmt.Sprintf("%d-%s", region.GetID(), "DebugLog"),
 					"DebugLog",
 					func() {
 						d(msg, fields...)
@@ -762,7 +762,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 			}
 			info = func(msg string, fields ...zap.Field) {
 				logRunner.RunTask(
-					region.GetID(),
+					fmt.Sprintf("%d-%s", region.GetID(), "InfoLog"),
 					"InfoLog",
 					func() {
 						i(msg, fields...)
diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 5fac3e1604a..f0a2dc3c23a 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -2,6 +2,7 @@ package server
 
 import (
 	"context"
+	"fmt"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -613,7 +614,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 	// so we do some extra checks here.
 	if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
 		ctx.TaskRunner.RunTask(
-			regionID,
+			fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync),
 			ratelimit.ObserveRegionStatsAsync,
 			func() {
 				if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -625,7 +626,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 		// region is not updated to the subtree.
 		if origin.GetRef() < 2 {
 			ctx.TaskRunner.RunTask(
-				regionID,
+				fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
 				ratelimit.UpdateSubTree,
 				func() {
 					c.CheckAndPutSubTree(region)
@@ -649,7 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 			return err
 		}
 		ctx.TaskRunner.RunTask(
-			regionID,
+			fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
 			ratelimit.UpdateSubTree,
 			func() {
 				c.CheckAndPutSubTree(region)
@@ -658,7 +659,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 		)
 		tracer.OnUpdateSubTreeFinished()
 		ctx.TaskRunner.RunTask(
-			regionID,
+			fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps),
 			ratelimit.HandleOverlaps,
 			func() {
 				cluster.HandleOverlaps(c, overlaps)
@@ -668,7 +669,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 	tracer.OnSaveCacheFinished()
 	// handle region stats
 	ctx.TaskRunner.RunTask(
-		regionID,
+		fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync),
 		ratelimit.CollectRegionStatsAsync,
 		func() {
 			cluster.Collect(c, region, hasRegionStats)
diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go
index 1f9bea2edab..a85e3f375a8 100644
--- a/pkg/ratelimit/runner.go
+++ b/pkg/ratelimit/runner.go
@@ -17,7 +17,6 @@ package ratelimit
 import (
 	"context"
 	"errors"
-	"fmt"
 	"sync"
 	"time"
 
@@ -43,14 +42,14 @@ const (
 
 // Runner is the interface for running tasks.
 type Runner interface {
-	RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error
+	RunTask(id, name string, f func(), opts ...TaskOption) error
 	Start()
 	Stop()
 }
 
 // Task is a task to be run.
 type Task struct {
-	regionID    uint64
+	id          string
 	submittedAt time.Time
 	f           func()
 	name        string
@@ -161,7 +160,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {
 		case cr.taskChan <- task:
 			cr.pendingTasks = cr.pendingTasks[1:]
 			cr.pendingTaskCount[task.name]--
-			delete(cr.pendingRegionTasks, fmt.Sprintf("%d-%s", task.regionID, task.name))
+			delete(cr.pendingRegionTasks, task.id)
 		default:
 		}
 		return
@@ -175,9 +174,9 @@ func (cr *ConcurrentRunner) Stop() {
 }
 
 // RunTask runs the task asynchronously.
-func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error {
+func (cr *ConcurrentRunner) RunTask(id, name string, f func(), opts ...TaskOption) error {
 	task := &Task{
-		regionID:    regionID,
+		id:          id,
 		name:        name,
 		f:           f,
 		submittedAt: time.Now(),
@@ -193,9 +192,8 @@ func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts
 	}()
 
 	pendingTaskNum := len(cr.pendingTasks)
-	taskID := fmt.Sprintf("%d-%s", regionID, name)
 	if pendingTaskNum > 0 {
-		if t, ok := cr.pendingRegionTasks[taskID]; ok {
+		if t, ok := cr.pendingRegionTasks[task.id]; ok {
 			t.f = f
 			t.submittedAt = time.Now()
 			return nil
@@ -207,15 +205,13 @@ func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts
 				return ErrMaxWaitingTasksExceeded
 			}
 		}
-		// We use the max task number to limit the memory usage.
-		// It occupies around 1.5GB memory when there is 20000000 pending task.
 		if pendingTaskNum > maxPendingTaskNum {
 			RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
 			return ErrMaxWaitingTasksExceeded
 		}
 	}
 	cr.pendingTasks = append(cr.pendingTasks, task)
-	cr.pendingRegionTasks[taskID] = task
+	cr.pendingRegionTasks[task.id] = task
 	cr.pendingTaskCount[task.name]++
 	return nil
 }
@@ -229,7 +225,7 @@ func NewSyncRunner() *SyncRunner {
 }
 
 // RunTask runs the task synchronously.
-func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error {
+func (*SyncRunner) RunTask(_, _ string, f func(), _ ...TaskOption) error {
 	f()
 	return nil
 }
diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go
index a0e0ab1db10..b58a31a1292 100644
--- a/pkg/ratelimit/runner_test.go
+++ b/pkg/ratelimit/runner_test.go
@@ -15,6 +15,7 @@
 package ratelimit
 
 import (
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -33,7 +34,7 @@ func TestConcurrentRunner(t *testing.T) {
 			time.Sleep(50 * time.Millisecond)
 			wg.Add(1)
 			err := runner.RunTask(
-				uint64(i),
+				fmt.Sprintf("%d-%s", i, "test1"),
 				"test1",
 				func() {
 					defer wg.Done()
@@ -53,7 +54,7 @@ func TestConcurrentRunner(t *testing.T) {
 		for i := 0; i < 10; i++ {
 			wg.Add(1)
 			err := runner.RunTask(
-				uint64(i),
+				fmt.Sprintf("%d-%s", i, "test2"),
 				"test2",
 				func() {
 					defer wg.Done()
@@ -84,8 +85,8 @@ func TestConcurrentRunner(t *testing.T) {
 				regionID = 4
 			}
 			err := runner.RunTask(
-				regionID,
-				"test2",
+				fmt.Sprintf("%d-%s", regionID, "test3"),
+				"test3",
 				func() {
 					time.Sleep(time.Second)
 				},
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index be0ba39b899..a61dfe3b2cc 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -1047,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	// We need to think of a better way to reduce this part of the cost in the future.
 	if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
 		ctx.MiscRunner.RunTask(
-			regionID,
+			fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync),
 			ratelimit.ObserveRegionStatsAsync,
 			func() {
 				if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1059,7 +1059,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 		// region is not updated to the subtree.
 		if origin.GetRef() < 2 {
 			ctx.TaskRunner.RunTask(
-				regionID,
+				fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
 				ratelimit.UpdateSubTree,
 				func() {
 					c.CheckAndPutSubTree(region)
@@ -1087,7 +1087,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 			return err
 		}
 		ctx.TaskRunner.RunTask(
-			regionID,
+			fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
 			ratelimit.UpdateSubTree,
 			func() {
 				c.CheckAndPutSubTree(region)
@@ -1098,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 
 	if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
 		ctx.MiscRunner.RunTask(
-			regionID,
+			fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps),
 			ratelimit.HandleOverlaps,
 			func() {
 				cluster.HandleOverlaps(c, overlaps)
@@ -1111,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	tracer.OnSaveCacheFinished()
 	// handle region stats
 	ctx.MiscRunner.RunTask(
-		regionID,
+		fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync),
 		ratelimit.CollectRegionStatsAsync,
 		func() {
 			// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
@@ -1125,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	if c.storage != nil {
 		if saveKV {
 			ctx.MiscRunner.RunTask(
-				regionID,
+				fmt.Sprintf("%d-%s", regionID, ratelimit.SaveRegionToKV),
 				ratelimit.SaveRegionToKV,
 				func() {
 					// If there are concurrent heartbeats from the same region, the last write will win even if
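Note for reviewers: the sketch below is not part of this patch. It only illustrates the new calling convention, reusing the `NewSyncRunner` constructor, the `ObserveRegionStatsAsync` task-name constant, and the `RunTask(id, name string, ...)` signature that appear in the diff above. Callers are now responsible for composing the deduplication key themselves, following the same `fmt.Sprintf("%d-%s", regionID, name)` pattern applied throughout this change.

```go
// Illustration only, not part of this change. It assumes the pkg/ratelimit
// package as shown in the diff above: RunTask(id, name string, f func(), ...),
// NewSyncRunner(), and the ObserveRegionStatsAsync task-name constant.
package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	runner := ratelimit.NewSyncRunner() // SyncRunner executes the closure inline.

	regionID := uint64(42)
	name := ratelimit.ObserveRegionStatsAsync
	// The id is the deduplication key: when a ConcurrentRunner already holds a
	// pending task with the same id, RunTask only refreshes its closure and
	// submission time instead of queuing a duplicate.
	if err := runner.RunTask(
		fmt.Sprintf("%d-%s", regionID, name), // id, formerly just the region ID
		name,
		func() { fmt.Println("collect stats for region", regionID) },
	); err != nil {
		fmt.Println("task rejected:", err)
	}
}
```

SyncRunner is used here only because its constructor takes no arguments; the dedup-on-id behaviour described in the comment is the ConcurrentRunner path changed in pkg/ratelimit/runner.go.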