diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go old mode 100755 new mode 100644 diff --git a/pkg/core/region.go b/pkg/core/region.go index 19c1d0d4794..5d7880823e8 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -16,7 +16,6 @@ package core import ( "bytes" - "context" "encoding/hex" "fmt" "math" @@ -751,21 +750,22 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i + regionID := region.GetID() if logRunner != nil { debug = func(msg string, fields ...zap.Field) { logRunner.RunTask( - ctx.Context, + regionID, "DebugLog", - func(_ context.Context) { + func() { d(msg, fields...) }, ) } info = func(msg string, fields ...zap.Field) { logRunner.RunTask( - ctx.Context, + regionID, "InfoLog", - func(_ context.Context) { + func() { i(msg, fields...) }, ) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index caaafe42c87..5fac3e1604a 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -607,15 +607,15 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin) - + regionID := region.GetID() if !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( - ctx, + regionID, ratelimit.ObserveRegionStatsAsync, - func(_ context.Context) { + func() { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) } @@ -625,9 +625,9 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( - ctx, + regionID, ratelimit.UpdateSubTree, - func(_ context.Context) { + func() { c.CheckAndPutSubTree(region) }, ratelimit.WithRetained(true), @@ -649,18 +649,18 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c return err } ctx.TaskRunner.RunTask( - ctx, + regionID, ratelimit.UpdateSubTree, - func(_ context.Context) { + func() { c.CheckAndPutSubTree(region) }, ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( - ctx, + regionID, ratelimit.HandleOverlaps, - func(_ context.Context) { + func() { cluster.HandleOverlaps(c, overlaps) }, ) @@ -668,9 +668,9 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c tracer.OnSaveCacheFinished() // handle region stats ctx.TaskRunner.RunTask( - ctx, + regionID, ratelimit.CollectRegionStatsAsync, - func(_ context.Context) { + func() { cluster.Collect(c, region, hasRegionStats) }, ) diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 17a45067f3d..2d88e36106e 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -42,16 +42,16 @@ const ( // Runner is the interface for running tasks. type Runner interface { - RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error + RunTask(id uint64, name string, f func(), opts ...TaskOption) error Start() Stop() } // Task is a task to be run. type Task struct { - ctx context.Context + id uint64 submittedAt time.Time - f func(context.Context) + f func() name string // retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration. retained bool @@ -60,17 +60,22 @@ type Task struct { // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") -// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks. +type taskID struct { + id uint64 + name string +} + type ConcurrentRunner struct { name string limiter *ConcurrencyLimiter maxPendingDuration time.Duration taskChan chan *Task - pendingTasks []*Task pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup - pendingTaskCount map[string]int64 + pendingTaskCount map[string]int + pendingTasks []*Task + existTasks map[taskID]*Task maxWaitingDuration prometheus.Gauge } @@ -82,7 +87,8 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), - pendingTaskCount: make(map[string]int64), + pendingTaskCount: make(map[string]int), + existTasks: make(map[taskID]*Task), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s @@ -101,6 +107,7 @@ func (cr *ConcurrentRunner) Start() { cr.stopChan = make(chan struct{}) cr.wg.Add(1) ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() go func() { defer cr.wg.Done() for { @@ -139,7 +146,7 @@ func (cr *ConcurrentRunner) Start() { func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { start := time.Now() - task.f(task.ctx) + task.f() if token != nil { cr.limiter.ReleaseToken(token) cr.processPendingTasks() @@ -157,6 +164,7 @@ func (cr *ConcurrentRunner) processPendingTasks() { case cr.taskChan <- task: cr.pendingTasks = cr.pendingTasks[1:] cr.pendingTaskCount[task.name]-- + delete(cr.existTasks, taskID{id: task.id, name: task.name}) default: } return @@ -170,11 +178,12 @@ func (cr *ConcurrentRunner) Stop() { } // RunTask runs the task asynchronously. -func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { +func (cr *ConcurrentRunner) RunTask(id uint64, name string, f func(), opts ...TaskOption) error { task := &Task{ - ctx: ctx, - name: name, - f: f, + id: id, + name: name, + f: f, + submittedAt: time.Now(), } for _, opt := range opts { opt(task) @@ -187,7 +196,15 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con }() pendingTaskNum := len(cr.pendingTasks) + tid := taskID{task.id, task.name} if pendingTaskNum > 0 { + // Here we use a map to find the task with the same ID. + // Then replace the old task with the new one. + if t, ok := cr.existTasks[tid]; ok { + t.f = f + t.submittedAt = time.Now() + return nil + } if !task.retained { maxWait := time.Since(cr.pendingTasks[0].submittedAt) if maxWait > cr.maxPendingDuration { @@ -195,15 +212,13 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con return ErrMaxWaitingTasksExceeded } } - // We use the max task number to limit the memory usage. - // It occupies around 1.5GB memory when there is 20000000 pending task. - if len(cr.pendingTasks) > maxPendingTaskNum { + if pendingTaskNum > maxPendingTaskNum { RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } - task.submittedAt = time.Now() cr.pendingTasks = append(cr.pendingTasks, task) + cr.existTasks[tid] = task cr.pendingTaskCount[task.name]++ return nil } @@ -217,8 +232,8 @@ func NewSyncRunner() *SyncRunner { } // RunTask runs the task synchronously. -func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error { - f(ctx) +func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error { + f() return nil } diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index 0241536686b..0335a78bcbe 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -15,7 +15,6 @@ package ratelimit import ( - "context" "sync" "testing" "time" @@ -34,9 +33,9 @@ func TestConcurrentRunner(t *testing.T) { time.Sleep(50 * time.Millisecond) wg.Add(1) err := runner.RunTask( - context.Background(), + uint64(i), "test1", - func(context.Context) { + func() { defer wg.Done() time.Sleep(100 * time.Millisecond) }, @@ -54,9 +53,9 @@ func TestConcurrentRunner(t *testing.T) { for i := 0; i < 10; i++ { wg.Add(1) err := runner.RunTask( - context.Background(), + uint64(i), "test2", - func(context.Context) { + func() { defer wg.Done() time.Sleep(100 * time.Millisecond) }, @@ -74,4 +73,29 @@ func TestConcurrentRunner(t *testing.T) { } wg.Wait() }) + + t.Run("DuplicatedTask", func(t *testing.T) { + runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute) + runner.Start() + defer runner.Stop() + for i := 1; i < 11; i++ { + regionID := uint64(i) + if i == 10 { + regionID = 4 + } + err := runner.RunTask( + regionID, + "test3", + func() { + time.Sleep(time.Second) + }, + ) + require.NoError(t, err) + time.Sleep(1 * time.Millisecond) + } + + updatedSubmitted := runner.pendingTasks[1].submittedAt + lastSubmitted := runner.pendingTasks[len(runner.pendingTasks)-1].submittedAt + require.Greater(t, updatedSubmitted, lastSubmitted) + }) } diff --git a/pkg/storage/leveldb_backend.go b/pkg/storage/leveldb_backend.go old mode 100755 new mode 100644 diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go old mode 100755 new mode 100644 diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 70d6b46b980..be0ba39b899 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1038,6 +1038,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // Save to cache if meta or leader is updated, or contains any down/pending peer. saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() + regionID := region.GetID() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. @@ -1046,9 +1047,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.MiscRunner.RunTask( - ctx.Context, + regionID, ratelimit.ObserveRegionStatsAsync, - func(_ context.Context) { + func() { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) } @@ -1058,9 +1059,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region is not updated to the subtree. if origin.GetRef() < 2 { ctx.TaskRunner.RunTask( - ctx, + regionID, ratelimit.UpdateSubTree, - func(_ context.Context) { + func() { c.CheckAndPutSubTree(region) }, ratelimit.WithRetained(true), @@ -1086,9 +1087,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio return err } ctx.TaskRunner.RunTask( - ctx, + regionID, ratelimit.UpdateSubTree, - func(_ context.Context) { + func() { c.CheckAndPutSubTree(region) }, ratelimit.WithRetained(retained), @@ -1097,9 +1098,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.MiscRunner.RunTask( - ctx.Context, + regionID, ratelimit.HandleOverlaps, - func(_ context.Context) { + func() { cluster.HandleOverlaps(c, overlaps) }, ) @@ -1110,9 +1111,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats ctx.MiscRunner.RunTask( - ctx.Context, + regionID, ratelimit.CollectRegionStatsAsync, - func(_ context.Context) { + func() { // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. @@ -1124,9 +1125,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if c.storage != nil { if saveKV { ctx.MiscRunner.RunTask( - ctx.Context, + regionID, ratelimit.SaveRegionToKV, - func(_ context.Context) { + func() { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. // Not successfully saved to storage is not fatal, it only leads to longer warm-up