Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: reorganize cluster start and stop process #7155

Merged
merged 5 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
checkMembershipCh chan struct{}
apiServerLeader atomic.Value
clusterID uint64
running atomic.Bool
}

const regionLabelGCInterval = time.Hour
Expand Down Expand Up @@ -203,6 +204,14 @@
return c.apiServerLeader.CompareAndSwap(old, new)
}

func trySend(notifier chan struct{}) {
select {
case notifier <- struct{}{}:
// If the channel is not empty, it means the check is triggered.
default:

Check warning on line 211 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L211

Added line #L211 was not covered by tests
}
}

// updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion.
func (c *Cluster) updateScheduler() {
defer logutil.LogPanic()
Expand All @@ -213,8 +222,11 @@
// Establish a notifier to listen the schedulers updating.
notifier := make(chan struct{}, 1)
// Make sure the check will be triggered once later.
notifier <- struct{}{}
trySend(notifier)
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
Expand All @@ -224,6 +236,18 @@
// This is triggered by the watcher when the schedulers are updated.
}

if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:

Check warning on line 244 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L244

Added line #L244 was not covered by tests
// retry
trySend(notifier)
continue

Check warning on line 247 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L246-L247

Added lines #L246 - L247 were not covered by tests
}
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If stop server here, is there data race?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is the same as the current PD.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other word,is it possible to meet data race when add scheduler and coordinator wait at the same time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so but the possibility is much smaller than before.

Copy link
Member Author

@rleungx rleungx Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way: we can check the cluster status before adding a scheduler every time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But there is still a gap between check status and adding scheduluer, if stop server here after checking the cluster status and before adding scheduler, it is possible to meet data race too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is the way we use the wait group for the scheduler controller is not proper instead of the wait group itself.

var (
schedulersController = c.coordinator.GetSchedulersController()
Expand Down Expand Up @@ -394,15 +418,29 @@
}
}

// runCoordinator runs the main scheduling loop.
func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
c.coordinator.RunUntilStop()
}

// StartBackgroundJobs starts background jobs.
func (c *Cluster) StartBackgroundJobs() {
c.wg.Add(2)
c.wg.Add(3)
go c.updateScheduler()
go c.runUpdateStoreStats()
go c.runCoordinator()
c.running.Store(true)
}

// StopBackgroundJobs stops background jobs.
func (c *Cluster) StopBackgroundJobs() {
if !c.running.Load() {
return

Check warning on line 440 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L440

Added line #L440 was not covered by tests
}
c.running.Store(false)
c.coordinator.Stop()
c.cancel()
c.wg.Wait()
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
return errors.Errorf("log level %s is illegal", level)

Check warning on line 126 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L125-L126

Added lines #L125 - L126 were not covered by tests
}
s.cfg.Log.Level = level
log.SetLevel(logutil.StringToZapLogLevel(level))
log.Warn("log level changed", zap.String("level", log.GetLevel().String()))
return nil

Check warning on line 131 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L128-L131

Added lines #L128 - L131 were not covered by tests
}

// Run runs the scheduling server.
Expand Down Expand Up @@ -456,16 +456,12 @@
}
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
s.cluster.StartBackgroundJobs()
go s.GetCoordinator().RunUntilStop()
return nil
}

func (s *Server) stopCluster() {
s.GetCoordinator().Stop()
s.cluster.StopBackgroundJobs()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
Expand All @@ -481,6 +477,12 @@
return err
}

func (s *Server) stopWatcher() {
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage e

// Wait waits on all schedulers to exit.
func (c *Controller) Wait() {
c.Lock()
defer c.Unlock()
c.wg.Wait()
}

Expand Down
Loading