Skip to content

Commit

Permalink
Start rule watcher after cluster is created
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Oct 17, 2023
1 parent 24dffe4 commit f216e26
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 115 deletions.
137 changes: 30 additions & 107 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand Down Expand Up @@ -53,16 +52,12 @@ type Watcher struct {
etcdClient *clientv3.Client
ruleStorage endpoint.RuleStorage

// components is used to store the cluster components and protect them with a RWMutex lock.
components struct {
syncutil.RWMutex
// checkerController is used to add the suspect key ranges to the checker when the rule changed.
checkerController *checker.Controller
// ruleManager is used to manage the placement rules.
ruleManager *placement.RuleManager
// regionLabeler is used to manage the region label rules.
regionLabeler *labeler.RegionLabeler
}
// checkerController is used to add the suspect key ranges to the checker when the rule changed.
checkerController *checker.Controller
// ruleManager is used to manage the placement rules.
ruleManager *placement.RuleManager
// regionLabeler is used to manage the region label rules.
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
Expand All @@ -75,6 +70,9 @@ func NewWatcher(
etcdClient *clientv3.Client,
clusterID uint64,
ruleStorage endpoint.RuleStorage,
checkerController *checker.Controller,
ruleManager *placement.RuleManager,
regionLabeler *labeler.RegionLabeler,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
Expand All @@ -85,6 +83,9 @@ func NewWatcher(
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStorage: ruleStorage,
checkerController: checkerController,
ruleManager: ruleManager,
regionLabeler: regionLabeler,
}
err := rw.initializeRuleWatcher()
if err != nil {
Expand All @@ -104,48 +105,31 @@ func NewWatcher(
func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
key, value := string(kv.Key), string(kv.Value)
log.Info("update placement rule", zap.String("key", key), zap.String("value", value))
rm := rw.getRuleManager()
// If the rule manager is not set, it means that the cluster is not initialized yet,
// we should save the rule to the storage directly first.
if rm == nil {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
return rw.ruleStorage.SaveRuleJSON(
strings.TrimPrefix(key, prefixToTrim),
value,
)
}
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rm.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.getCheckerController().AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
}
return rm.SetRule(rule)
return rw.ruleManager.SetRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
rm := rw.getRuleManager()
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
if rm == nil {
return rw.ruleStorage.DeleteRule(trimmedKey)
}
ruleJSON, err := rw.ruleStorage.LoadRule(trimmedKey)
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
return err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err
}
rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rm.DeleteRule(rule.GroupID, rule.ID)
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
}
postEventFn := func() error {
return nil
Expand All @@ -164,37 +148,25 @@ func (rw *Watcher) initializeRuleWatcher() error {
func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
key, value := string(kv.Key), string(kv.Value)
log.Info("update placement rule group", zap.String("key", key), zap.String("value", value))
rm := rw.getRuleManager()
if rm == nil {
return rw.ruleStorage.SaveRuleGroupJSON(
strings.TrimPrefix(key, prefixToTrim),
value,
)
}
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rm.GetRulesByGroup(ruleGroup.ID) {
rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey)
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rm.SetRuleGroup(ruleGroup)
return rw.ruleManager.SetRuleGroup(ruleGroup)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule group", zap.String("key", key))
rm := rw.getRuleManager()
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
if rm == nil {
return rw.ruleStorage.DeleteRuleGroup(trimmedKey)
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
for _, rule := range rm.GetRulesByGroup(trimmedKey) {
rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rm.DeleteRuleGroup(trimmedKey)
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
postEventFn := func() error {
return nil
Expand All @@ -213,30 +185,17 @@ func (rw *Watcher) initializeGroupWatcher() error {
func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
key, value := string(kv.Key), string(kv.Value)
log.Info("update region label rule", zap.String("key", key), zap.String("value", value))
rl := rw.getRegionLabeler()
if rl == nil {
return rw.ruleStorage.SaveRegionRuleJSON(
strings.TrimPrefix(key, prefixToTrim),
value,
)
}
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
if err != nil {
return err
}
return rl.SetLabelRule(rule)
return rw.regionLabeler.SetLabelRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete region label rule", zap.String("key", key))
rl := rw.getRegionLabeler()
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
if rl == nil {
return rw.ruleStorage.DeleteRegionRule(trimmedKey)
}
return rl.DeleteLabelRule(trimmedKey)
return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim))
}
postEventFn := func() error {
return nil
Expand All @@ -257,39 +216,3 @@ func (rw *Watcher) Close() {
rw.cancel()
rw.wg.Wait()
}

// SetClusterComponents sets the cluster components for the watcher.
func (rw *Watcher) SetClusterComponents(
sc *checker.Controller,
rm *placement.RuleManager,
rl *labeler.RegionLabeler,
) error {
rw.components.Lock()
defer rw.components.Unlock()
rw.components.checkerController = sc
rw.components.ruleManager = rm
rw.components.regionLabeler = rl
// Reload the rules to make sure that the rules are consistent with the storage.
if err := rm.Reload(); err != nil {
return err
}
return rl.Reload()
}

func (rw *Watcher) getCheckerController() *checker.Controller {
rw.components.RLock()
defer rw.components.RUnlock()
return rw.components.checkerController
}

func (rw *Watcher) getRuleManager() *placement.RuleManager {
rw.components.RLock()
defer rw.components.RUnlock()
return rw.components.ruleManager
}

func (rw *Watcher) getRegionLabeler() *labeler.RegionLabeler {
rw.components.RLock()
defer rw.components.RUnlock()
return rw.components.regionLabeler
}
22 changes: 14 additions & 8 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *Server) startServer() (err error) {
func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
err := s.startWatcher()
err := s.startMetaConfWatcher()
if err != nil {
return err
}
Expand All @@ -464,12 +464,13 @@ func (s *Server) startCluster(context.Context) error {
if err != nil {
return err
}
// Inject the cluster components into the watchers.
// Inject the cluster components into the config watcher after the scheduler controller is created.
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
s.ruleWatcher.SetClusterComponents(
s.cluster.GetCoordinator().GetCheckerController(),
s.cluster.GetRuleManager(),
s.cluster.GetRegionLabeler())
// Start the rule watcher after the cluster is created.
err = s.startRuleWatcher()
if err != nil {
return err
}
s.cluster.StartBackgroundJobs()
return nil
}
Expand All @@ -479,7 +480,7 @@ func (s *Server) stopCluster() {
s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
func (s *Server) startMetaConfWatcher() (err error) {
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
if err != nil {
return err
Expand All @@ -488,7 +489,12 @@ func (s *Server) startWatcher() (err error) {
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage)
return err
}

func (s *Server) startRuleWatcher() (err error) {
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage,
s.cluster.GetCoordinator().GetCheckerController(), s.cluster.GetRuleManager(), s.cluster.GetRegionLabeler())
return err
}

Expand Down

0 comments on commit f216e26

Please sign in to comment.