pkg: remove old duplicated task #8234

Merged
merged 8 commits on Jun 13, 2024
10 changes: 5 additions & 5 deletions pkg/core/region.go
@@ -16,7 +16,6 @@

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
@@ -751,21 +750,22 @@
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
regionID := region.GetID()
if logRunner != nil {
debug = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
regionID,
"DebugLog",
func(_ context.Context) {
func() {
d(msg, fields...)
},
)
}
info = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
regionID,
"InfoLog",
func(_ context.Context) {
func() {
i(msg, fields...)
},
)
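The calls above show the shape of the change that recurs throughout this PR: task submitters now pass the region ID and a plain func() instead of a context, so the runner can key pending work by (ID, task name) and collapse duplicates. Below is a minimal usage sketch against the new interface; it assumes the constructors visible later in this diff and the import path github.com/tikv/pd/pkg/ratelimit, and is an illustration rather than code from the PR.

```go
package main

import (
	"time"

	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	// One concurrent worker; non-retained tasks start being rejected once the
	// oldest pending task has waited longer than a minute.
	runner := ratelimit.NewConcurrentRunner("example", ratelimit.NewConcurrencyLimiter(1), time.Minute)
	runner.Start()
	defer runner.Stop()

	regionID := uint64(42) // hypothetical region ID, used only for this sketch
	// Two submissions with the same (id, name): if the first is still sitting in
	// the pending queue when the second arrives, the runner keeps a single entry
	// and only the most recently submitted closure runs.
	_ = runner.RunTask(regionID, "InfoLog", func() { /* older closure, may be overwritten */ })
	_ = runner.RunTask(regionID, "InfoLog", func() { /* latest closure wins */ })
}
```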
22 changes: 11 additions & 11 deletions pkg/mcs/scheduling/server/cluster.go
@@ -607,15 +607,15 @@
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

regionID := region.GetID()
if !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
func() {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
@@ -625,9 +625,9 @@
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
@@ -649,28 +649,28 @@
return err
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
func() {
cluster.HandleOverlaps(c, overlaps)
},
)
}
tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
func() {
cluster.Collect(c, region, hasRegionStats)
},
)
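A note on the ratelimit.WithRetained(true) option used at the UpdateSubTree call sites above: per the runner changes later in this diff, options are applied as opt(task), and a retained task skips the max-pending-duration rejection. Its definition is not part of this diff; presumably it is a functional option roughly like the following sketch (an assumption, not the package's verbatim code).

```go
// Hedged sketch of the option shape implied by `opt(task)` in RunTask; the real
// WithRetained lives elsewhere in pkg/ratelimit and may differ in detail.
type TaskOption func(*Task)

// WithRetained marks a task as one that should stay in the pending queue even
// when older pending tasks have exceeded maxPendingDuration, instead of being
// rejected with ErrMaxWaitingTasksExceeded.
func WithRetained(retained bool) TaskOption {
	return func(t *Task) { t.retained = retained }
}
```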
51 changes: 34 additions & 17 deletions pkg/ratelimit/runner.go
@@ -42,16 +42,16 @@ const (

// Runner is the interface for running tasks.
type Runner interface {
RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error
RunTask(id uint64, name string, f func(), opts ...TaskOption) error
Start()
Stop()
}

// Task is a task to be run.
type Task struct {
ctx context.Context
id uint64
Member:
can use taskID directly

Member (Author):
I think it's ok, and it uses less memory.

submittedAt time.Time
f func(context.Context)
f func()
name string
// retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration.
retained bool
@@ -61,16 +61,23 @@ type Task struct {
var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded")

// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks.
Member:
It's a mismatched comment line.


type taskID struct {
id uint64
name string
}

type ConcurrentRunner struct {
name string
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingTasks []*Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int64
pendingTaskCount map[string]int
pendingTasks []*Task
existTasks map[taskID]*Task
maxWaitingDuration prometheus.Gauge
}

@@ -82,7 +89,8 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
pendingTaskCount: make(map[string]int64),
pendingTaskCount: make(map[string]int),
existTasks: make(map[taskID]*Task),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
@@ -101,6 +109,7 @@ func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
go func() {
defer cr.wg.Done()
for {
@@ -139,7 +148,7 @@

func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
start := time.Now()
task.f(task.ctx)
task.f()
if token != nil {
cr.limiter.ReleaseToken(token)
cr.processPendingTasks()
@@ -157,6 +166,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTaskCount[task.name]--
delete(cr.existTasks, taskID{id: task.id, name: task.name})
default:
}
return
@@ -170,11 +180,12 @@ func (cr *ConcurrentRunner) Stop() {
}

// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
func (cr *ConcurrentRunner) RunTask(id uint64, name string, f func(), opts ...TaskOption) error {
task := &Task{
ctx: ctx,
name: name,
f: f,
id: id,
name: name,
f: f,
submittedAt: time.Now(),
}
for _, opt := range opts {
opt(task)
@@ -187,23 +198,29 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
}()

pendingTaskNum := len(cr.pendingTasks)
tid := taskID{task.id, task.name}
if pendingTaskNum > 0 {
// Here we use a map to find the task with the same ID.
// Then replace the old task with the new one.
if t, ok := cr.existTasks[tid]; ok {
t.f = f
Member:
Can you add a comment explaining that the function gets overwritten here and that this is fine?

rest lgtm

t.submittedAt = time.Now()
return nil
}
if !task.retained {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
// We use the max task number to limit the memory usage.
// It occupies around 1.5GB memory when there is 20000000 pending task.
if len(cr.pendingTasks) > maxPendingTaskNum {
if pendingTaskNum > maxPendingTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
cr.existTasks[tid] = task
cr.pendingTaskCount[task.name]++
return nil
}
@@ -217,8 +234,8 @@ func NewSyncRunner() *SyncRunner {
}

// RunTask runs the task synchronously.
func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error {
f(ctx)
func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error {
f()
return nil
}

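Taken together, the runner changes replace the per-task context with a (task ID, task name) key: RunTask looks the key up in existTasks and, when a matching task is still pending, overwrites its function and submission time instead of appending a duplicate. The standalone sketch below reimplements just that queueing idea in miniature (no limiter, no metrics, no dispatcher) to make the behavior easy to see; it is not the pd implementation.

```go
package main

import (
	"fmt"
	"time"
)

type taskKey struct {
	id   uint64
	name string
}

type task struct {
	f           func()
	submittedAt time.Time
}

// dedupQueue keeps at most one pending task per (id, name), mirroring the
// pendingTasks slice plus existTasks map used by ConcurrentRunner.
type dedupQueue struct {
	pending []*task
	exist   map[taskKey]*task
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{exist: make(map[taskKey]*task)}
}

// submit appends a new task unless one with the same key is already waiting,
// in which case the older closure is simply replaced: only the most recent
// submission will ever run.
func (q *dedupQueue) submit(id uint64, name string, f func()) {
	k := taskKey{id: id, name: name}
	if t, ok := q.exist[k]; ok {
		t.f = f
		t.submittedAt = time.Now()
		return
	}
	t := &task{f: f, submittedAt: time.Now()}
	q.pending = append(q.pending, t)
	q.exist[k] = t
}

func main() {
	q := newDedupQueue()
	q.submit(4, "UpdateSubTree", func() { fmt.Println("old closure") })
	q.submit(4, "UpdateSubTree", func() { fmt.Println("new closure") })
	fmt.Println(len(q.pending)) // 1: the duplicate was merged, not appended
}
```

Note that, as in the PR, deduplication only applies while a task is still queued; once it is handed to a worker it is removed from the map (the delete call in processPendingTasks), so a later submission with the same key is queued normally.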
34 changes: 29 additions & 5 deletions pkg/ratelimit/runner_test.go
@@ -15,7 +15,6 @@
package ratelimit

import (
"context"
"sync"
"testing"
"time"
@@ -34,9 +33,9 @@ func TestConcurrentRunner(t *testing.T) {
time.Sleep(50 * time.Millisecond)
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test1",
func(context.Context) {
func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
@@ -54,9 +53,9 @@
for i := 0; i < 10; i++ {
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test2",
func(context.Context) {
func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
@@ -74,4 +73,29 @@
}
wg.Wait()
})

t.Run("DuplicatedTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute)
runner.Start()
defer runner.Stop()
for i := 1; i < 11; i++ {
regionID := uint64(i)
if i == 10 {
regionID = 4
}
err := runner.RunTask(
regionID,
"test3",
func() {
time.Sleep(time.Second)
},
)
require.NoError(t, err)
time.Sleep(1 * time.Millisecond)
}

updatedSubmitted := runner.pendingTasks[1].submittedAt
lastSubmitted := runner.pendingTasks[len(runner.pendingTasks)-1].submittedAt
require.Greater(t, updatedSubmitted, lastSubmitted)
})
}
25 changes: 13 additions & 12 deletions server/cluster/cluster.go
@@ -1038,6 +1038,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// Save to cache if meta or leader is updated, or contains any down/pending peer.
saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin)
tracer.OnRegionGuideFinished()
regionID := region.GetID()
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
@@ -1046,9 +1047,9 @@
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
func() {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
@@ -1058,9 +1059,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
@@ -1086,9 +1087,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
return err
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
@@ -1097,9 +1098,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
func() {
cluster.HandleOverlaps(c, overlaps)
},
)
@@ -1110,9 +1111,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
tracer.OnSaveCacheFinished()
// handle region stats
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
func() {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
@@ -1124,9 +1125,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if c.storage != nil {
if saveKV {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.SaveRegionToKV,
func(_ context.Context) {
func() {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
// Not successfully saved to storage is not fatal, it only leads to longer warm-up