From 7c2ef71aa3b1f95b46b52e5fe419f1b4ebd9a0bd Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 5 Dec 2024 14:06:50 +0800 Subject: [PATCH] fix Signed-off-by: Ryan Leung --- pkg/schedule/checker/checker_controller.go | 2 +- pkg/schedule/coordinator.go | 29 ++++++++++++++----- .../schedulers/scheduler_controller.go | 2 +- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 587cf2f80cf..09d951427ef 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -138,7 +138,7 @@ func (c *Controller) PatrolRegions() { case <-ticker.C: c.updateTickerIfNeeded(ticker) c.updatePatrolWorkersIfNeeded() - if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) { + if !c.prepareChecker.IsPrepared() { continue } if c.cluster.IsSchedulingHalted() { diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 93aa1a13bec..1603bb01c34 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -43,7 +43,7 @@ import ( ) const ( - runSchedulerCheckInterval = 3 * time.Second + runPrepareCheckerInterval = 3 * time.Second maxLoadConfigRetries = 10 // pushOperatorTickInterval is the interval try to push the operator. pushOperatorTickInterval = 500 * time.Millisecond @@ -203,6 +203,25 @@ func (c *Coordinator) driveSlowNodeScheduler() { } } +func (c *Coordinator) runPrepareChecker() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(runPrepareCheckerInterval) + failpoint.Inject("changeCoordinatorTicker", func() { + ticker.Reset(100 * time.Millisecond) + }) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.prepareChecker.Check(c.cluster.GetBasicCluster()) + } + } +} + // RunUntilStop runs the coordinator until receiving the stop signal. func (c *Coordinator) RunUntilStop() { c.Run() @@ -215,15 +234,11 @@ func (c *Coordinator) RunUntilStop() { // Run starts coordinator. func (c *Coordinator) Run() { - ticker := time.NewTicker(runSchedulerCheckInterval) - failpoint.Inject("changeCoordinatorTicker", func() { - ticker.Reset(100 * time.Millisecond) - }) - defer ticker.Stop() log.Info("coordinator starts to run schedulers") c.InitSchedulers(true) - c.wg.Add(4) + c.wg.Add(5) + go c.runPrepareChecker() // Starts to patrol regions. go c.PatrolRegions() // Checks suspect key ranges diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index f8d48ca6bda..caeab9c349c 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -370,7 +370,7 @@ func (c *Controller) runScheduler(s *ScheduleController) { for { select { case <-ticker.C: - if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) { + if !c.prepareChecker.IsPrepared() { continue } diagnosable := s.IsDiagnosticAllowed()