Skip to content

Commit

Permalink
distinguish the task priority
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 13, 2024
1 parent 2278609 commit a342352
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 73 deletions.
12 changes: 6 additions & 6 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -742,7 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool) {
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
Expand Down Expand Up @@ -772,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache = true, true
saveKV, saveCache, metaUpdated = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
Expand All @@ -785,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
saveKV, saveCache = true, true
saveKV, saveCache, metaUpdated = true, true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
Expand All @@ -796,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
saveKV, saveCache = true, true
saveKV, saveCache, metaUpdated = true, true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
Expand All @@ -807,7 +807,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
// We check it first and do not return because the log is important for us to investigate.
saveCache, needSync = true, true
saveCache, needSync, metaUpdated = true, true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, needSync := RegionGuide(ContextTODO(), regionA, regionB)
_, _, needSync, _ := RegionGuide(ContextTODO(), regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
Expand Down Expand Up @@ -980,7 +980,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) {
regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA)
re.Equal(int32(2), regionPendingItemA.GetRef())
// check new item
saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA)
saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemA, regionPendingItemA)
re.True(needSync)
re.True(saveCache)
re.False(saveKV)
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) {
re.Equal(int32(1), regionPendingItemB.GetRef())

// heartbeat again, no need updates root tree
saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB)
saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemB, regionItemB)
re.False(needSync)
re.False(saveCache)
re.False(saveKV)
Expand Down
27 changes: 26 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin)
_, saveCache, _, metaUpdated := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

if !saveCache {
// Due to some config changes need to update the region stats as well,
Expand All @@ -617,6 +617,11 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
cluster.Collect(c, region, hasRegionStats)
}
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
)
}
// region is not updated to the subtree.
Expand All @@ -627,6 +632,11 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
)
}
return nil
Expand All @@ -650,6 +660,11 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
Expand All @@ -658,6 +673,11 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.HandleOverlaps),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
)
}
tracer.OnSaveCacheFinished()
Expand All @@ -668,6 +688,11 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
)
tracer.OnCollectRegionStatsFinished()
return nil
Expand Down
32 changes: 24 additions & 8 deletions pkg/ratelimit/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,41 @@ var (
Name: "runner_task_max_waiting_duration_seconds",
Help: "The duration of tasks waiting in the runner.",
}, []string{nameStr})

RunnerTaskPendingTasks = prometheus.NewGaugeVec(
RunnerPendingTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_pending_tasks",
Name: "runner_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{nameStr, taskStr})
RunnerTaskFailedTasks = prometheus.NewCounterVec(
RunnerFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_failed_tasks_total",
Name: "runner_failed_tasks_total",
Help: "The number of failed tasks in the runner.",
}, []string{nameStr})
}, []string{nameStr, taskStr})
// RunnerSucceededTasks counts finished tasks, labeled by runner name and task name.
RunnerSucceededTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_success_tasks_total",
Help: "The number of succeeded tasks in the runner.",
}, []string{nameStr, taskStr})
RunnerTaskExecutionDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_execution_duration_seconds",
Help: "Bucketed histogram of processing time (s) of finished tasks.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{nameStr, taskStr})
)

func init() {
prometheus.MustRegister(RunnerTaskMaxWaitingDuration)
prometheus.MustRegister(RunnerTaskPendingTasks)
prometheus.MustRegister(RunnerTaskFailedTasks)
prometheus.MustRegister(RunnerPendingTasks)
prometheus.MustRegister(RunnerFailedTasks)
prometheus.MustRegister(RunnerTaskExecutionDuration)
prometheus.MustRegister(RunnerSucceededTasks)
}
118 changes: 72 additions & 46 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,40 +58,48 @@ var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded")

// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks.
type ConcurrentRunner struct {
name string
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingTasks []*Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int64
failedTaskCount prometheus.Counter
maxWaitingDuration prometheus.Gauge
name string
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingNormalPriorityTasks []*Task
pendingHighPriorityTasks []*Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int64
maxWaitingDuration prometheus.Gauge
}

// NewConcurrentRunner creates a new ConcurrentRunner.
func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner {
s := &ConcurrentRunner{
name: name,
limiter: limiter,
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name),
pendingTaskCount: make(map[string]int64),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
name: name,
limiter: limiter,
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingNormalPriorityTasks: make([]*Task, 0, initialCapacity),
pendingHighPriorityTasks: make([]*Task, 0, initialCapacity),
pendingTaskCount: make(map[string]int64),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
}

