diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index ec8659c1319..912fb9c01e5 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -25,7 +25,6 @@ import ( "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" - "github.com/tikv/pd/pkg/utils/syncutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -53,16 +52,12 @@ type Watcher struct { etcdClient *clientv3.Client ruleStorage endpoint.RuleStorage - // components is used to store the cluster components and protect them with a RWMutex lock. - components struct { - syncutil.RWMutex - // checkerController is used to add the suspect key ranges to the checker when the rule changed. - checkerController *checker.Controller - // ruleManager is used to manage the placement rules. - ruleManager *placement.RuleManager - // regionLabeler is used to manage the region label rules. - regionLabeler *labeler.RegionLabeler - } + // checkerController is used to add the suspect key ranges to the checker when the rule changed. + checkerController *checker.Controller + // ruleManager is used to manage the placement rules. + ruleManager *placement.RuleManager + // regionLabeler is used to manage the region label rules. + regionLabeler *labeler.RegionLabeler ruleWatcher *etcdutil.LoopWatcher groupWatcher *etcdutil.LoopWatcher @@ -75,6 +70,9 @@ func NewWatcher( etcdClient *clientv3.Client, clusterID uint64, ruleStorage endpoint.RuleStorage, + checkerController *checker.Controller, + ruleManager *placement.RuleManager, + regionLabeler *labeler.RegionLabeler, ) (*Watcher, error) { ctx, cancel := context.WithCancel(ctx) rw := &Watcher{ @@ -85,6 +83,9 @@ func NewWatcher( regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), etcdClient: etcdClient, ruleStorage: ruleStorage, + checkerController: checkerController, + ruleManager: ruleManager, + regionLabeler: regionLabeler, } err := rw.initializeRuleWatcher() if err != nil { @@ -104,39 +105,22 @@ func NewWatcher( func (rw *Watcher) initializeRuleWatcher() error { prefixToTrim := rw.rulesPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - key, value := string(kv.Key), string(kv.Value) - log.Info("update placement rule", zap.String("key", key), zap.String("value", value)) - rm := rw.getRuleManager() - // If the rule manager is not set, it means that the cluster is not initialized yet, - // we should save the rule to the storage directly first. - if rm == nil { - // Since the PD API server will validate the rule before saving it to etcd, - // so we could directly save the string rule in JSON to the storage here. - return rw.ruleStorage.SaveRuleJSON( - strings.TrimPrefix(key, prefixToTrim), - value, - ) - } + log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) rule, err := placement.NewRuleFromJSON(kv.Value) if err != nil { return err } // Update the suspect key ranges in the checker. - rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) - if oldRule := rm.GetRule(rule.GroupID, rule.ID); oldRule != nil { - rw.getCheckerController().AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { + rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) } - return rm.SetRule(rule) + return rw.ruleManager.SetRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) log.Info("delete placement rule", zap.String("key", key)) - rm := rw.getRuleManager() - trimmedKey := strings.TrimPrefix(key, prefixToTrim) - if rm == nil { - return rw.ruleStorage.DeleteRule(trimmedKey) - } - ruleJSON, err := rw.ruleStorage.LoadRule(trimmedKey) + ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim)) if err != nil { return err } @@ -144,8 +128,8 @@ func (rw *Watcher) initializeRuleWatcher() error { if err != nil { return err } - rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) - return rm.DeleteRule(rule.GroupID, rule.ID) + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) } postEventFn := func() error { return nil @@ -164,37 +148,25 @@ func (rw *Watcher) initializeRuleWatcher() error { func (rw *Watcher) initializeGroupWatcher() error { prefixToTrim := rw.ruleGroupPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - key, value := string(kv.Key), string(kv.Value) - log.Info("update placement rule group", zap.String("key", key), zap.String("value", value)) - rm := rw.getRuleManager() - if rm == nil { - return rw.ruleStorage.SaveRuleGroupJSON( - strings.TrimPrefix(key, prefixToTrim), - value, - ) - } + log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) if err != nil { return err } // Add all rule key ranges within the group to the suspect key ranges. - for _, rule := range rm.GetRulesByGroup(ruleGroup.ID) { - rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) + for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) } - return rm.SetRuleGroup(ruleGroup) + return rw.ruleManager.SetRuleGroup(ruleGroup) } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) log.Info("delete placement rule group", zap.String("key", key)) - rm := rw.getRuleManager() trimmedKey := strings.TrimPrefix(key, prefixToTrim) - if rm == nil { - return rw.ruleStorage.DeleteRuleGroup(trimmedKey) + for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) } - for _, rule := range rm.GetRulesByGroup(trimmedKey) { - rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - return rm.DeleteRuleGroup(trimmedKey) + return rw.ruleManager.DeleteRuleGroup(trimmedKey) } postEventFn := func() error { return nil @@ -213,30 +185,17 @@ func (rw *Watcher) initializeGroupWatcher() error { func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - key, value := string(kv.Key), string(kv.Value) - log.Info("update region label rule", zap.String("key", key), zap.String("value", value)) - rl := rw.getRegionLabeler() - if rl == nil { - return rw.ruleStorage.SaveRegionRuleJSON( - strings.TrimPrefix(key, prefixToTrim), - value, - ) - } + log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) rule, err := labeler.NewLabelRuleFromJSON(kv.Value) if err != nil { return err } - return rl.SetLabelRule(rule) + return rw.regionLabeler.SetLabelRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) log.Info("delete region label rule", zap.String("key", key)) - rl := rw.getRegionLabeler() - trimmedKey := strings.TrimPrefix(key, prefixToTrim) - if rl == nil { - return rw.ruleStorage.DeleteRegionRule(trimmedKey) - } - return rl.DeleteLabelRule(trimmedKey) + return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim)) } postEventFn := func() error { return nil @@ -257,39 +216,3 @@ func (rw *Watcher) Close() { rw.cancel() rw.wg.Wait() } - -// SetClusterComponents sets the cluster components for the watcher. -func (rw *Watcher) SetClusterComponents( - sc *checker.Controller, - rm *placement.RuleManager, - rl *labeler.RegionLabeler, -) error { - rw.components.Lock() - defer rw.components.Unlock() - rw.components.checkerController = sc - rw.components.ruleManager = rm - rw.components.regionLabeler = rl - // Reload the rules to make sure that the rules are consistent with the storage. - if err := rm.Reload(); err != nil { - return err - } - return rl.Reload() -} - -func (rw *Watcher) getCheckerController() *checker.Controller { - rw.components.RLock() - defer rw.components.RUnlock() - return rw.components.checkerController -} - -func (rw *Watcher) getRuleManager() *placement.RuleManager { - rw.components.RLock() - defer rw.components.RUnlock() - return rw.components.ruleManager -} - -func (rw *Watcher) getRegionLabeler() *labeler.RegionLabeler { - rw.components.RLock() - defer rw.components.RUnlock() - return rw.components.regionLabeler -} diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 3c326783f05..9caae932037 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -455,7 +455,7 @@ func (s *Server) startServer() (err error) { func (s *Server) startCluster(context.Context) error { s.basicCluster = core.NewBasicCluster() s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) - err := s.startWatcher() + err := s.startMetaConfWatcher() if err != nil { return err } @@ -464,12 +464,13 @@ func (s *Server) startCluster(context.Context) error { if err != nil { return err } - // Inject the cluster components into the watchers. + // Inject the cluster components into the config watcher after the scheduler controller is created. s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController()) - s.ruleWatcher.SetClusterComponents( - s.cluster.GetCoordinator().GetCheckerController(), - s.cluster.GetRuleManager(), - s.cluster.GetRegionLabeler()) + // Start the rule watcher after the cluster is created. + err = s.startRuleWatcher() + if err != nil { + return err + } s.cluster.StartBackgroundJobs() return nil } @@ -479,7 +480,7 @@ func (s *Server) stopCluster() { s.stopWatcher() } -func (s *Server) startWatcher() (err error) { +func (s *Server) startMetaConfWatcher() (err error) { s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster) if err != nil { return err @@ -488,7 +489,12 @@ func (s *Server) startWatcher() (err error) { if err != nil { return err } - s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage) + return err +} + +func (s *Server) startRuleWatcher() (err error) { + s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage, + s.cluster.GetCoordinator().GetCheckerController(), s.cluster.GetRuleManager(), s.cluster.GetRegionLabeler()) return err } diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go index 892ef097405..39722b1a038 100644 --- a/pkg/schedule/labeler/labeler.go +++ b/pkg/schedule/labeler/labeler.go @@ -149,14 +149,6 @@ func (l *RegionLabeler) buildRangeList() { l.rangeList = builder.Build() } -// Reload loads rules from storage. -func (l *RegionLabeler) Reload() error { - l.Lock() - defer l.Unlock() - l.labelRules = make(map[string]*LabelRule) - return l.loadRules() -} - // GetSplitKeys returns all split keys in the range (start, end). func (l *RegionLabeler) GetSplitKeys(start, end []byte) [][]byte { l.RLock() diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index 6532a4083ed..bdca4cc1b19 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -263,31 +263,6 @@ func (m *RuleManager) adjustRule(r *Rule, groupID string) (err error) { return nil } -// Reload reloads rules from storage. -func (m *RuleManager) Reload() error { - m.Lock() - defer m.Unlock() - // Only allow to reload when it is initialized. - if !m.initialized { - return nil - } - // Force the rule manager to reload rules from storage. - m.ruleConfig = newRuleConfig() - if err := m.loadRules(); err != nil { - return err - } - if err := m.loadGroups(); err != nil { - return err - } - m.ruleConfig.adjust() - ruleList, err := buildRuleList(m.ruleConfig) - if err != nil { - return err - } - m.ruleList = ruleList - return nil -} - // GetRule returns the Rule with the same (group, id). func (m *RuleManager) GetRule(group, id string) *Rule { m.RLock()