diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index b5e42b40ac8..9fd420f76c3 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -47,6 +47,7 @@ type Cluster struct { checkMembershipCh chan struct{} apiServerLeader atomic.Value clusterID uint64 + running atomic.Bool } const regionLabelGCInterval = time.Hour @@ -203,6 +204,14 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { return c.apiServerLeader.CompareAndSwap(old, new) } +func trySend(notifier chan struct{}) { + select { + case notifier <- struct{}{}: + // If the channel is not empty, it means the check is triggered. + default: + } +} + // updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion. func (c *Cluster) updateScheduler() { defer logutil.LogPanic() @@ -213,8 +222,11 @@ func (c *Cluster) updateScheduler() { // Establish a notifier to listen the schedulers updating. notifier := make(chan struct{}, 1) // Make sure the check will be triggered once later. - notifier <- struct{}{} + trySend(notifier) c.persistConfig.SetSchedulersUpdatingNotifier(notifier) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { select { case <-c.ctx.Done(): @@ -224,6 +236,18 @@ func (c *Cluster) updateScheduler() { // This is triggered by the watcher when the schedulers are updated. } + if !c.running.Load() { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-ticker.C: + // retry + trySend(notifier) + continue + } + } + log.Info("schedulers updating notifier is triggered, try to update the scheduler") var ( schedulersController = c.coordinator.GetSchedulersController() @@ -394,15 +418,29 @@ func (c *Cluster) runUpdateStoreStats() { } } +// runCoordinator runs the main scheduling loop. +func (c *Cluster) runCoordinator() { + defer logutil.LogPanic() + defer c.wg.Done() + c.coordinator.RunUntilStop() +} + // StartBackgroundJobs starts background jobs. func (c *Cluster) StartBackgroundJobs() { - c.wg.Add(2) + c.wg.Add(3) go c.updateScheduler() go c.runUpdateStoreStats() + go c.runCoordinator() + c.running.Store(true) } // StopBackgroundJobs stops background jobs. func (c *Cluster) StopBackgroundJobs() { + if !c.running.Load() { + return + } + c.running.Store(false) + c.coordinator.Stop() c.cancel() c.wg.Wait() } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 4ec2f2731e7..62a04599e8f 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -456,16 +456,12 @@ func (s *Server) startCluster(context.Context) error { } s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController()) s.cluster.StartBackgroundJobs() - go s.GetCoordinator().RunUntilStop() return nil } func (s *Server) stopCluster() { - s.GetCoordinator().Stop() s.cluster.StopBackgroundJobs() - s.ruleWatcher.Close() - s.configWatcher.Close() - s.metaWatcher.Close() + s.stopWatcher() } func (s *Server) startWatcher() (err error) { @@ -481,6 +477,12 @@ func (s *Server) startWatcher() (err error) { return err } +func (s *Server) stopWatcher() { + s.ruleWatcher.Close() + s.configWatcher.Close() + s.metaWatcher.Close() +} + // GetPersistConfig returns the persist config. // It's used to test. func (s *Server) GetPersistConfig() *config.PersistConfig { diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 46b4947b6cd..d58a78ca82f 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -68,6 +68,8 @@ func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage e // Wait waits on all schedulers to exit. func (c *Controller) Wait() { + c.Lock() + defer c.Unlock() c.wg.Wait() }