pkg: remove old duplicated task #8234

Merged · 8 commits · Jun 13, 2024

Changes from 5 commits

9 changes: 4 additions & 5 deletions pkg/core/region.go
@@ -16,7 +16,6 @@

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
@@ -754,18 +753,18 @@
if logRunner != nil {
debug = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
fmt.Sprintf("%d-%s", region.GetID(), "DebugLog"),

"DebugLog",
func(_ context.Context) {
func() {

d(msg, fields...)
},
)
}
info = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
fmt.Sprintf("%d-%s", region.GetID(), "InfoLog"),
"InfoLog",
func(_ context.Context) {
func() {
i(msg, fields...)
},
)
23 changes: 12 additions & 11 deletions pkg/mcs/scheduling/server/cluster.go
@@ -2,6 +2,7 @@

import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
@@ -607,15 +608,15 @@
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

regionID := region.GetID()
if !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync),

ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
func() {

if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
@@ -625,9 +626,9 @@
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),

ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {

c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
@@ -649,28 +650,28 @@
return err
}
ctx.TaskRunner.RunTask(
ctx,
fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps),
ratelimit.HandleOverlaps,
func(_ context.Context) {
func() {
cluster.HandleOverlaps(c, overlaps)
},
)
}
tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync),
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
func() {
cluster.Collect(c, region, hasRegionStats)
},
)
42 changes: 25 additions & 17 deletions pkg/ratelimit/runner.go
@@ -42,16 +42,16 @@ const (

// Runner is the interface for running tasks.
type Runner interface {
RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error
RunTask(id, name string, f func(), opts ...TaskOption) error
Contributor:

I think RunTask(id uint64, name string, f func(), opts ...TaskOption) error is ok. fmt.Sprint has an allocation cost.

Member Author (@rleungx, Jun 4, 2024):

But the same id may have different tasks. If we don't use fmt.Sprint, then we need a map[uint64]map[string]struct{}.

Contributor:

package main

import (
        "fmt"
        "testing"
)

func BenchmarkSprintfKey(b *testing.B) {
        m := make(map[string]int)
        b.ReportAllocs()
        for i := 0; i < b.N; i++ {
                key := fmt.Sprintf("%s %s", "hello", "world")
                m[key]++
        }
}

func BenchmarkStructKey(b *testing.B) {
        type myKey struct {
                id   int64
                name string
        }
        m := make(map[myKey]int)
        b.ReportAllocs()
        for i := 0; i < b.N; i++ {
                key := myKey{id: 1, name: "test"}
                m[key]++
        }
}
➜  bench_sprintf go test -bench=. -benchmem
goos: darwin
goarch: arm64
pkg: bench
BenchmarkSprintfKey-12        	20207793	        60.09 ns/op	      16 B/op	       1 allocs/op
BenchmarkStructKey-12         	83140879	        14.91 ns/op	       0 B/op	       0 allocs/op

Start()
Stop()
}
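
For reference, a minimal runnable sketch of the struct-key alternative benchmarked above. The taskKey type and the map below are illustrative assumptions, not code from this PR, which keeps the fmt.Sprintf string key:

package main

import "fmt"

// taskKey is a hypothetical composite key: region ID plus task name.
// A comparable struct key avoids the per-call allocation of building a
// "<id>-<name>" string with fmt.Sprintf, while still keeping different
// task names for the same region distinct.
type taskKey struct {
        id   uint64
        name string
}

func main() {
        pending := make(map[taskKey]struct{})

        // Two different task names for the same region ID remain separate
        // entries, which is the concern raised about keying by uint64 alone.
        pending[taskKey{id: 4, name: "UpdateSubTree"}] = struct{}{}
        pending[taskKey{id: 4, name: "HandleOverlaps"}] = struct{}{}

        fmt.Println(len(pending)) // 2
}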

// Task is a task to be run.
type Task struct {
ctx context.Context
id string
submittedAt time.Time
f func(context.Context)
f func()
name string
// retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration.
retained bool
@@ -66,11 +66,12 @@ type ConcurrentRunner struct {
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingTasks []*Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int64
pendingTaskCount map[string]int
pendingTasks []*Task
pendingRegionTasks map[string]*Task
maxWaitingDuration prometheus.Gauge
}

@@ -82,7 +83,8 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
pendingTaskCount: make(map[string]int64),
pendingTaskCount: make(map[string]int),
pendingRegionTasks: make(map[string]*Task),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
@@ -101,6 +103,7 @@ func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
go func() {
defer cr.wg.Done()
for {
@@ -139,7 +142,7 @@

func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
start := time.Now()
task.f(task.ctx)
task.f()
if token != nil {
cr.limiter.ReleaseToken(token)
cr.processPendingTasks()
@@ -157,6 +160,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTaskCount[task.name]--
delete(cr.pendingRegionTasks, task.id)
default:
}
return
@@ -170,11 +174,12 @@ func (cr *ConcurrentRunner) Stop() {
}

// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
func (cr *ConcurrentRunner) RunTask(id, name string, f func(), opts ...TaskOption) error {
task := &Task{
ctx: ctx,
name: name,
f: f,
id: id,
name: name,
f: f,
submittedAt: time.Now(),
}
for _, opt := range opts {
opt(task)
@@ -188,22 +193,25 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con

pendingTaskNum := len(cr.pendingTasks)
if pendingTaskNum > 0 {
if t, ok := cr.pendingRegionTasks[task.id]; ok {
t.f = f
Member:

Can you add a comment to explain that the function will be overwritten and that it doesn't matter?

rest lgtm

t.submittedAt = time.Now()
return nil
}
if !task.retained {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
// We use the max task number to limit the memory usage.
// It occupies around 1.5GB memory when there is 20000000 pending task.
if len(cr.pendingTasks) > maxPendingTaskNum {
if pendingTaskNum > maxPendingTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
cr.pendingRegionTasks[task.id] = task
cr.pendingTaskCount[task.name]++
return nil
}
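
Following up on the review comment above, one possible wording for the requested comment on the overwrite path. This is illustrative only; the comment text and the stated rationale are assumptions, not taken from this PR:

if t, ok := cr.pendingRegionTasks[task.id]; ok {
        // A task with the same id is already pending: overwrite its closure
        // and refresh the submission time instead of queuing a second entry.
        // Dropping the older closure is assumed to be safe here because the
        // newer closure is built from the newer region heartbeat, so only the
        // latest version of the task needs to run.
        t.f = f
        t.submittedAt = time.Now()
        return nil
}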
@@ -217,8 +225,8 @@ func NewSyncRunner() *SyncRunner {
}

// RunTask runs the task synchronously.
func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error {
f(ctx)
func (*SyncRunner) RunTask(_, _ string, f func(), _ ...TaskOption) error {
f()
return nil
}

35 changes: 30 additions & 5 deletions pkg/ratelimit/runner_test.go
@@ -15,7 +15,7 @@
package ratelimit

import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -34,9 +34,9 @@ func TestConcurrentRunner(t *testing.T) {
time.Sleep(50 * time.Millisecond)
wg.Add(1)
err := runner.RunTask(
context.Background(),
fmt.Sprintf("%d-%s", i, "test1"),
"test1",
func(context.Context) {
func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
@@ -54,9 +54,9 @@
for i := 0; i < 10; i++ {
wg.Add(1)
err := runner.RunTask(
context.Background(),
fmt.Sprintf("%d-%s", i, "test2"),
"test2",
func(context.Context) {
func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
@@ -74,4 +74,29 @@
}
wg.Wait()
})

t.Run("DuplicatedTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute)
runner.Start()
defer runner.Stop()
for i := 1; i < 11; i++ {
regionID := uint64(i)
if i == 10 {
regionID = 4
}
err := runner.RunTask(
fmt.Sprintf("%d-%s", regionID, "test3"),
"test3",
func() {
time.Sleep(time.Second)
},
)
require.NoError(t, err)
time.Sleep(1 * time.Millisecond)
}

updatedSubmitted := runner.pendingTasks[1].submittedAt
lastSubmitted := runner.pendingTasks[len(runner.pendingTasks)-1].submittedAt
require.Greater(t, updatedSubmitted, lastSubmitted)
})
}
25 changes: 13 additions & 12 deletions server/cluster/cluster.go
@@ -1038,6 +1038,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// Save to cache if meta or leader is updated, or contains any down/pending peer.
saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin)
tracer.OnRegionGuideFinished()
regionID := region.GetID()
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
@@ -1046,9 +1047,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.MiscRunner.RunTask(
ctx.Context,
fmt.Sprintf("%d-%s", regionID, ratelimit.ObserveRegionStatsAsync),
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
func() {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
@@ -1058,9 +1059,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
@@ -1086,9 +1087,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
return err
}
ctx.TaskRunner.RunTask(
ctx,
fmt.Sprintf("%d-%s", regionID, ratelimit.UpdateSubTree),
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
@@ -1097,9 +1098,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.MiscRunner.RunTask(
ctx.Context,
fmt.Sprintf("%d-%s", regionID, ratelimit.HandleOverlaps),
ratelimit.HandleOverlaps,
func(_ context.Context) {
func() {
cluster.HandleOverlaps(c, overlaps)
},
)
@@ -1110,9 +1111,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
tracer.OnSaveCacheFinished()
// handle region stats
ctx.MiscRunner.RunTask(
ctx.Context,
fmt.Sprintf("%d-%s", regionID, ratelimit.CollectRegionStatsAsync),
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
func() {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
@@ -1124,9 +1125,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if c.storage != nil {
if saveKV {
ctx.MiscRunner.RunTask(
ctx.Context,
fmt.Sprintf("%d-%s", regionID, ratelimit.SaveRegionToKV),
ratelimit.SaveRegionToKV,
func(_ context.Context) {
func() {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
// Not successfully saved to storage is not fatal, it only leads to longer warm-up