Skip to content

Commit

Permalink
Cherry-pick the changes
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed May 22, 2024
1 parent 5a3d21d commit b9a1e3e
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 32 deletions.
7 changes: 7 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,10 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
func (c *Cluster) IsPrepared() bool {
return c.coordinator.GetPrepareChecker().IsPrepared()
}

// IsSchedulingHalted returns whether the scheduling is halted.
// Currently, the microservice scheduling is halted when:
// - The `HaltScheduling` persist option is set to true.
func (c *Cluster) IsSchedulingHalted() bool {
return c.persistConfig.IsSchedulingHalted()
}
4 changes: 4 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,10 @@ func (o *PersistConfig) SetSplitMergeInterval(splitMergeInterval time.Duration)
o.SetScheduleConfig(v)
}

// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt.
// TODO: support this metrics for the scheduling service in the future.
func (*PersistConfig) SetSchedulingAllowanceStatus(bool, string) {}

// SetHaltScheduling set HaltScheduling.
func (o *PersistConfig) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func IsSchedulerRegistered(name string) bool {
type SchedulerConfigProvider interface {
SharedConfigProvider

IsSchedulingHalted() bool
SetSchedulingAllowanceStatus(bool, string)
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(string) bool
Expand Down
6 changes: 1 addition & 5 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (c *Coordinator) PatrolRegions() {
log.Info("patrol regions has been stopped")
return
}
if c.isSchedulingHalted() {
if c.cluster.IsSchedulingHalted() {
continue
}

Expand Down Expand Up @@ -207,10 +207,6 @@ func (c *Coordinator) PatrolRegions() {
}
}

func (c *Coordinator) isSchedulingHalted() bool {
return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
if len(regions) == 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type SchedulerCluster interface {
GetSchedulerConfig() sc.SchedulerConfigProvider
GetRegionLabeler() *labeler.RegionLabeler
GetStoreConfig() sc.StoreConfigProvider
IsSchedulingHalted() bool
}

// CheckerCluster is an aggregate interface that wraps multiple interfaces
Expand Down
12 changes: 2 additions & 10 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *Controller) CollectSchedulerMetrics() {
var allowScheduler float64
// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
// See issue #1341.
if !s.IsPaused() && !c.isSchedulingHalted() {
if !s.IsPaused() && !c.cluster.IsSchedulingHalted() {
allowScheduler = 1
}
schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler)
Expand All @@ -130,10 +130,6 @@ func (c *Controller) CollectSchedulerMetrics() {
ruleStatusGauge.WithLabelValues("group_count").Set(float64(groupCnt))
}

func (c *Controller) isSchedulingHalted() bool {
return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

// ResetSchedulerMetrics resets metrics of all schedulers.
func ResetSchedulerMetrics() {
schedulerStatusGauge.Reset()
Expand Down Expand Up @@ -518,7 +514,7 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool {
}
return false
}
if s.isSchedulingHalted() {
if s.cluster.IsSchedulingHalted() {
if diagnosable {
s.diagnosticRecorder.SetResultFromStatus(Halted)
}
Expand All @@ -533,10 +529,6 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool {
return true
}

func (s *ScheduleController) isSchedulingHalted() bool {
return s.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

// IsPaused returns if a scheduler is paused.
func (s *ScheduleController) IsPaused() bool {
delayUntil := atomic.LoadInt64(&s.delayUntil)
Expand Down
9 changes: 4 additions & 5 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,12 +492,11 @@ func (u *Controller) GetStage() stage {
}

func (u *Controller) changeStage(stage stage) {
u.stage = stage
// Halt and resume the scheduling once the running state changed.
running := isRunning(stage)
if opt := u.cluster.GetSchedulerConfig(); opt.IsSchedulingHalted() != running {
opt.SetHaltScheduling(running, "online-unsafe-recovery")
// If the running stage changes, update the scheduling allowance status to add or remove "online-unsafe-recovery" halt.
if running := isRunning(stage); running != isRunning(u.stage) {
u.cluster.GetSchedulerConfig().SetSchedulingAllowanceStatus(running, "online-unsafe-recovery")
}
u.stage = stage

var output StageOutput
output.Time = time.Now().Format("2006-01-02 15:04:05.000")
Expand Down
8 changes: 8 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,14 @@ func (c *RaftCluster) RemoveSuspectRegion(id uint64) {
c.coordinator.GetCheckerController().RemoveSuspectRegion(id)
}

// IsSchedulingHalted returns whether the scheduling is halted.
// Currently, the PD scheduling is halted when:
// - The `HaltScheduling` persist option is set to true.
// - Online unsafe recovery is running.
func (c *RaftCluster) IsSchedulingHalted() bool {
return c.opt.IsSchedulingHalted() || c.unsafeRecoveryController.IsRunning()
}

// GetUnsafeRecoveryController returns the unsafe recovery controller.
func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller {
return c.unsafeRecoveryController
Expand Down
8 changes: 2 additions & 6 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// HandleAskSplit handles the split request.
func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
if c.isSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
Expand Down Expand Up @@ -86,10 +86,6 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
return split, nil
}

func (c *RaftCluster) isSchedulingHalted() bool {
return c.opt.IsSchedulingHalted()
}

// ValidRequestRegion is used to decide if the region is valid.
func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {
startKey := reqRegion.GetStartKey()
Expand All @@ -109,7 +105,7 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {

// HandleAskBatchSplit handles the batch split request.
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if c.isSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
Expand Down
15 changes: 10 additions & 5 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,11 +973,8 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien

var haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")

// SetHaltScheduling set HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt.
func (*PersistOptions) SetSchedulingAllowanceStatus(halt bool, source string) {
if halt {
haltSchedulingStatus.Set(1)
schedulingAllowanceStatusGauge.WithLabelValues(source).Set(1)
Expand All @@ -987,6 +984,14 @@ func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
}
}

// SetHaltScheduling set HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
o.SetSchedulingAllowanceStatus(halt, source)
}

// IsSchedulingHalted returns if PD scheduling is halted.
func (o *PersistOptions) IsSchedulingHalted() bool {
if o == nil {
Expand Down
2 changes: 2 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,8 @@ func (s *Server) SetScheduleConfig(cfg sc.ScheduleConfig) error {
errs.ZapError(err))
return err
}
// Update the scheduling halt status at the same time.
s.persistOptions.SetSchedulingAllowanceStatus(cfg.HaltScheduling, "manually")
log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
Expand Down

0 comments on commit b9a1e3e

Please sign in to comment.