pkg: remove old duplicated task #8234

Merged 8 commits on Jun 13, 2024.
Changes from 4 commits
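
For orientation, a minimal usage sketch of the reworked runner API in pkg/ratelimit as it appears in the diffs below: RunTask now takes a region ID and a plain func() instead of a context, and a later submission for the same region ID and task name overwrites the still-pending closure. The import path, limiter size, durations, and task name here are illustrative assumptions, not part of this change.

package main

import (
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/ratelimit" // assumed module path for this repository
)

func main() {
	// One concurrency slot; pending tasks may wait up to a minute before being dropped.
	runner := ratelimit.NewConcurrentRunner("example", ratelimit.NewConcurrencyLimiter(1), time.Minute)
	runner.Start()
	defer runner.Stop()

	for i := 0; i < 3; i++ {
		i := i
		// Same region ID and task name on every call: submissions that are still
		// pending when the next one arrives are deduplicated, so only the most
		// recent closure runs for that (regionID, name) pair.
		_ = runner.RunTask(42, "ExampleTask", func() {
			fmt.Println("ran submission", i)
		})
	}
	time.Sleep(500 * time.Millisecond) // let the runner drain before Stop
}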
9 changes: 4 additions & 5 deletions pkg/core/region.go
@@ -16,7 +16,6 @@

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
@@ -754,18 +753,18 @@
if logRunner != nil {
debug = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
region.GetID(),
"DebugLog",
func(_ context.Context) {
func() {
d(msg, fields...)
},
)
}
info = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
region.GetID(),
"InfoLog",
func(_ context.Context) {
func() {
i(msg, fields...)
},
)
22 changes: 11 additions & 11 deletions pkg/mcs/scheduling/server/cluster.go
@@ -607,15 +607,15 @@
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

regionID := region.GetID()
if !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
func() {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
@@ -625,9 +625,9 @@
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
@@ -649,28 +649,28 @@
return err
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
func() {
cluster.HandleOverlaps(c, overlaps)
},
)
}
tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
func() {
cluster.Collect(c, region, hasRegionStats)
},
)
42 changes: 27 additions & 15 deletions pkg/ratelimit/runner.go
@@ -17,6 +17,7 @@ package ratelimit
import (
"context"
"errors"
"fmt"
"sync"
"time"

@@ -42,16 +43,16 @@ const (

// Runner is the interface for running tasks.
type Runner interface {
RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error
RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error
Start()
Stop()
}

// Task is a task to be run.
type Task struct {
ctx context.Context
regionID uint64
submittedAt time.Time
f func(context.Context)
f func()
name string
// retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration.
retained bool
@@ -66,11 +67,12 @@ type ConcurrentRunner struct {
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingTasks []*Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int64
pendingTaskCount map[string]int
pendingTasks []*Task
pendingRegionTasks map[string]*Task
maxWaitingDuration prometheus.Gauge
}

@@ -82,7 +84,8 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
pendingTaskCount: make(map[string]int64),
pendingTaskCount: make(map[string]int),
pendingRegionTasks: make(map[string]*Task),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
@@ -101,6 +104,7 @@ func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
go func() {
defer cr.wg.Done()
for {
@@ -139,7 +143,7 @@

func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
start := time.Now()
task.f(task.ctx)
task.f()
if token != nil {
cr.limiter.ReleaseToken(token)
cr.processPendingTasks()
@@ -157,6 +161,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTaskCount[task.name]--
delete(cr.pendingRegionTasks, fmt.Sprintf("%d-%s", task.regionID, task.name))
default:
}
return
@@ -170,11 +175,12 @@
}

// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
func (cr *ConcurrentRunner) RunTask(regionID uint64, name string, f func(), opts ...TaskOption) error {
task := &Task{
ctx: ctx,
name: name,
f: f,
regionID: regionID,
name: name,
f: f,
submittedAt: time.Now(),
}
for _, opt := range opts {
opt(task)
@@ -187,7 +193,13 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
}()

pendingTaskNum := len(cr.pendingTasks)
taskID := fmt.Sprintf("%d-%s", regionID, name)
if pendingTaskNum > 0 {
if t, ok := cr.pendingRegionTasks[taskID]; ok {
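// A task with the same region ID and name is already pending: keep the queued
// entry but overwrite its function (and refresh submittedAt below) so that only
// the most recently submitted closure runs. The older closure is superseded by
// the new one, so dropping it is safe.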
t.f = f
Review comment (Member): Can you add a comment explaining that the function will be overwritten and why that doesn't matter? Rest LGTM.

t.submittedAt = time.Now()
return nil
}
if !task.retained {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
@@ -197,13 +209,13 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
}
// We use the max task number to limit the memory usage.
// It occupies around 1.5GB memory when there is 20000000 pending task.
if len(cr.pendingTasks) > maxPendingTaskNum {
if pendingTaskNum > maxPendingTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
cr.pendingRegionTasks[taskID] = task
cr.pendingTaskCount[task.name]++
return nil
}
@@ -217,8 +229,8 @@ func NewSyncRunner() *SyncRunner {
}

// RunTask runs the task synchronously.
func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error {
f(ctx)
func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error {
f()
return nil
}

34 changes: 29 additions & 5 deletions pkg/ratelimit/runner_test.go
@@ -15,7 +15,6 @@
package ratelimit

import (
"context"
"sync"
"testing"
"time"
@@ -34,9 +33,9 @@ func TestConcurrentRunner(t *testing.T) {
time.Sleep(50 * time.Millisecond)
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test1",
func(context.Context) {
func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
@@ -54,9 +53,9 @@
for i := 0; i < 10; i++ {
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test2",
func(context.Context) {
func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
@@ -74,4 +73,29 @@
}
wg.Wait()
})

t.Run("DuplicatedTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute)
runner.Start()
defer runner.Stop()
for i := 1; i < 11; i++ {
regionID := uint64(i)
if i == 10 {
regionID = 4
}
err := runner.RunTask(
regionID,
"test2",
func() {
time.Sleep(time.Second)
},
)
require.NoError(t, err)
time.Sleep(1 * time.Millisecond)
}

updatedSubmitted := runner.pendingTasks[1].submittedAt
lastSubmitted := runner.pendingTasks[len(runner.pendingTasks)-1].submittedAt
require.Greater(t, updatedSubmitted, lastSubmitted)
})
}
25 changes: 13 additions & 12 deletions server/cluster/cluster.go
@@ -1038,6 +1038,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// Save to cache if meta or leader is updated, or contains any down/pending peer.
saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin)
tracer.OnRegionGuideFinished()
regionID := region.GetID()
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
@@ -1046,9 +1047,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
func() {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
@@ -1058,9 +1059,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
@@ -1086,9 +1087,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
return err
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
@@ -1097,9 +1098,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
func() {
cluster.HandleOverlaps(c, overlaps)
},
)
@@ -1110,9 +1111,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
tracer.OnSaveCacheFinished()
// handle region stats
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
func() {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
@@ -1124,9 +1125,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if c.storage != nil {
if saveKV {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.SaveRegionToKV,
func(_ context.Context) {
func() {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
// Not successfully saved to storage is not fatal, it only leads to longer warm-up