Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Dec 5, 2024
1 parent 39ae987 commit 7c2ef71
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *Controller) PatrolRegions() {
case <-ticker.C:
c.updateTickerIfNeeded(ticker)
c.updatePatrolWorkersIfNeeded()
if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
if !c.prepareChecker.IsPrepared() {
continue
}
if c.cluster.IsSchedulingHalted() {
Expand Down
29 changes: 22 additions & 7 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
)

const (
runSchedulerCheckInterval = 3 * time.Second
runPrepareCheckerInterval = 3 * time.Second
maxLoadConfigRetries = 10
// pushOperatorTickInterval is the interval try to push the operator.
pushOperatorTickInterval = 500 * time.Millisecond
Expand Down Expand Up @@ -203,6 +203,25 @@ func (c *Coordinator) driveSlowNodeScheduler() {
}
}

func (c *Coordinator) runPrepareChecker() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(runPrepareCheckerInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker.Reset(100 * time.Millisecond)
})
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
c.prepareChecker.Check(c.cluster.GetBasicCluster())
}
}
}

// RunUntilStop runs the coordinator until receiving the stop signal.
func (c *Coordinator) RunUntilStop() {
c.Run()
Expand All @@ -215,15 +234,11 @@ func (c *Coordinator) RunUntilStop() {

// Run starts coordinator.
func (c *Coordinator) Run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker.Reset(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to run schedulers")
c.InitSchedulers(true)

c.wg.Add(4)
c.wg.Add(5)
go c.runPrepareChecker()
// Starts to patrol regions.
go c.PatrolRegions()
// Checks suspect key ranges
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (c *Controller) runScheduler(s *ScheduleController) {
for {
select {
case <-ticker.C:
if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
if !c.prepareChecker.IsPrepared() {
continue
}
diagnosable := s.IsDiagnosticAllowed()
Expand Down

0 comments on commit 7c2ef71

Please sign in to comment.