diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 1b915b6874d..3a0b17100fd 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -615,3 +615,10 @@ func (c *Cluster) DropCacheAllRegion() { func (c *Cluster) DropCacheRegion(id uint64) { c.RemoveRegionIfExist(id) } + +// IsSchedulingHalted returns whether the scheduling is halted. +// Currently, the microservice scheduling is halted when: +// - The `HaltScheduling` persist option is set to true. +func (c *Cluster) IsSchedulingHalted() bool { + return c.persistConfig.IsSchedulingHalted() +} diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index b342724e74c..6d663b413b4 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -682,6 +682,10 @@ func (o *PersistConfig) SetSplitMergeInterval(splitMergeInterval time.Duration) o.SetScheduleConfig(v) } +// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt. +// TODO: support this metrics for the scheduling service in the future. +func (*PersistConfig) SetSchedulingAllowanceStatus(bool, string) {} + // SetHaltScheduling set HaltScheduling. func (o *PersistConfig) SetHaltScheduling(halt bool, source string) { v := o.GetScheduleConfig().Clone() diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index ebce73e3303..60d896c89d5 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -275,7 +275,7 @@ func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBa }, nil } - if c.persistConfig.IsSchedulingHalted() { + if c.IsSchedulingHalted() { return nil, errs.ErrSchedulingIsHalted.FastGenByArgs() } if !c.persistConfig.IsTikvRegionSplitEnabled() { diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index 20c7f0dc2cf..90e489f86f3 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -46,7 +46,7 @@ func IsSchedulerRegistered(name string) bool { type SchedulerConfigProvider interface { SharedConfigProvider - IsSchedulingHalted() bool + SetSchedulingAllowanceStatus(bool, string) GetStoresLimit() map[uint64]StoreLimitConfig IsSchedulerDisabled(string) bool diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 35d9c2029a1..5ab38aad81d 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -178,7 +178,7 @@ func (c *Coordinator) PatrolRegions() { log.Info("patrol regions has been stopped") return } - if c.isSchedulingHalted() { + if c.cluster.IsSchedulingHalted() { continue } @@ -207,10 +207,6 @@ func (c *Coordinator) PatrolRegions() { } } -func (c *Coordinator) isSchedulingHalted() bool { - return c.cluster.GetSchedulerConfig().IsSchedulingHalted() -} - func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) { regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit) if len(regions) == 0 { diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index 63dacd0c30d..b97459d26ea 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -43,6 +43,7 @@ type SchedulerCluster interface { GetSchedulerConfig() sc.SchedulerConfigProvider GetRegionLabeler() *labeler.RegionLabeler GetStoreConfig() sc.StoreConfigProvider + IsSchedulingHalted() bool } // CheckerCluster is an aggregate interface that wraps multiple interfaces diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 5953ecac5e3..334a2f1199a 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -115,7 +115,7 @@ func (c *Controller) CollectSchedulerMetrics() { var allowScheduler float64 // If the scheduler is not allowed to schedule, it will disappear in Grafana panel. // See issue #1341. - if !s.IsPaused() && !c.isSchedulingHalted() { + if !s.IsPaused() && !c.cluster.IsSchedulingHalted() { allowScheduler = 1 } schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler) @@ -131,10 +131,6 @@ func (c *Controller) CollectSchedulerMetrics() { ruleStatusGauge.WithLabelValues("group_count").Set(float64(groupCnt)) } -func (c *Controller) isSchedulingHalted() bool { - return c.cluster.GetSchedulerConfig().IsSchedulingHalted() -} - // ResetSchedulerMetrics resets metrics of all schedulers. func ResetSchedulerMetrics() { schedulerStatusGauge.Reset() @@ -526,7 +522,7 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool { } return false } - if s.isSchedulingHalted() { + if s.cluster.IsSchedulingHalted() { if diagnosable { s.diagnosticRecorder.SetResultFromStatus(Halted) } @@ -541,10 +537,6 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool { return true } -func (s *ScheduleController) isSchedulingHalted() bool { - return s.cluster.GetSchedulerConfig().IsSchedulingHalted() -} - // IsPaused returns if a scheduler is paused. func (s *ScheduleController) IsPaused() bool { delayUntil := atomic.LoadInt64(&s.delayUntil) diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go index 044dbd182e2..d2f6125c3f3 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller.go @@ -493,12 +493,11 @@ func (u *Controller) GetStage() stage { } func (u *Controller) changeStage(stage stage) { - u.stage = stage - // Halt and resume the scheduling once the running state changed. - running := isRunning(stage) - if opt := u.cluster.GetSchedulerConfig(); opt.IsSchedulingHalted() != running { - opt.SetHaltScheduling(running, "online-unsafe-recovery") + // If the running stage changes, update the scheduling allowance status to add or remove "online-unsafe-recovery" halt. + if running := isRunning(stage); running != isRunning(u.stage) { + u.cluster.GetSchedulerConfig().SetSchedulingAllowanceStatus(running, "online-unsafe-recovery") } + u.stage = stage var output StageOutput output.Time = time.Now().Format("2006-01-02 15:04:05.000") diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b11f1ff55a5..2269301f255 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -829,6 +829,14 @@ func (c *RaftCluster) SetPDServerConfig(cfg *config.PDServerConfig) { c.opt.SetPDServerConfig(cfg) } +// IsSchedulingHalted returns whether the scheduling is halted. +// Currently, the PD scheduling is halted when: +// - The `HaltScheduling` persist option is set to true. +// - Online unsafe recovery is running. +func (c *RaftCluster) IsSchedulingHalted() bool { + return c.opt.IsSchedulingHalted() || c.unsafeRecoveryController.IsRunning() +} + // GetUnsafeRecoveryController returns the unsafe recovery controller. func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller { return c.unsafeRecoveryController diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 5ae8fdc0396..ecf2f8dc01b 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -54,7 +54,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { // HandleAskSplit handles the split request. func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) { - if c.isSchedulingHalted() { + if c.IsSchedulingHalted() { return nil, errs.ErrSchedulingIsHalted.FastGenByArgs() } if !c.opt.IsTikvRegionSplitEnabled() { @@ -97,13 +97,9 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp return split, nil } -func (c *RaftCluster) isSchedulingHalted() bool { - return c.opt.IsSchedulingHalted() -} - // HandleAskBatchSplit handles the batch split request. func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { - if c.isSchedulingHalted() { + if c.IsSchedulingHalted() { return nil, errs.ErrSchedulingIsHalted.FastGenByArgs() } if !c.opt.IsTikvRegionSplitEnabled() { diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 144fa93e724..663806ccdd8 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -987,11 +987,8 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien var haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling") -// SetHaltScheduling set HaltScheduling. -func (o *PersistOptions) SetHaltScheduling(halt bool, source string) { - v := o.GetScheduleConfig().Clone() - v.HaltScheduling = halt - o.SetScheduleConfig(v) +// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt. +func (*PersistOptions) SetSchedulingAllowanceStatus(halt bool, source string) { if halt { haltSchedulingStatus.Set(1) schedulingAllowanceStatusGauge.WithLabelValues(source).Set(1) @@ -1001,6 +998,14 @@ func (o *PersistOptions) SetHaltScheduling(halt bool, source string) { } } +// SetHaltScheduling set HaltScheduling. +func (o *PersistOptions) SetHaltScheduling(halt bool, source string) { + v := o.GetScheduleConfig().Clone() + v.HaltScheduling = halt + o.SetScheduleConfig(v) + o.SetSchedulingAllowanceStatus(halt, source) +} + // IsSchedulingHalted returns if PD scheduling is halted. func (o *PersistOptions) IsSchedulingHalted() bool { if o == nil { diff --git a/server/forward.go b/server/forward.go index e478f4869c9..c009d0dd32c 100644 --- a/server/forward.go +++ b/server/forward.go @@ -264,7 +264,7 @@ func forwardRegionHeartbeatToScheduling(rc *cluster.RaftCluster, forwardStream s return } // TODO: find a better way to halt scheduling immediately. - if rc.GetOpts().IsSchedulingHalted() { + if rc.IsSchedulingHalted() { continue } // The error types defined for schedulingpb and pdpb are different, so we need to convert them. diff --git a/server/server.go b/server/server.go index 410de96f63a..a548ac7de3a 100644 --- a/server/server.go +++ b/server/server.go @@ -1043,6 +1043,7 @@ func (s *Server) GetScheduleConfig() *sc.ScheduleConfig { } // SetScheduleConfig sets the balance config information. +// This function is exported to be used by the API. func (s *Server) SetScheduleConfig(cfg sc.ScheduleConfig) error { if err := cfg.Validate(); err != nil { return err @@ -1061,6 +1062,8 @@ func (s *Server) SetScheduleConfig(cfg sc.ScheduleConfig) error { errs.ZapError(err)) return err } + // Update the scheduling halt status at the same time. + s.persistOptions.SetSchedulingAllowanceStatus(cfg.HaltScheduling, "manually") log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) return nil }