From ed084151bc714bddf9863acf73092c9a8edc8867 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 21 May 2024 11:29:29 +0800 Subject: [PATCH] add more comments and rename Signed-off-by: Ryan Leung --- pkg/core/context.go | 18 ++++++------ pkg/mcs/scheduling/server/cluster.go | 35 +++++++++++++----------- pkg/ratelimit/runner.go | 3 +- server/cluster/cluster.go | 41 +++++++++++++++------------- server/cluster/cluster_worker.go | 16 +++++------ 5 files changed, 60 insertions(+), 53 deletions(-) diff --git a/pkg/core/context.go b/pkg/core/context.go index 5ed38d90eca..7410f8394c2 100644 --- a/pkg/core/context.go +++ b/pkg/core/context.go @@ -23,21 +23,21 @@ import ( // MetaProcessContext is a context for meta process. type MetaProcessContext struct { context.Context - Tracer RegionHeartbeatProcessTracer - TaskRunner ratelimit.Runner - StatisticsRunner ratelimit.Runner - LogRunner ratelimit.Runner + Tracer RegionHeartbeatProcessTracer + TaskRunner ratelimit.Runner + MiscRunner ratelimit.Runner + LogRunner ratelimit.Runner } // NewMetaProcessContext creates a new MetaProcessContext. // used in tests, can be changed if no need to test concurrency. func ContextTODO() *MetaProcessContext { return &MetaProcessContext{ - Context: context.TODO(), - Tracer: NewNoopHeartbeatProcessTracer(), - TaskRunner: ratelimit.NewSyncRunner(), - StatisticsRunner: ratelimit.NewSyncRunner(), - LogRunner: ratelimit.NewSyncRunner(), + Context: context.TODO(), + Tracer: NewNoopHeartbeatProcessTracer(), + TaskRunner: ratelimit.NewSyncRunner(), + MiscRunner: ratelimit.NewSyncRunner(), + LogRunner: ratelimit.NewSyncRunner(), // Limit default is nil } } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index f5220adfe2c..c6c365b03ad 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -54,9 +54,12 @@ type Cluster struct { clusterID uint64 running atomic.Bool - heartbeatRunner ratelimit.Runner - statisticsRunner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner } const ( @@ -95,9 +98,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, clusterID: clusterID, checkMembershipCh: checkMembershipCh, - heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) @@ -535,7 +538,7 @@ func (c *Cluster) StartBackgroundJobs() { go c.runCoordinator() go c.runMetricsCollectionJob() c.heartbeatRunner.Start() - c.statisticsRunner.Start() + c.miscRunner.Start() c.logRunner.Start() c.running.Store(true) } @@ -548,7 +551,7 @@ func (c *Cluster) StopBackgroundJobs() { c.running.Store(false) c.coordinator.Stop() c.heartbeatRunner.Stop() - c.statisticsRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.cancel() c.wg.Wait() @@ -565,19 +568,19 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } - var taskRunner, statisticsRunner, logRunner ratelimit.Runner - taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { taskRunner = c.heartbeatRunner - statisticsRunner = c.statisticsRunner + miscRunner = c.miscRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ - Context: c.ctx, - Tracer: tracer, - TaskRunner: taskRunner, - StatisticsRunner: statisticsRunner, - LogRunner: logRunner, + Context: c.ctx, + Tracer: tracer, + TaskRunner: taskRunner, + MiscRunner: miscRunner, + LogRunner: logRunner, } tracer.Begin() if err := c.processRegionHeartbeat(ctx, region); err != nil { diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 5ac4d408a03..17a45067f3d 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -53,7 +53,8 @@ type Task struct { submittedAt time.Time f func(context.Context) name string - retained bool + // retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration. + retained bool } // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 5a0e78f6609..148b43541a2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -174,9 +174,12 @@ type RaftCluster struct { independentServices sync.Map hbstreams *hbstream.HeartbeatStreams - heartbeatRunner ratelimit.Runner - statisticsRunner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner } // Status saves some state information. @@ -193,16 +196,16 @@ type Status struct { func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, httpClient *http.Client) *RaftCluster { return &RaftCluster{ - serverCtx: ctx, - clusterID: clusterID, - regionSyncer: regionSyncer, - httpClient: httpClient, - etcdClient: etcdClient, - core: basicCluster, - storage: storage, - heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + serverCtx: ctx, + clusterID: clusterID, + regionSyncer: regionSyncer, + httpClient: httpClient, + etcdClient: etcdClient, + core: basicCluster, + storage: storage, + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } } @@ -361,7 +364,7 @@ func (c *RaftCluster) Start(s Server) error { c.running = true c.heartbeatRunner.Start() - c.statisticsRunner.Start() + c.miscRunner.Start() c.logRunner.Start() return nil } @@ -757,7 +760,7 @@ func (c *RaftCluster) Stop() { c.stopSchedulingJobs() } c.heartbeatRunner.Stop() - c.statisticsRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.Unlock() @@ -1044,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - ctx.StatisticsRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { @@ -1095,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnUpdateSubTreeFinished() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.StatisticsRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.HandleOverlaps, func(_ context.Context) { @@ -1108,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats - ctx.StatisticsRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.CollectRegionStatsAsync, func(_ context.Context) { @@ -1122,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnCollectRegionStatsFinished() if c.storage != nil { if saveKV { - ctx.StatisticsRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.SaveRegionToKV, func(_ context.Context) { diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index a8d44dbec3e..39720e7d765 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -40,20 +40,20 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { tracer = core.NewHeartbeatProcessTracer() } defer tracer.Release() - var taskRunner, statisticsRunner, logRunner ratelimit.Runner - taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner { taskRunner = c.heartbeatRunner - statisticsRunner = c.statisticsRunner + miscRunner = c.miscRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ - Context: c.ctx, - Tracer: tracer, - TaskRunner: taskRunner, - StatisticsRunner: statisticsRunner, - LogRunner: logRunner, + Context: c.ctx, + Tracer: tracer, + TaskRunner: taskRunner, + MiscRunner: miscRunner, + LogRunner: logRunner, } tracer.Begin() if err := c.processRegionHeartbeat(ctx, region); err != nil {