// TaskOpts is the options for RunTask.
type TaskOpts struct{}
type TaskOpts struct {
// MetaUpdated indicates whether the meta is updated.
MetaUpdated bool
}

// TaskOption configures TaskOpts.
type TaskOption func(opts *TaskOpts)

// WithMetaUpdated returns a TaskOption that records on the task options
// whether the meta is updated.
func WithMetaUpdated(metaUpdated bool) TaskOption {
	return func(o *TaskOpts) {
		o.MetaUpdated = metaUpdated
	}
}

// Start starts the runner.
func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
Expand All @@ -107,24 +115,25 @@ func (cr *ConcurrentRunner) Start() {
if err != nil {
continue
}
go cr.run(task.ctx, task.f, token)
go cr.run(task, token)
} else {
go cr.run(task.ctx, task.f, nil)
go cr.run(task, nil)
}
case <-cr.stopChan:
cr.pendingMu.Lock()
cr.pendingTasks = make([]*Task, 0, initialCapacity)
cr.pendingNormalPriorityTasks = make([]*Task, 0, initialCapacity)
cr.pendingHighPriorityTasks = make([]*Task, 0, initialCapacity)
cr.pendingMu.Unlock()
log.Info("stopping async task runner", zap.String("name", cr.name))
return
case <-ticker.C:
maxDuration := time.Duration(0)
cr.pendingMu.Lock()
if len(cr.pendingTasks) > 0 {
maxDuration = time.Since(cr.pendingTasks[0].submittedAt)
if len(cr.pendingNormalPriorityTasks) > 0 {
maxDuration = time.Since(cr.pendingNormalPriorityTasks[0].submittedAt)
}
for name, cnt := range cr.pendingTaskCount {
RunnerTaskPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt))
RunnerPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt))
}
cr.pendingMu.Unlock()
cr.maxWaitingDuration.Set(maxDuration.Seconds())
Expand All @@ -133,8 +142,13 @@ func (cr *ConcurrentRunner) Start() {
}()
}

func (cr *ConcurrentRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) {
task(ctx)
func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
start := time.Now()
defer func() {
RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds())
RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc()
}()
task.f(task.ctx)
if token != nil {
token.Release()
cr.processPendingTasks()
Expand All @@ -144,16 +158,25 @@ func (cr *ConcurrentRunner) run(ctx context.Context, task func(context.Context),
func (cr *ConcurrentRunner) processPendingTasks() {
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
for len(cr.pendingTasks) > 0 {
task := cr.pendingTasks[0]
if len(cr.pendingHighPriorityTasks) > 0 {
task := cr.pendingHighPriorityTasks[0]
select {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingHighPriorityTasks = cr.pendingHighPriorityTasks[1:]
cr.pendingTaskCount[task.name]--
return
default:
return
}
return
}
if len(cr.pendingNormalPriorityTasks) > 0 {
task := cr.pendingNormalPriorityTasks[0]
select {
case cr.taskChan <- task:
cr.pendingNormalPriorityTasks = cr.pendingNormalPriorityTasks[1:]
cr.pendingTaskCount[task.name]--
default:
}
return
}
}

Expand All @@ -177,22 +200,25 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
}

cr.processPendingTasks()
select {
case cr.taskChan <- task:
default:
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
if len(cr.pendingTasks) > 0 {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
cr.failedTaskCount.Inc()
return ErrMaxWaitingTasksExceeded
}
}
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
if task.opts.MetaUpdated {
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
// Append to the high-priority queue itself. Appending to the normal-priority
// slice here would replace the high-priority queue with the normal queue's
// contents and share its backing array.
cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task)
cr.pendingTaskCount[task.name]++
return nil
}

if len(cr.pendingNormalPriorityTasks) > 0 {
maxWait := time.Since(cr.pendingNormalPriorityTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingNormalPriorityTasks = append(cr.pendingNormalPriorityTasks, task)
cr.pendingTaskCount[task.name]++
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
Tracer: core.NewNoopHeartbeatProcessTracer(),
// no limit for followers.
}
saveKV, _, _ := regionGuide(ctx, region, origin)
saveKV, _, _, _ := regionGuide(ctx, region, origin)
overlaps := bc.PutRegion(region)

if hasBuckets {
Expand Down
Loading

0 comments on commit a342352

Please sign in to comment.