From 2d43787d2f0333bcdfe9e855c5f186de93615563 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 31 May 2024 14:56:40 +0800 Subject: [PATCH 1/7] remove old duplicated task Signed-off-by: Ryan Leung --- pkg/core/region.go | 2 + pkg/mcs/scheduling/server/cluster.go | 7 +++- pkg/ratelimit/runner.go | 55 ++++++++++++++++++---------- pkg/ratelimit/runner_test.go | 2 + server/cluster/cluster.go | 7 ++++ 5 files changed, 53 insertions(+), 20 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 19c1d0d4794..2b31593d1fb 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -755,6 +755,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { debug = func(msg string, fields ...zap.Field) { logRunner.RunTask( ctx.Context, + region.GetID(), "DebugLog", func(_ context.Context) { d(msg, fields...) @@ -764,6 +765,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { info = func(msg string, fields ...zap.Field) { logRunner.RunTask( ctx.Context, + region.GetID(), "InfoLog", func(_ context.Context) { i(msg, fields...) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index caaafe42c87..c1f3fae8e66 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -607,13 +607,14 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin) - + regionID := region.GetID() if !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( ctx, + regionID, ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { if c.regionStats.RegionStatsNeedUpdate(region) { @@ -626,6 +627,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( ctx, + regionID, ratelimit.UpdateSubTree, func(_ context.Context) { c.CheckAndPutSubTree(region) @@ -650,6 +652,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c } ctx.TaskRunner.RunTask( ctx, + regionID, ratelimit.UpdateSubTree, func(_ context.Context) { c.CheckAndPutSubTree(region) @@ -659,6 +662,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( ctx, + regionID, ratelimit.HandleOverlaps, func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) @@ -669,6 +673,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // handle region stats ctx.TaskRunner.RunTask( ctx, + regionID, ratelimit.CollectRegionStatsAsync, func(_ context.Context) { cluster.Collect(c, region, hasRegionStats) diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 17a45067f3d..4c68e9d05cb 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -15,8 +15,10 @@ package ratelimit import ( + "container/list" "context" "errors" + "fmt" "sync" "time" @@ -42,7 +44,7 @@ const ( // Runner is the interface for running tasks. type Runner interface { - RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error + RunTask(ctx context.Context, regionID uint64, name string, f func(context.Context), opts ...TaskOption) error Start() Stop() } @@ -50,6 +52,7 @@ type Runner interface { // Task is a task to be run. type Task struct { ctx context.Context + regionID uint64 submittedAt time.Time f func(context.Context) name string @@ -66,11 +69,12 @@ type ConcurrentRunner struct { limiter *ConcurrencyLimiter maxPendingDuration time.Duration taskChan chan *Task - pendingTasks []*Task pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup pendingTaskCount map[string]int64 + pendingTasks *list.List + pendingRegionTasks map[string]*list.Element maxWaitingDuration prometheus.Gauge } @@ -81,8 +85,9 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur limiter: limiter, maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), - pendingTasks: make([]*Task, 0, initialCapacity), + pendingTasks: list.New(), pendingTaskCount: make(map[string]int64), + pendingRegionTasks: make(map[string]*list.Element), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s @@ -101,6 +106,7 @@ func (cr *ConcurrentRunner) Start() { cr.stopChan = make(chan struct{}) cr.wg.Add(1) ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() go func() { defer cr.wg.Done() for { @@ -117,15 +123,15 @@ func (cr *ConcurrentRunner) Start() { } case <-cr.stopChan: cr.pendingMu.Lock() - cr.pendingTasks = make([]*Task, 0, initialCapacity) + cr.pendingTasks = list.New() cr.pendingMu.Unlock() log.Info("stopping async task runner", zap.String("name", cr.name)) return case <-ticker.C: maxDuration := time.Duration(0) cr.pendingMu.Lock() - if len(cr.pendingTasks) > 0 { - maxDuration = time.Since(cr.pendingTasks[0].submittedAt) + if cr.pendingTasks.Len() > 0 { + maxDuration = time.Since(cr.pendingTasks.Front().Value.(*Task).submittedAt) } for taskName, cnt := range cr.pendingTaskCount { RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) @@ -151,12 +157,13 @@ func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { func (cr *ConcurrentRunner) processPendingTasks() { cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - if len(cr.pendingTasks) > 0 { - task := cr.pendingTasks[0] + if cr.pendingTasks.Len() > 0 { + task := cr.pendingTasks.Front().Value.(*Task) select { case cr.taskChan <- task: - cr.pendingTasks = cr.pendingTasks[1:] + cr.pendingTasks.Remove(cr.pendingTasks.Front()) cr.pendingTaskCount[task.name]-- + delete(cr.pendingRegionTasks, fmt.Sprintf("%d-%s", task.regionID, task.name)) default: } return @@ -170,11 +177,13 @@ func (cr *ConcurrentRunner) Stop() { } // RunTask runs the task asynchronously. -func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { +func (cr *ConcurrentRunner) RunTask(ctx context.Context, regionID uint64, name string, f func(context.Context), opts ...TaskOption) error { task := &Task{ - ctx: ctx, - name: name, - f: f, + ctx: ctx, + regionID: regionID, + name: name, + f: f, + submittedAt: time.Now(), } for _, opt := range opts { opt(task) @@ -186,10 +195,18 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con cr.processPendingTasks() }() - pendingTaskNum := len(cr.pendingTasks) + pendingTaskNum := cr.pendingTasks.Len() + taskID := fmt.Sprintf("%d-%s", regionID, name) if pendingTaskNum > 0 { + if element, ok := cr.pendingRegionTasks[taskID]; ok { + // Update the task in pendingTasks + element.Value = task + // Update the task in pendingRegionTasks + cr.pendingRegionTasks[taskID] = element + return nil + } if !task.retained { - maxWait := time.Since(cr.pendingTasks[0].submittedAt) + maxWait := time.Since(cr.pendingTasks.Front().Value.(*Task).submittedAt) if maxWait > cr.maxPendingDuration { RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded @@ -197,13 +214,13 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con } // We use the max task number to limit the memory usage. // It occupies around 1.5GB memory when there is 20000000 pending task. - if len(cr.pendingTasks) > maxPendingTaskNum { + if pendingTaskNum > maxPendingTaskNum { RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } - task.submittedAt = time.Now() - cr.pendingTasks = append(cr.pendingTasks, task) + element := cr.pendingTasks.PushBack(task) + cr.pendingRegionTasks[taskID] = element cr.pendingTaskCount[task.name]++ return nil } @@ -217,7 +234,7 @@ func NewSyncRunner() *SyncRunner { } // RunTask runs the task synchronously. -func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error { +func (*SyncRunner) RunTask(ctx context.Context, _ uint64, _ string, f func(context.Context), _ ...TaskOption) error { f(ctx) return nil } diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index 0241536686b..3d7b8fd18be 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -35,6 +35,7 @@ func TestConcurrentRunner(t *testing.T) { wg.Add(1) err := runner.RunTask( context.Background(), + uint64(i), "test1", func(context.Context) { defer wg.Done() @@ -55,6 +56,7 @@ func TestConcurrentRunner(t *testing.T) { wg.Add(1) err := runner.RunTask( context.Background(), + uint64(i), "test2", func(context.Context) { defer wg.Done() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 70d6b46b980..d96ecd1e522 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1038,6 +1038,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // Save to cache if meta or leader is updated, or contains any down/pending peer. saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() + regionID := region.GetID() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. @@ -1047,6 +1048,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.MiscRunner.RunTask( ctx.Context, + regionID, ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { if c.regionStats.RegionStatsNeedUpdate(region) { @@ -1059,6 +1061,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( ctx, + regionID, ratelimit.UpdateSubTree, func(_ context.Context) { c.CheckAndPutSubTree(region) @@ -1087,6 +1090,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } ctx.TaskRunner.RunTask( ctx, + regionID, ratelimit.UpdateSubTree, func(_ context.Context) { c.CheckAndPutSubTree(region) @@ -1098,6 +1102,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.MiscRunner.RunTask( ctx.Context, + regionID, ratelimit.HandleOverlaps, func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) @@ -1111,6 +1116,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // handle region stats ctx.MiscRunner.RunTask( ctx.Context, + regionID, ratelimit.CollectRegionStatsAsync, func(_ context.Context) { // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", @@ -1125,6 +1131,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if saveKV { ctx.MiscRunner.RunTask( ctx.Context, + regionID, ratelimit.SaveRegionToKV, func(_ context.Context) { // If there are concurrent heartbeats from the same region, the last write will win even if From 02b6f89ba0ab64b9c2f23f34dd57659a3bb9897e Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 31 May 2024 16:11:58 +0800 Subject: [PATCH 2/7] add unit test Signed-off-by: Ryan Leung --- pkg/ratelimit/runner_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index 3d7b8fd18be..121cf177bbe 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -76,4 +76,37 @@ func TestConcurrentRunner(t *testing.T) { } wg.Wait() }) + + t.Run("Duplicated", func(t *testing.T) { + runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute) + runner.Start() + defer runner.Stop() + for i := 1; i < 11; i++ { + regionID := uint64(i) + if i == 10 { + regionID = 4 + } + err := runner.RunTask( + context.Background(), + regionID, + "test2", + func(context.Context) { + time.Sleep(time.Second) + }, + ) + require.NoError(t, err) + time.Sleep(1 * time.Millisecond) + } + + var updatedSubmitted, lastSubmitted time.Time + for i := 0; i < 7; i++ { + task := runner.pendingTasks.Front().Value.(*Task) + lastSubmitted = task.submittedAt + if task.regionID == 4 { + updatedSubmitted = lastSubmitted + } + runner.pendingTasks.Remove(runner.pendingTasks.Front()) + } + require.Greater(t, updatedSubmitted, lastSubmitted) + }) } From bfcceb38f6dd624468166c4ec3a8ec72de8314f6 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 31 May 2024 16:34:01 +0800 Subject: [PATCH 3/7] remove context Signed-off-by: Ryan Leung --- pkg/core/region.go | 7 ++----- pkg/mcs/scheduling/server/cluster.go | 15 +++++---------- pkg/ratelimit/runner.go | 18 ++++++++---------- pkg/ratelimit/runner_test.go | 10 +++------- server/cluster/cluster.go | 18 ++++++------------ 5 files changed, 24 insertions(+), 44 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 2b31593d1fb..9e012890ee7 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -16,7 +16,6 @@ package core import ( "bytes" - "context" "encoding/hex" "fmt" "math" @@ -754,20 +753,18 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { if logRunner != nil { debug = func(msg string, fields ...zap.Field) { logRunner.RunTask( - ctx.Context, region.GetID(), "DebugLog", - func(_ context.Context) { + func() { d(msg, fields...) }, ) } info = func(msg string, fields ...zap.Field) { logRunner.RunTask( - ctx.Context, region.GetID(), "InfoLog", - func(_ context.Context) { + func() { i(msg, fields...) }, ) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index c1f3fae8e66..5fac3e1604a 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -613,10 +613,9 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( - ctx, regionID, ratelimit.ObserveRegionStatsAsync, - func(_ context.Context) { + func() { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) } @@ -626,10 +625,9 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( - ctx, regionID, ratelimit.UpdateSubTree, - func(_ context.Context) { + func() { c.CheckAndPutSubTree(region) }, ratelimit.WithRetained(true), @@ -651,20 +649,18 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c return err } ctx.TaskRunner.RunTask( - ctx, regionID, ratelimit.UpdateSubTree, - func(_ context.Context) { + func() { c.CheckAndPutSubTree(region) }, ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( - ctx, regionID, ratelimit.HandleOverlaps, - func(_ context.Context) { + func() { cluster.HandleOverlaps(c, overlaps) }, ) @@ -672,10 +668,9 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c tracer.OnSaveCacheFinished() // handle region stats ctx.TaskRunner.RunTask( - ctx, regionID, ratelimit.CollectRegionStatsAsync, - func(_ context.Context) { + func() { cluster.Collect(c, region, hasRegionStats) }, ) diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 4c68e9d05cb..c55a41a4e22 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -44,17 +44,16 @@ const ( // Runner is the interface for running tasks. type Runner interface { - RunTask(ctx context.Context, regionID uint64, name string, f func(context.Context), opts ...TaskOption) error + RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error Start() Stop() } // Task is a task to be run. type Task struct { - ctx context.Context regionID uint64 submittedAt time.Time - f func(context.Context) + f func() name string // retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration. retained bool @@ -72,7 +71,7 @@ type ConcurrentRunner struct { pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup - pendingTaskCount map[string]int64 + pendingTaskCount map[string]int pendingTasks *list.List pendingRegionTasks map[string]*list.Element maxWaitingDuration prometheus.Gauge @@ -86,7 +85,7 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: list.New(), - pendingTaskCount: make(map[string]int64), + pendingTaskCount: make(map[string]int), pendingRegionTasks: make(map[string]*list.Element), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } @@ -145,7 +144,7 @@ func (cr *ConcurrentRunner) Start() { func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { start := time.Now() - task.f(task.ctx) + task.f() if token != nil { cr.limiter.ReleaseToken(token) cr.processPendingTasks() @@ -177,9 +176,8 @@ func (cr *ConcurrentRunner) Stop() { } // RunTask runs the task asynchronously. -func (cr *ConcurrentRunner) RunTask(ctx context.Context, regionID uint64, name string, f func(context.Context), opts ...TaskOption) error { +func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error { task := &Task{ - ctx: ctx, regionID: regionID, name: name, f: f, @@ -234,8 +232,8 @@ func NewSyncRunner() *SyncRunner { } // RunTask runs the task synchronously. -func (*SyncRunner) RunTask(ctx context.Context, _ uint64, _ string, f func(context.Context), _ ...TaskOption) error { - f(ctx) +func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error { + f() return nil } diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index 121cf177bbe..21bff781d38 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -15,7 +15,6 @@ package ratelimit import ( - "context" "sync" "testing" "time" @@ -34,10 +33,9 @@ func TestConcurrentRunner(t *testing.T) { time.Sleep(50 * time.Millisecond) wg.Add(1) err := runner.RunTask( - context.Background(), uint64(i), "test1", - func(context.Context) { + func() { defer wg.Done() time.Sleep(100 * time.Millisecond) }, @@ -55,10 +53,9 @@ func TestConcurrentRunner(t *testing.T) { for i := 0; i < 10; i++ { wg.Add(1) err := runner.RunTask( - context.Background(), uint64(i), "test2", - func(context.Context) { + func() { defer wg.Done() time.Sleep(100 * time.Millisecond) }, @@ -87,10 +84,9 @@ func TestConcurrentRunner(t *testing.T) { regionID = 4 } err := runner.RunTask( - context.Background(), regionID, "test2", - func(context.Context) { + func() { time.Sleep(time.Second) }, ) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index d96ecd1e522..be0ba39b899 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1047,10 +1047,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.MiscRunner.RunTask( - ctx.Context, regionID, ratelimit.ObserveRegionStatsAsync, - func(_ context.Context) { + func() { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) } @@ -1060,10 +1059,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( - ctx, regionID, ratelimit.UpdateSubTree, - func(_ context.Context) { + func() { c.CheckAndPutSubTree(region) }, ratelimit.WithRetained(true), @@ -1089,10 +1087,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio return err } ctx.TaskRunner.RunTask( - ctx, regionID, ratelimit.UpdateSubTree, - func(_ context.Context) { + func() { c.CheckAndPutSubTree(region) }, ratelimit.WithRetained(retained), @@ -1101,10 +1098,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.MiscRunner.RunTask( - ctx.Context, regionID, ratelimit.HandleOverlaps, - func(_ context.Context) { + func() { cluster.HandleOverlaps(c, overlaps) }, ) @@ -1115,10 +1111,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats ctx.MiscRunner.RunTask( - ctx.Context, regionID, ratelimit.CollectRegionStatsAsync, - func(_ context.Context) { + func() { // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. @@ -1130,10 +1125,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if c.storage != nil { if saveKV { ctx.MiscRunner.RunTask( - ctx.Context, regionID, ratelimit.SaveRegionToKV, - func(_ context.Context) { + func() { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. // Not successfully saved to storage is not fatal, it only leads to longer warm-up From 3772233ad492c584e1a03ae78ee1dc6dc91a846a Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 31 May 2024 16:46:50 +0800 Subject: [PATCH 4/7] change back to slice Signed-off-by: Ryan Leung --- pkg/ratelimit/runner.go | 37 +++++++++++++++++------------------- pkg/ratelimit/runner_test.go | 13 +++---------- 2 files changed, 20 insertions(+), 30 deletions(-) diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index c55a41a4e22..1f9bea2edab 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -15,7 +15,6 @@ package ratelimit import ( - "container/list" "context" "errors" "fmt" @@ -72,8 +71,8 @@ type ConcurrentRunner struct { stopChan chan struct{} wg sync.WaitGroup pendingTaskCount map[string]int - pendingTasks *list.List - pendingRegionTasks map[string]*list.Element + pendingTasks []*Task + pendingRegionTasks map[string]*Task maxWaitingDuration prometheus.Gauge } @@ -84,9 +83,9 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur limiter: limiter, maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), - pendingTasks: list.New(), + pendingTasks: make([]*Task, 0, initialCapacity), pendingTaskCount: make(map[string]int), - pendingRegionTasks: make(map[string]*list.Element), + pendingRegionTasks: make(map[string]*Task), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s @@ -122,15 +121,15 @@ func (cr *ConcurrentRunner) Start() { } case <-cr.stopChan: cr.pendingMu.Lock() - cr.pendingTasks = list.New() + cr.pendingTasks = make([]*Task, 0, initialCapacity) cr.pendingMu.Unlock() log.Info("stopping async task runner", zap.String("name", cr.name)) return case <-ticker.C: maxDuration := time.Duration(0) cr.pendingMu.Lock() - if cr.pendingTasks.Len() > 0 { - maxDuration = time.Since(cr.pendingTasks.Front().Value.(*Task).submittedAt) + if len(cr.pendingTasks) > 0 { + maxDuration = time.Since(cr.pendingTasks[0].submittedAt) } for taskName, cnt := range cr.pendingTaskCount { RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) @@ -156,11 +155,11 @@ func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { func (cr *ConcurrentRunner) processPendingTasks() { cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - if cr.pendingTasks.Len() > 0 { - task := cr.pendingTasks.Front().Value.(*Task) + if len(cr.pendingTasks) > 0 { + task := cr.pendingTasks[0] select { case cr.taskChan <- task: - cr.pendingTasks.Remove(cr.pendingTasks.Front()) + cr.pendingTasks = cr.pendingTasks[1:] cr.pendingTaskCount[task.name]-- delete(cr.pendingRegionTasks, fmt.Sprintf("%d-%s", task.regionID, task.name)) default: @@ -193,18 +192,16 @@ func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts cr.processPendingTasks() }() - pendingTaskNum := cr.pendingTasks.Len() + pendingTaskNum := len(cr.pendingTasks) taskID := fmt.Sprintf("%d-%s", regionID, name) if pendingTaskNum > 0 { - if element, ok := cr.pendingRegionTasks[taskID]; ok { - // Update the task in pendingTasks - element.Value = task - // Update the task in pendingRegionTasks - cr.pendingRegionTasks[taskID] = element + if t, ok := cr.pendingRegionTasks[taskID]; ok { + t.f = f + t.submittedAt = time.Now() return nil } if !task.retained { - maxWait := time.Since(cr.pendingTasks.Front().Value.(*Task).submittedAt) + maxWait := time.Since(cr.pendingTasks[0].submittedAt) if maxWait > cr.maxPendingDuration { RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded @@ -217,8 +214,8 @@ func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts return ErrMaxWaitingTasksExceeded } } - element := cr.pendingTasks.PushBack(task) - cr.pendingRegionTasks[taskID] = element + cr.pendingTasks = append(cr.pendingTasks, task) + cr.pendingRegionTasks[taskID] = task cr.pendingTaskCount[task.name]++ return nil } diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index 21bff781d38..a0e0ab1db10 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -74,7 +74,7 @@ func TestConcurrentRunner(t *testing.T) { wg.Wait() }) - t.Run("Duplicated", func(t *testing.T) { + t.Run("DuplicatedTask", func(t *testing.T) { runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute) runner.Start() defer runner.Stop() @@ -94,15 +94,8 @@ func TestConcurrentRunner(t *testing.T) { time.Sleep(1 * time.Millisecond) } - var updatedSubmitted, lastSubmitted time.Time - for i := 0; i < 7; i++ { - task := runner.pendingTasks.Front().Value.(*Task) - lastSubmitted = task.submittedAt - if task.regionID == 4 { - updatedSubmitted = lastSubmitted - } - runner.pendingTasks.Remove(runner.pendingTasks.Front()) - } + updatedSubmitted := runner.pendingTasks[1].submittedAt + lastSubmitted := runner.pendingTasks[len(runner.pendingTasks)-1].submittedAt require.Greater(t, updatedSubmitted, lastSubmitted) }) } From 2695c7942cde2337f89072f038e20bd24ea96410 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 3 Jun 2024 10:50:26 +0800 Subject: [PATCH 5/7] use general task id Signed-off-by: Ryan Leung --- pkg/core/region.go | 4 ++-- pkg/mcs/scheduling/server/cluster.go | 11 ++++++----- pkg/ratelimit/runner.go | 20 ++++++++------------ pkg/ratelimit/runner_test.go | 9 +++++---- server/cluster/cluster.go | 12 ++++++------ 5 files changed, 27 insertions(+), 29 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 9e012890ee7..f8b6cbe964e 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -753,7 +753,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { if logRunner != nil { debug = func(msg string, fields ...zap.Field) { logRunner.RunTask( - region.GetID(), + fmt.Sprintf("%d-%s", region.GetID(), "DebugLog"), "DebugLog", func() { d(msg, fields...) @@ -762,7 +762,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } info = func(msg string, fields ...zap.Field) { logRunner.RunTask( - region.GetID(), + fmt.Sprintf("%d-%s", region.GetID(), "InfoLog"), "InfoLog", func() { i(msg, fields...) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 5fac3e1604a..f0a2dc3c23a 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -2,6 +2,7 @@ package server import ( "context" + "fmt" "runtime" "sync" "sync/atomic" @@ -613,7 +614,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync), ratelimit.ObserveRegionStatsAsync, func() { if c.regionStats.RegionStatsNeedUpdate(region) { @@ -625,7 +626,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree), ratelimit.UpdateSubTree, func() { c.CheckAndPutSubTree(region) @@ -649,7 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c return err } ctx.TaskRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree), ratelimit.UpdateSubTree, func() { c.CheckAndPutSubTree(region) @@ -658,7 +659,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps), ratelimit.HandleOverlaps, func() { cluster.HandleOverlaps(c, overlaps) @@ -668,7 +669,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c tracer.OnSaveCacheFinished() // handle region stats ctx.TaskRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync), ratelimit.CollectRegionStatsAsync, func() { cluster.Collect(c, region, hasRegionStats) diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 1f9bea2edab..a85e3f375a8 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -17,7 +17,6 @@ package ratelimit import ( "context" "errors" - "fmt" "sync" "time" @@ -43,14 +42,14 @@ const ( // Runner is the interface for running tasks. type Runner interface { - RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error + RunTask(id, name string, f func(), opts ...TaskOption) error Start() Stop() } // Task is a task to be run. type Task struct { - regionID uint64 + id string submittedAt time.Time f func() name string @@ -161,7 +160,7 @@ func (cr *ConcurrentRunner) processPendingTasks() { case cr.taskChan <- task: cr.pendingTasks = cr.pendingTasks[1:] cr.pendingTaskCount[task.name]-- - delete(cr.pendingRegionTasks, fmt.Sprintf("%d-%s", task.regionID, task.name)) + delete(cr.pendingRegionTasks, task.id) default: } return @@ -175,9 +174,9 @@ func (cr *ConcurrentRunner) Stop() { } // RunTask runs the task asynchronously. -func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error { +func (cr *ConcurrentRunner) RunTask(id, name string, f func(), opts ...TaskOption) error { task := &Task{ - regionID: regionID, + id: id, name: name, f: f, submittedAt: time.Now(), @@ -193,9 +192,8 @@ func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts }() pendingTaskNum := len(cr.pendingTasks) - taskID := fmt.Sprintf("%d-%s", regionID, name) if pendingTaskNum > 0 { - if t, ok := cr.pendingRegionTasks[taskID]; ok { + if t, ok := cr.pendingRegionTasks[task.id]; ok { t.f = f t.submittedAt = time.Now() return nil @@ -207,15 +205,13 @@ func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts return ErrMaxWaitingTasksExceeded } } - // We use the max task number to limit the memory usage. - // It occupies around 1.5GB memory when there is 20000000 pending task. if pendingTaskNum > maxPendingTaskNum { RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } cr.pendingTasks = append(cr.pendingTasks, task) - cr.pendingRegionTasks[taskID] = task + cr.pendingRegionTasks[task.id] = task cr.pendingTaskCount[task.name]++ return nil } @@ -229,7 +225,7 @@ func NewSyncRunner() *SyncRunner { } // RunTask runs the task synchronously. -func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error { +func (*SyncRunner) RunTask(_, _ string, f func(), _ ...TaskOption) error { f() return nil } diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index a0e0ab1db10..b58a31a1292 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -15,6 +15,7 @@ package ratelimit import ( + "fmt" "sync" "testing" "time" @@ -33,7 +34,7 @@ func TestConcurrentRunner(t *testing.T) { time.Sleep(50 * time.Millisecond) wg.Add(1) err := runner.RunTask( - uint64(i), + fmt.Sprintf("%d-%s", i, "test1"), "test1", func() { defer wg.Done() @@ -53,7 +54,7 @@ func TestConcurrentRunner(t *testing.T) { for i := 0; i < 10; i++ { wg.Add(1) err := runner.RunTask( - uint64(i), + fmt.Sprintf("%d-%s", i, "test2"), "test2", func() { defer wg.Done() @@ -84,8 +85,8 @@ func TestConcurrentRunner(t *testing.T) { regionID = 4 } err := runner.RunTask( - regionID, - "test2", + fmt.Sprintf("%d-%s", regionID, "test3"), + "test3", func() { time.Sleep(time.Second) }, diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index be0ba39b899..a61dfe3b2cc 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1047,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.MiscRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync), ratelimit.ObserveRegionStatsAsync, func() { if c.regionStats.RegionStatsNeedUpdate(region) { @@ -1059,7 +1059,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree), ratelimit.UpdateSubTree, func() { c.CheckAndPutSubTree(region) @@ -1087,7 +1087,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio return err } ctx.TaskRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree), ratelimit.UpdateSubTree, func() { c.CheckAndPutSubTree(region) @@ -1098,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.MiscRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps), ratelimit.HandleOverlaps, func() { cluster.HandleOverlaps(c, overlaps) @@ -1111,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats ctx.MiscRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync), ratelimit.CollectRegionStatsAsync, func() { // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", @@ -1125,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if c.storage != nil { if saveKV { ctx.MiscRunner.RunTask( - regionID, + fmt.Sprintf("%d-%s", regionID, ratelimit.SaveRegionToKV), ratelimit.SaveRegionToKV, func() { // If there are concurrent heartbeats from the same region, the last write will win even if From d73aa2f11bb7a7146a43093dc5ad66697b1e134e Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 4 Jun 2024 15:59:46 +0800 Subject: [PATCH 6/7] address comments Signed-off-by: Ryan Leung --- pkg/core/region.go | 5 +++-- pkg/mcs/scheduling/server/cluster.go | 11 +++++------ pkg/ratelimit/runner.go | 27 ++++++++++++++++++--------- pkg/ratelimit/runner_test.go | 7 +++---- server/cluster/cluster.go | 12 ++++++------ 5 files changed, 35 insertions(+), 27 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index f8b6cbe964e..5d7880823e8 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -750,10 +750,11 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i + regionID := region.GetID() if logRunner != nil { debug = func(msg string, fields ...zap.Field) { logRunner.RunTask( - fmt.Sprintf("%d-%s", region.GetID(), "DebugLog"), + regionID, "DebugLog", func() { d(msg, fields...) @@ -762,7 +763,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } info = func(msg string, fields ...zap.Field) { logRunner.RunTask( - fmt.Sprintf("%d-%s", region.GetID(), "InfoLog"), + regionID, "InfoLog", func() { i(msg, fields...) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index f0a2dc3c23a..5fac3e1604a 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -2,7 +2,6 @@ package server import ( "context" - "fmt" "runtime" "sync" "sync/atomic" @@ -614,7 +613,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync), + regionID, ratelimit.ObserveRegionStatsAsync, func() { if c.regionStats.RegionStatsNeedUpdate(region) { @@ -626,7 +625,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree), + regionID, ratelimit.UpdateSubTree, func() { c.CheckAndPutSubTree(region) @@ -650,7 +649,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c return err } ctx.TaskRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree), + regionID, ratelimit.UpdateSubTree, func() { c.CheckAndPutSubTree(region) @@ -659,7 +658,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps), + regionID, ratelimit.HandleOverlaps, func() { cluster.HandleOverlaps(c, overlaps) @@ -669,7 +668,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c tracer.OnSaveCacheFinished() // handle region stats ctx.TaskRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync), + regionID, ratelimit.CollectRegionStatsAsync, func() { cluster.Collect(c, region, hasRegionStats) diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index a85e3f375a8..89e72cd4bb5 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -42,14 +42,14 @@ const ( // Runner is the interface for running tasks. type Runner interface { - RunTask(id, name string, f func(), opts ...TaskOption) error + RunTask(id uint64, name string, f func(), opts ...TaskOption) error Start() Stop() } // Task is a task to be run. type Task struct { - id string + id uint64 submittedAt time.Time f func() name string @@ -61,6 +61,12 @@ type Task struct { var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") // ConcurrentRunner is a simple task runner that limits the number of concurrent tasks. + +type taskID struct { + id uint64 + name string +} + type ConcurrentRunner struct { name string limiter *ConcurrencyLimiter @@ -71,7 +77,7 @@ type ConcurrentRunner struct { wg sync.WaitGroup pendingTaskCount map[string]int pendingTasks []*Task - pendingRegionTasks map[string]*Task + existTasks map[taskID]*Task maxWaitingDuration prometheus.Gauge } @@ -84,7 +90,7 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), pendingTaskCount: make(map[string]int), - pendingRegionTasks: make(map[string]*Task), + existTasks: make(map[taskID]*Task), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s @@ -160,7 +166,7 @@ func (cr *ConcurrentRunner) processPendingTasks() { case cr.taskChan <- task: cr.pendingTasks = cr.pendingTasks[1:] cr.pendingTaskCount[task.name]-- - delete(cr.pendingRegionTasks, task.id) + delete(cr.existTasks, taskID{id: task.id, name: task.name}) default: } return @@ -174,7 +180,7 @@ func (cr *ConcurrentRunner) Stop() { } // RunTask runs the task asynchronously. -func (cr *ConcurrentRunner) RunTask(id, name string, f func(), opts ...TaskOption) error { +func (cr *ConcurrentRunner) RunTask(id uint64, name string, f func(), opts ...TaskOption) error { task := &Task{ id: id, name: name, @@ -192,8 +198,11 @@ func (cr *ConcurrentRunner) RunTask(id, name string, f func(), opts ...TaskOptio }() pendingTaskNum := len(cr.pendingTasks) + tid := taskID{task.id, task.name} if pendingTaskNum > 0 { - if t, ok := cr.pendingRegionTasks[task.id]; ok { + // Here we use a map to find the task with the same ID. + // Then replace the old task with the new one. + if t, ok := cr.existTasks[tid]; ok { t.f = f t.submittedAt = time.Now() return nil @@ -211,7 +220,7 @@ func (cr *ConcurrentRunner) RunTask(id, name string, f func(), opts ...TaskOptio } } cr.pendingTasks = append(cr.pendingTasks, task) - cr.pendingRegionTasks[task.id] = task + cr.existTasks[tid] = task cr.pendingTaskCount[task.name]++ return nil } @@ -225,7 +234,7 @@ func NewSyncRunner() *SyncRunner { } // RunTask runs the task synchronously. -func (*SyncRunner) RunTask(_, _ string, f func(), _ ...TaskOption) error { +func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error { f() return nil } diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index b58a31a1292..0335a78bcbe 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -15,7 +15,6 @@ package ratelimit import ( - "fmt" "sync" "testing" "time" @@ -34,7 +33,7 @@ func TestConcurrentRunner(t *testing.T) { time.Sleep(50 * time.Millisecond) wg.Add(1) err := runner.RunTask( - fmt.Sprintf("%d-%s", i, "test1"), + uint64(i), "test1", func() { defer wg.Done() @@ -54,7 +53,7 @@ func TestConcurrentRunner(t *testing.T) { for i := 0; i < 10; i++ { wg.Add(1) err := runner.RunTask( - fmt.Sprintf("%d-%s", i, "test2"), + uint64(i), "test2", func() { defer wg.Done() @@ -85,7 +84,7 @@ func TestConcurrentRunner(t *testing.T) { regionID = 4 } err := runner.RunTask( - fmt.Sprintf("%d-%s", regionID, "test3"), + regionID, "test3", func() { time.Sleep(time.Second) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a61dfe3b2cc..be0ba39b899 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1047,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.MiscRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync), + regionID, ratelimit.ObserveRegionStatsAsync, func() { if c.regionStats.RegionStatsNeedUpdate(region) { @@ -1059,7 +1059,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree), + regionID, ratelimit.UpdateSubTree, func() { c.CheckAndPutSubTree(region) @@ -1087,7 +1087,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio return err } ctx.TaskRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree), + regionID, ratelimit.UpdateSubTree, func() { c.CheckAndPutSubTree(region) @@ -1098,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.MiscRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps), + regionID, ratelimit.HandleOverlaps, func() { cluster.HandleOverlaps(c, overlaps) @@ -1111,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats ctx.MiscRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync), + regionID, ratelimit.CollectRegionStatsAsync, func() { // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", @@ -1125,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if c.storage != nil { if saveKV { ctx.MiscRunner.RunTask( - fmt.Sprintf("%d-%s", regionID, ratelimit.SaveRegionToKV), + regionID, ratelimit.SaveRegionToKV, func() { // If there are concurrent heartbeats from the same region, the last write will win even if From 746e7f1ad6a41b9577d23b5439c7600292f2c5ec Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 13 Jun 2024 10:20:49 +0800 Subject: [PATCH 7/7] address the comment Signed-off-by: Ryan Leung --- pkg/ratelimit/runner.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 89e72cd4bb5..2d88e36106e 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -60,8 +60,6 @@ type Task struct { // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") -// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks. - type taskID struct { id uint64 name string