From 43551e221d8689e43bc4ee2e3e6a13e10ea7c491 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 20 Oct 2023 14:13:59 +0800 Subject: [PATCH] coordinator, mcs/scheduling: fix the default schedulers initialization (#7236) close tikv/pd#7169 Fix the default scheduler initialization of the scheduling service. Signed-off-by: JmPotato --- pkg/mcs/scheduling/server/config/config.go | 18 ++++++--- pkg/schedule/coordinator.go | 21 +++++++--- .../mcs/scheduling/server_test.go | 38 +++++++++++++------ 3 files changed, 54 insertions(+), 23 deletions(-) diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 001a433ba07..4f9caca41e6 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -18,6 +18,7 @@ import ( "fmt" "os" "path/filepath" + "reflect" "strings" "sync/atomic" "time" @@ -232,6 +233,14 @@ func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} { return v.(chan<- struct{}) } +func (o *PersistConfig) tryNotifySchedulersUpdating() { + notifier := o.getSchedulersUpdatingNotifier() + if notifier == nil { + return + } + notifier <- struct{}{} +} + // GetClusterVersion returns the cluster version. func (o *PersistConfig) GetClusterVersion() *semver.Version { return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion)) @@ -251,11 +260,10 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig { func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) { old := o.GetScheduleConfig() o.schedule.Store(cfg) - // The coordinator is not aware of the underlying scheduler config changes, however, it - // should react on the scheduler number changes to handle the add/remove scheduler events. - if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil && - len(old.Schedulers) != len(cfg.Schedulers) { - notifier <- struct{}{} + // The coordinator is not aware of the underlying scheduler config changes, + // we should notify it to update the schedulers proactively. + if !reflect.DeepEqual(old.Schedulers, cfg.Schedulers) { + o.tryNotifySchedulersUpdating() } } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index f35bd6d4de3..8fb9ec8b286 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -458,13 +458,16 @@ func (c *Coordinator) InitSchedulers(needRun bool) { log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) continue } - log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if needRun { + log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if err = c.schedulers.AddScheduler(s); err != nil { log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) } - } else if err = c.schedulers.AddSchedulerHandler(s); err != nil { - log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } else { + log.Info("create scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName())) + if err = c.schedulers.AddSchedulerHandler(s); err != nil { + log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } } } @@ -484,8 +487,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) { continue } - log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if needRun { + log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) } else { @@ -493,8 +496,14 @@ func (c *Coordinator) InitSchedulers(needRun bool) { scheduleCfg.Schedulers[k] = schedulerCfg k++ } - } else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { - log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + log.Info("create scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) + if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + scheduleCfg.Schedulers[k] = schedulerCfg + k++ + } } } diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 324c8e5cad5..85cf84361b4 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -265,18 +265,32 @@ func (suite *serverTestSuite) TestSchedulerSync() { api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName) checkEvictLeaderSchedulerExist(re, schedulersController, false) - // TODO: test more schedulers. - // Fixme: the following code will fail because the scheduler is not removed but not synced. - // checkDelete := func(schedulerName string) { - // re.NotNil(schedulersController.GetScheduler(schedulers.BalanceLeaderName) != nil) - // api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.BalanceLeaderName) - // testutil.Eventually(re, func() bool { - // return schedulersController.GetScheduler(schedulers.BalanceLeaderName) == nil - // }) - // } - // checkDelete(schedulers.BalanceLeaderName) - // checkDelete(schedulers.BalanceRegionName) - // checkDelete(schedulers.HotRegionName) + // The default scheduler could not be deleted, it could only be disabled. + defaultSchedulerNames := []string{ + schedulers.BalanceLeaderName, + schedulers.BalanceRegionName, + schedulers.BalanceWitnessName, + schedulers.HotRegionName, + schedulers.TransferWitnessLeaderName, + } + checkDisabled := func(name string, shouldDisabled bool) { + re.NotNil(schedulersController.GetScheduler(name), name) + testutil.Eventually(re, func() bool { + disabled, err := schedulersController.IsSchedulerDisabled(name) + re.NoError(err, name) + return disabled == shouldDisabled + }) + } + for _, name := range defaultSchedulerNames { + checkDisabled(name, false) + api.MustDeleteScheduler(re, suite.backendEndpoints, name) + checkDisabled(name, true) + } + for _, name := range defaultSchedulerNames { + checkDisabled(name, true) + api.MustAddScheduler(re, suite.backendEndpoints, name, nil) + checkDisabled(name, false) + } } func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) {