Skip to content

Commit

Permalink
add more comments and rename
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 21, 2024
1 parent 62d98f4 commit ed08415
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 53 deletions.
18 changes: 9 additions & 9 deletions pkg/core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@ import (
// MetaProcessContext is a context for meta process.
type MetaProcessContext struct {
context.Context
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
StatisticsRunner ratelimit.Runner
LogRunner ratelimit.Runner
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
MiscRunner ratelimit.Runner
LogRunner ratelimit.Runner
}

// NewMetaProcessContext creates a new MetaProcessContext.
// used in tests, can be changed if no need to test concurrency.
func ContextTODO() *MetaProcessContext {
return &MetaProcessContext{
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
StatisticsRunner: ratelimit.NewSyncRunner(),
LogRunner: ratelimit.NewSyncRunner(),
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
MiscRunner: ratelimit.NewSyncRunner(),
LogRunner: ratelimit.NewSyncRunner(),
// Limit default is nil
}
}
35 changes: 19 additions & 16 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ type Cluster struct {
clusterID uint64
running atomic.Bool

heartbeatRunner ratelimit.Runner
statisticsRunner ratelimit.Runner
logRunner ratelimit.Runner
// heartbeatRunner is used to process the subtree update task asynchronously.
heartbeatRunner ratelimit.Runner
// miscRunner is used to process the statistics and persistent tasks asynchronously.
miscRunner ratelimit.Runner
// logRunner is used to process the log asynchronously.
logRunner ratelimit.Runner
}

const (
Expand Down Expand Up @@ -95,9 +98,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
Expand Down Expand Up @@ -535,7 +538,7 @@ func (c *Cluster) StartBackgroundJobs() {
go c.runCoordinator()
go c.runMetricsCollectionJob()
c.heartbeatRunner.Start()
c.statisticsRunner.Start()
c.miscRunner.Start()
c.logRunner.Start()
c.running.Store(true)
}
Expand All @@ -548,7 +551,7 @@ func (c *Cluster) StopBackgroundJobs() {
c.running.Store(false)
c.coordinator.Stop()
c.heartbeatRunner.Stop()
c.statisticsRunner.Stop()
c.miscRunner.Stop()
c.logRunner.Stop()
c.cancel()
c.wg.Wait()
Expand All @@ -565,19 +568,19 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics {
tracer = core.NewHeartbeatProcessTracer()
}
var taskRunner, statisticsRunner, logRunner ratelimit.Runner
taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner
var taskRunner, miscRunner, logRunner ratelimit.Runner
taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner
if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
taskRunner = c.heartbeatRunner
statisticsRunner = c.statisticsRunner
miscRunner = c.miscRunner
logRunner = c.logRunner
}
ctx := &core.MetaProcessContext{
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
StatisticsRunner: statisticsRunner,
LogRunner: logRunner,
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
MiscRunner: miscRunner,
LogRunner: logRunner,
}
tracer.Begin()
if err := c.processRegionHeartbeat(ctx, region); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type Task struct {
submittedAt time.Time
f func(context.Context)
name string
retained bool
// retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration.
retained bool
}

// ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
Expand Down
41 changes: 22 additions & 19 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,12 @@ type RaftCluster struct {
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams

heartbeatRunner ratelimit.Runner
statisticsRunner ratelimit.Runner
logRunner ratelimit.Runner
// heartbeatRunner is used to process the subtree update task asynchronously.
heartbeatRunner ratelimit.Runner
// miscRunner is used to process the statistics and persistent tasks asynchronously.
miscRunner ratelimit.Runner
// logRunner is used to process the log asynchronously.
logRunner ratelimit.Runner
}

// Status saves some state information.
Expand All @@ -193,16 +196,16 @@ type Status struct {
func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client) *RaftCluster {
return &RaftCluster{
serverCtx: ctx,
clusterID: clusterID,
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
core: basicCluster,
storage: storage,
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
serverCtx: ctx,
clusterID: clusterID,
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
core: basicCluster,
storage: storage,
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
}

Expand Down Expand Up @@ -361,7 +364,7 @@ func (c *RaftCluster) Start(s Server) error {

c.running = true
c.heartbeatRunner.Start()
c.statisticsRunner.Start()
c.miscRunner.Start()
c.logRunner.Start()
return nil
}
Expand Down Expand Up @@ -757,7 +760,7 @@ func (c *RaftCluster) Stop() {
c.stopSchedulingJobs()
}
c.heartbeatRunner.Stop()
c.statisticsRunner.Stop()
c.miscRunner.Stop()
c.logRunner.Stop()
c.Unlock()

Expand Down Expand Up @@ -1044,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.StatisticsRunner.RunTask(
ctx.MiscRunner.RunTask(
ctx.Context,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
Expand Down Expand Up @@ -1095,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
tracer.OnUpdateSubTreeFinished()

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.StatisticsRunner.RunTask(
ctx.MiscRunner.RunTask(
ctx.Context,
ratelimit.HandleOverlaps,
func(_ context.Context) {
Expand All @@ -1108,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio

tracer.OnSaveCacheFinished()
// handle region stats
ctx.StatisticsRunner.RunTask(
ctx.MiscRunner.RunTask(
ctx.Context,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
Expand All @@ -1122,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
tracer.OnCollectRegionStatsFinished()
if c.storage != nil {
if saveKV {
ctx.StatisticsRunner.RunTask(
ctx.MiscRunner.RunTask(
ctx.Context,
ratelimit.SaveRegionToKV,
func(_ context.Context) {
Expand Down
16 changes: 8 additions & 8 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
tracer = core.NewHeartbeatProcessTracer()
}
defer tracer.Release()
var taskRunner, statisticsRunner, logRunner ratelimit.Runner
taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner
var taskRunner, miscRunner, logRunner ratelimit.Runner
taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner
if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
taskRunner = c.heartbeatRunner
statisticsRunner = c.statisticsRunner
miscRunner = c.miscRunner
logRunner = c.logRunner
}

ctx := &core.MetaProcessContext{
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
StatisticsRunner: statisticsRunner,
LogRunner: logRunner,
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
MiscRunner: miscRunner,
LogRunner: logRunner,
}
tracer.Begin()
if err := c.processRegionHeartbeat(ctx, region); err != nil {
Expand Down

0 comments on commit ed08415

Please sign in to comment.