From 159ff3720a18a8a95f4c5a481fe60e9012569e4c Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 16 Oct 2023 17:12:11 +0800 Subject: [PATCH 1/5] Integrate rule watcher with the managers Signed-off-by: JmPotato --- pkg/mcs/scheduling/server/rule/watcher.go | 155 +++++++++++++++--- pkg/mcs/scheduling/server/server.go | 5 + pkg/schedule/labeler/labeler.go | 2 +- pkg/storage/endpoint/rule.go | 6 + .../integrations/mcs/scheduling/rule_test.go | 145 +++++++++++++++- 5 files changed, 287 insertions(+), 26 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 4cad6fdcbae..7757a63c069 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -18,8 +18,12 @@ import ( "context" "strings" "sync" + "sync/atomic" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/schedule/checker" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" @@ -49,13 +53,19 @@ type Watcher struct { etcdClient *clientv3.Client ruleStorage endpoint.RuleStorage + // checkerController is used to add the suspect key ranges to the checker when the rule changed. + checkerController atomic.Value + // ruleManager is used to manage the placement rules. + ruleManager atomic.Value + // regionLabeler is used to manage the region label rules. + regionLabeler atomic.Value + ruleWatcher *etcdutil.LoopWatcher groupWatcher *etcdutil.LoopWatcher labelWatcher *etcdutil.LoopWatcher } // NewWatcher creates a new watcher to watch the Placement Rule change from PD API server. -// Please use `GetRuleStorage` to get the underlying storage to access the Placement Rules. func NewWatcher( ctx context.Context, etcdClient *clientv3.Client, @@ -90,17 +100,48 @@ func NewWatcher( func (rw *Watcher) initializeRuleWatcher() error { prefixToTrim := rw.rulesPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - // Since the PD API server will validate the rule before saving it to etcd, - // so we could directly save the string rule in JSON to the storage here. - log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - return rw.ruleStorage.SaveRuleJSON( - strings.TrimPrefix(string(kv.Key), prefixToTrim), - string(kv.Value), - ) + key, value := string(kv.Key), string(kv.Value) + log.Info("update placement rule", zap.String("key", key), zap.String("value", value)) + rm := rw.getRuleManager() + // If the rule manager is not set, it means that the cluster is not initialized yet, + // we should save the rule to the storage directly first. + if rm == nil { + // Since the PD API server will validate the rule before saving it to etcd, + // so we could directly save the string rule in JSON to the storage here. + return rw.ruleStorage.SaveRuleJSON( + strings.TrimPrefix(key, prefixToTrim), + value, + ) + } + rule, err := placement.NewRuleFromJSON(kv.Value) + if err != nil { + return err + } + // Update the suspect key ranges in the checker. 
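+		// Both the new rule's key range and, if a rule with the same (group, ID)
+		// already exists, the old rule's key range are marked as suspect so the
+		// checker re-examines the regions they cover.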
+ rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) + if oldRule := rm.GetRule(rule.GroupID, rule.ID); oldRule != nil { + rw.getCheckerController().AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) + } + return rm.SetRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { - log.Info("delete placement rule", zap.String("key", string(kv.Key))) - return rw.ruleStorage.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim)) + key := string(kv.Key) + log.Info("delete placement rule", zap.String("key", key)) + rm := rw.getRuleManager() + trimmedKey := strings.TrimPrefix(key, prefixToTrim) + if rm == nil { + return rw.ruleStorage.DeleteRule(trimmedKey) + } + ruleJSON, err := rw.ruleStorage.LoadRule(trimmedKey) + if err != nil { + return err + } + rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) + if err != nil { + return err + } + rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) + return rm.DeleteRule(rule.GroupID, rule.ID) } postEventFn := func() error { return nil @@ -119,16 +160,32 @@ func (rw *Watcher) initializeRuleWatcher() error { func (rw *Watcher) initializeGroupWatcher() error { prefixToTrim := rw.ruleGroupPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - return rw.ruleStorage.SaveRuleGroupJSON( - strings.TrimPrefix(string(kv.Key), prefixToTrim), - string(kv.Value), - ) + key, value := string(kv.Key), string(kv.Value) + log.Info("update placement rule group", zap.String("key", key), zap.String("value", value)) + rm := rw.getRuleManager() + if rm == nil { + return rw.ruleStorage.SaveRuleGroupJSON( + strings.TrimPrefix(key, prefixToTrim), + value, + ) + } + ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) + if err != nil { + return err + } + return rm.SetRuleGroup(ruleGroup) } deleteFn := func(kv *mvccpb.KeyValue) error { - log.Info("delete placement rule group", zap.String("key", string(kv.Key))) - return rw.ruleStorage.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim)) + key := string(kv.Key) + log.Info("delete placement rule group", zap.String("key", key)) + rm := rw.getRuleManager() + trimmedKey := strings.TrimPrefix(key, prefixToTrim) + if rm == nil { + return rw.ruleStorage.DeleteRuleGroup(trimmedKey) + } + return rm.DeleteRuleGroup(trimmedKey) } + // Trigger the rule manager to reload the rule groups. 
postEventFn := func() error { return nil } @@ -146,15 +203,30 @@ func (rw *Watcher) initializeGroupWatcher() error { func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - return rw.ruleStorage.SaveRegionRuleJSON( - strings.TrimPrefix(string(kv.Key), prefixToTrim), - string(kv.Value), - ) + key, value := string(kv.Key), string(kv.Value) + log.Info("update region label rule", zap.String("key", key), zap.String("value", value)) + rl := rw.getRegionLabeler() + if rl == nil { + return rw.ruleStorage.SaveRegionRuleJSON( + strings.TrimPrefix(key, prefixToTrim), + value, + ) + } + rule, err := labeler.NewLabelRuleFromJSON(kv.Value) + if err != nil { + return err + } + return rl.SetLabelRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { - log.Info("delete region label rule", zap.String("key", string(kv.Key))) - return rw.ruleStorage.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim)) + key := string(kv.Key) + log.Info("delete region label rule", zap.String("key", key)) + rl := rw.getRegionLabeler() + trimmedKey := strings.TrimPrefix(key, prefixToTrim) + if rl == nil { + return rw.ruleStorage.DeleteRegionRule(trimmedKey) + } + return rl.DeleteLabelRule(trimmedKey) } postEventFn := func() error { return nil @@ -175,3 +247,38 @@ func (rw *Watcher) Close() { rw.cancel() rw.wg.Wait() } + +// SetClusterComponents sets the cluster components for the watcher. +func (rw *Watcher) SetClusterComponents( + sc *checker.Controller, + rm *placement.RuleManager, + rl *labeler.RegionLabeler, +) { + rw.checkerController.Store(sc) + rw.ruleManager.Store(rm) + rw.regionLabeler.Store(rl) +} + +func (rw *Watcher) getCheckerController() *checker.Controller { + cc := rw.checkerController.Load() + if cc == nil { + return nil + } + return cc.(*checker.Controller) +} + +func (rw *Watcher) getRuleManager() *placement.RuleManager { + rm := rw.ruleManager.Load() + if rm == nil { + return nil + } + return rm.(*placement.RuleManager) +} + +func (rw *Watcher) getRegionLabeler() *labeler.RegionLabeler { + rl := rw.regionLabeler.Load() + if rl == nil { + return nil + } + return rl.(*labeler.RegionLabeler) +} diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 02cb1ba3c70..3c326783f05 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -464,7 +464,12 @@ func (s *Server) startCluster(context.Context) error { if err != nil { return err } + // Inject the cluster components into the watchers. s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController()) + s.ruleWatcher.SetClusterComponents( + s.cluster.GetCoordinator().GetCheckerController(), + s.cluster.GetRuleManager(), + s.cluster.GetRegionLabeler()) s.cluster.StartBackgroundJobs() return nil } diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go index c525ac5c44f..39722b1a038 100644 --- a/pkg/schedule/labeler/labeler.go +++ b/pkg/schedule/labeler/labeler.go @@ -254,7 +254,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error { } } - // update inmemory states. + // update in-memory states. 
l.Lock() defer l.Unlock() diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go index 7e2813c23bd..125c5bc31eb 100644 --- a/pkg/storage/endpoint/rule.go +++ b/pkg/storage/endpoint/rule.go @@ -22,6 +22,7 @@ import ( // RuleStorage defines the storage operations on the rule. type RuleStorage interface { + LoadRule(ruleKey string) (string, error) LoadRules(f func(k, v string)) error SaveRule(ruleKey string, rule interface{}) error SaveRuleJSON(ruleKey, rule string) error @@ -93,6 +94,11 @@ func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error { return se.Remove(regionLabelKeyPath(ruleKey)) } +// LoadRule load a placement rule from storage. +func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) { + return se.Load(ruleKeyPath(ruleKey)) +} + // LoadRules loads placement rules from storage. func (se *StorageEndpoint) LoadRules(f func(k, v string)) error { return se.loadRangeByPrefix(rulesPath+"/", f) diff --git a/tests/integrations/mcs/scheduling/rule_test.go b/tests/integrations/mcs/scheduling/rule_test.go index 104204dd625..50b4c38514f 100644 --- a/tests/integrations/mcs/scheduling/rule_test.go +++ b/tests/integrations/mcs/scheduling/rule_test.go @@ -41,7 +41,8 @@ type ruleTestSuite struct { // The PD cluster. cluster *tests.TestCluster // pdLeaderServer is the leader server of the PD cluster. - pdLeaderServer *tests.TestServer + pdLeaderServer *tests.TestServer + backendEndpoint string } func TestRule(t *testing.T) { @@ -59,6 +60,7 @@ func (suite *ruleTestSuite) SetupSuite() { re.NoError(err) leaderName := suite.cluster.WaitLeader() suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + suite.backendEndpoint = suite.pdLeaderServer.GetAddr() re.NoError(suite.pdLeaderServer.BootstrapCluster()) } @@ -235,3 +237,144 @@ func (suite *ruleTestSuite) TestRuleWatch() { re.Equal(labelRule.Labels, labelRules[1].Labels) re.Equal(labelRule.RuleType, labelRules[1].RuleType) } + +func (suite *ruleTestSuite) TestRuleWatchWithManager() { + re := suite.Require() + + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoint) + re.NoError(err) + defer tc.Destroy() + + tc.WaitForPrimaryServing(re) + cluster := tc.GetPrimaryServer().GetCluster() + ruleManager := cluster.GetRuleManager() + // Check the default rule and rule group. + rules := ruleManager.GetAllRules() + re.Len(rules, 1) + re.Equal("pd", rules[0].GroupID) + re.Equal("default", rules[0].ID) + re.Equal(0, rules[0].Index) + re.Empty(rules[0].StartKey) + re.Empty(rules[0].EndKey) + re.Equal(placement.Voter, rules[0].Role) + re.Empty(rules[0].LocationLabels) + ruleGroups := ruleManager.GetRuleGroups() + re.Len(ruleGroups, 1) + re.Equal("pd", ruleGroups[0].ID) + re.Equal(0, ruleGroups[0].Index) + re.False(ruleGroups[0].Override) + // Set a new rule via the PD API server. 
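+	// The API server persists the rule to etcd; the scheduling server's rule
+	// watcher should then pick it up and apply it to its own rule manager.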
+ apiRuleManager := suite.pdLeaderServer.GetRaftCluster().GetRuleManager() + rule := &placement.Rule{ + GroupID: "2", + ID: "3", + Role: "voter", + Count: 1, + StartKeyHex: "22", + EndKeyHex: "dd", + } + err = apiRuleManager.SetRule(rule) + re.NoError(err) + testutil.Eventually(re, func() bool { + rules = ruleManager.GetAllRules() + return len(rules) == 2 + }) + sort.Slice(rules, func(i, j int) bool { + return rules[i].ID > rules[j].ID + }) + re.Len(rules, 2) + re.Equal(rule.GroupID, rules[1].GroupID) + re.Equal(rule.ID, rules[1].ID) + re.Equal(rule.Role, rules[1].Role) + re.Equal(rule.Count, rules[1].Count) + re.Equal(rule.StartKeyHex, rules[1].StartKeyHex) + re.Equal(rule.EndKeyHex, rules[1].EndKeyHex) + // Delete the rule. + err = apiRuleManager.DeleteRule(rule.GroupID, rule.ID) + re.NoError(err) + testutil.Eventually(re, func() bool { + rules = ruleManager.GetAllRules() + return len(rules) == 1 + }) + re.Len(rules, 1) + re.Equal("pd", rules[0].GroupID) + // Create a new rule group. + ruleGroup := &placement.RuleGroup{ + ID: "2", + Index: 100, + Override: true, + } + err = apiRuleManager.SetRuleGroup(ruleGroup) + re.NoError(err) + testutil.Eventually(re, func() bool { + ruleGroups = ruleManager.GetRuleGroups() + return len(ruleGroups) == 2 + }) + re.Len(ruleGroups, 2) + re.Equal(ruleGroup.ID, ruleGroups[1].ID) + re.Equal(ruleGroup.Index, ruleGroups[1].Index) + re.Equal(ruleGroup.Override, ruleGroups[1].Override) + // Delete the rule group. + err = apiRuleManager.DeleteRuleGroup(ruleGroup.ID) + re.NoError(err) + testutil.Eventually(re, func() bool { + ruleGroups = ruleManager.GetRuleGroups() + return len(ruleGroups) == 1 + }) + re.Len(ruleGroups, 1) + + // Test the region label rule watch. + regionLabeler := cluster.GetRegionLabeler() + labelRules := regionLabeler.GetAllLabelRules() + apiRegionLabeler := suite.pdLeaderServer.GetRaftCluster().GetRegionLabeler() + apiLabelRules := apiRegionLabeler.GetAllLabelRules() + re.Len(labelRules, len(apiLabelRules)) + re.Equal(apiLabelRules[0].ID, labelRules[0].ID) + re.Equal(apiLabelRules[0].Index, labelRules[0].Index) + re.Equal(apiLabelRules[0].Labels, labelRules[0].Labels) + re.Equal(apiLabelRules[0].RuleType, labelRules[0].RuleType) + // Set a new region label rule. + labelRule := &labeler.LabelRule{ + ID: "rule1", + Labels: []labeler.RegionLabel{{Key: "k1", Value: "v1"}}, + RuleType: "key-range", + Data: labeler.MakeKeyRanges("1234", "5678"), + } + err = apiRegionLabeler.SetLabelRule(labelRule) + re.NoError(err) + testutil.Eventually(re, func() bool { + labelRules = regionLabeler.GetAllLabelRules() + return len(labelRules) == 2 + }) + sort.Slice(labelRules, func(i, j int) bool { + return labelRules[i].ID < labelRules[j].ID + }) + re.Len(labelRules, 2) + re.Equal(labelRule.ID, labelRules[1].ID) + re.Equal(labelRule.Labels, labelRules[1].Labels) + re.Equal(labelRule.RuleType, labelRules[1].RuleType) + // Patch the region label rule. 
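+	// The patch sets rule2 and deletes rule1 in a single call, so the total
+	// number of label rules should remain two once it is applied.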
+ labelRule = &labeler.LabelRule{ + ID: "rule2", + Labels: []labeler.RegionLabel{{Key: "k2", Value: "v2"}}, + RuleType: "key-range", + Data: labeler.MakeKeyRanges("ab12", "cd12"), + } + patch := labeler.LabelRulePatch{ + SetRules: []*labeler.LabelRule{labelRule}, + DeleteRules: []string{"rule1"}, + } + err = apiRegionLabeler.Patch(patch) + re.NoError(err) + testutil.Eventually(re, func() bool { + labelRules = regionLabeler.GetAllLabelRules() + return len(labelRules) == 2 + }) + sort.Slice(labelRules, func(i, j int) bool { + return labelRules[i].ID < labelRules[j].ID + }) + re.Len(labelRules, 2) + re.Equal(labelRule.ID, labelRules[1].ID) + re.Equal(labelRule.Labels, labelRules[1].Labels) + re.Equal(labelRule.RuleType, labelRules[1].RuleType) +} From 11dbbb27ade2b2182a29f779e8b0c8332ce99fb3 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 16 Oct 2023 17:44:14 +0800 Subject: [PATCH 2/5] Only keep the integration test Signed-off-by: JmPotato --- pkg/mcs/scheduling/server/rule/watcher.go | 7 + .../integrations/mcs/scheduling/rule_test.go | 175 ------------------ 2 files changed, 7 insertions(+), 175 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 7757a63c069..725df690ab7 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -173,6 +173,10 @@ func (rw *Watcher) initializeGroupWatcher() error { if err != nil { return err } + // Add all rule key ranges within the group to the suspect key ranges. + for _, rule := range rm.GetRulesByGroup(ruleGroup.ID) { + rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) + } return rm.SetRuleGroup(ruleGroup) } deleteFn := func(kv *mvccpb.KeyValue) error { @@ -183,6 +187,9 @@ func (rw *Watcher) initializeGroupWatcher() error { if rm == nil { return rw.ruleStorage.DeleteRuleGroup(trimmedKey) } + for _, rule := range rm.GetRulesByGroup(trimmedKey) { + rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) + } return rm.DeleteRuleGroup(trimmedKey) } // Trigger the rule manager to reload the rule groups. 
diff --git a/tests/integrations/mcs/scheduling/rule_test.go b/tests/integrations/mcs/scheduling/rule_test.go index 50b4c38514f..bffa58d0fe6 100644 --- a/tests/integrations/mcs/scheduling/rule_test.go +++ b/tests/integrations/mcs/scheduling/rule_test.go @@ -19,15 +19,9 @@ import ( "sort" "testing" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/tikv/pd/pkg/keyspace" - "github.com/tikv/pd/pkg/mcs/scheduling/server/rule" - "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" - "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" ) @@ -69,178 +63,9 @@ func (suite *ruleTestSuite) TearDownSuite() { suite.cluster.Destroy() } -func loadRules(re *require.Assertions, ruleStorage endpoint.RuleStorage) (rules []*placement.Rule) { - err := ruleStorage.LoadRules(func(_, v string) { - r, err := placement.NewRuleFromJSON([]byte(v)) - re.NoError(err) - rules = append(rules, r) - }) - re.NoError(err) - return -} - -func loadRuleGroups(re *require.Assertions, ruleStorage endpoint.RuleStorage) (groups []*placement.RuleGroup) { - err := ruleStorage.LoadRuleGroups(func(_, v string) { - rg, err := placement.NewRuleGroupFromJSON([]byte(v)) - re.NoError(err) - groups = append(groups, rg) - }) - re.NoError(err) - return -} - -func loadRegionRules(re *require.Assertions, ruleStorage endpoint.RuleStorage) (rules []*labeler.LabelRule) { - err := ruleStorage.LoadRegionRules(func(_, v string) { - lr, err := labeler.NewLabelRuleFromJSON([]byte(v)) - re.NoError(err) - rules = append(rules, lr) - }) - re.NoError(err) - return -} - func (suite *ruleTestSuite) TestRuleWatch() { re := suite.Require() - ruleStorage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) - // Create a rule watcher. - _, err := rule.NewWatcher( - suite.ctx, - suite.pdLeaderServer.GetEtcdClient(), - suite.cluster.GetCluster().GetId(), - ruleStorage, - ) - re.NoError(err) - // Check the default rule. - rules := loadRules(re, ruleStorage) - re.Len(rules, 1) - re.Equal("pd", rules[0].GroupID) - re.Equal("default", rules[0].ID) - re.Equal(0, rules[0].Index) - re.Empty(rules[0].StartKey) - re.Empty(rules[0].EndKey) - re.Equal(placement.Voter, rules[0].Role) - re.Empty(rules[0].LocationLabels) - // Check the empty rule group. - ruleGroups := loadRuleGroups(re, ruleStorage) - re.NoError(err) - re.Empty(ruleGroups) - // Set a new rule via the PD API server. - ruleManager := suite.pdLeaderServer.GetRaftCluster().GetRuleManager() - rule := &placement.Rule{ - GroupID: "2", - ID: "3", - Role: "voter", - Count: 1, - StartKeyHex: "22", - EndKeyHex: "dd", - } - err = ruleManager.SetRule(rule) - re.NoError(err) - testutil.Eventually(re, func() bool { - rules = loadRules(re, ruleStorage) - return len(rules) == 2 - }) - sort.Slice(rules, func(i, j int) bool { - return rules[i].ID > rules[j].ID - }) - re.Len(rules, 2) - re.Equal(rule.GroupID, rules[1].GroupID) - re.Equal(rule.ID, rules[1].ID) - re.Equal(rule.Role, rules[1].Role) - re.Equal(rule.Count, rules[1].Count) - re.Equal(rule.StartKeyHex, rules[1].StartKeyHex) - re.Equal(rule.EndKeyHex, rules[1].EndKeyHex) - // Delete the rule. - err = ruleManager.DeleteRule(rule.GroupID, rule.ID) - re.NoError(err) - testutil.Eventually(re, func() bool { - rules = loadRules(re, ruleStorage) - return len(rules) == 1 - }) - re.Len(rules, 1) - re.Equal("pd", rules[0].GroupID) - // Create a new rule group. 
- ruleGroup := &placement.RuleGroup{ - ID: "2", - Index: 100, - Override: true, - } - err = ruleManager.SetRuleGroup(ruleGroup) - re.NoError(err) - testutil.Eventually(re, func() bool { - ruleGroups = loadRuleGroups(re, ruleStorage) - return len(ruleGroups) == 1 - }) - re.Len(ruleGroups, 1) - re.Equal(ruleGroup.ID, ruleGroups[0].ID) - re.Equal(ruleGroup.Index, ruleGroups[0].Index) - re.Equal(ruleGroup.Override, ruleGroups[0].Override) - // Delete the rule group. - err = ruleManager.DeleteRuleGroup(ruleGroup.ID) - re.NoError(err) - testutil.Eventually(re, func() bool { - ruleGroups = loadRuleGroups(re, ruleStorage) - return len(ruleGroups) == 0 - }) - re.Empty(ruleGroups) - - // Test the region label rule watch. - labelRules := loadRegionRules(re, ruleStorage) - re.Len(labelRules, 1) - defaultKeyspaceRule := keyspace.MakeLabelRule(utils.DefaultKeyspaceID) - re.Equal(defaultKeyspaceRule, labelRules[0]) - // Set a new region label rule. - labelRule := &labeler.LabelRule{ - ID: "rule1", - Labels: []labeler.RegionLabel{{Key: "k1", Value: "v1"}}, - RuleType: "key-range", - Data: labeler.MakeKeyRanges("1234", "5678"), - } - regionLabeler := suite.pdLeaderServer.GetRaftCluster().GetRegionLabeler() - err = regionLabeler.SetLabelRule(labelRule) - re.NoError(err) - testutil.Eventually(re, func() bool { - labelRules = loadRegionRules(re, ruleStorage) - return len(labelRules) == 2 - }) - sort.Slice(labelRules, func(i, j int) bool { - return labelRules[i].ID < labelRules[j].ID - }) - re.Len(labelRules, 2) - re.Equal(labelRule.ID, labelRules[1].ID) - re.Equal(labelRule.Labels, labelRules[1].Labels) - re.Equal(labelRule.RuleType, labelRules[1].RuleType) - // Patch the region label rule. - labelRule = &labeler.LabelRule{ - ID: "rule2", - Labels: []labeler.RegionLabel{{Key: "k2", Value: "v2"}}, - RuleType: "key-range", - Data: labeler.MakeKeyRanges("ab12", "cd12"), - } - patch := labeler.LabelRulePatch{ - SetRules: []*labeler.LabelRule{labelRule}, - DeleteRules: []string{"rule1"}, - } - err = regionLabeler.Patch(patch) - re.NoError(err) - testutil.Eventually(re, func() bool { - labelRules = loadRegionRules(re, ruleStorage) - return len(labelRules) == 2 - }) - sort.Slice(labelRules, func(i, j int) bool { - return labelRules[i].ID < labelRules[j].ID - }) - re.Len(labelRules, 2) - re.Equal(defaultKeyspaceRule, labelRules[0]) - re.Equal(labelRule.ID, labelRules[1].ID) - re.Equal(labelRule.Labels, labelRules[1].Labels) - re.Equal(labelRule.RuleType, labelRules[1].RuleType) -} - -func (suite *ruleTestSuite) TestRuleWatchWithManager() { - re := suite.Require() - tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoint) re.NoError(err) defer tc.Destroy() From d99e1410fcca1d75b8e3bad52d313e291862a76a Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 17 Oct 2023 11:56:26 +0800 Subject: [PATCH 3/5] Reload after setting Signed-off-by: JmPotato --- pkg/mcs/scheduling/server/rule/watcher.go | 57 ++++++++++++----------- pkg/schedule/labeler/labeler.go | 8 ++++ pkg/schedule/placement/rule_manager.go | 31 +++++++++++- 3 files changed, 68 insertions(+), 28 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 725df690ab7..2abc84b9c47 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -18,7 +18,6 @@ import ( "context" "strings" "sync" - "sync/atomic" "github.com/pingcap/log" "github.com/tikv/pd/pkg/schedule/checker" @@ -26,6 +25,7 @@ import ( "github.com/tikv/pd/pkg/schedule/placement" 
"github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/syncutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -53,12 +53,16 @@ type Watcher struct { etcdClient *clientv3.Client ruleStorage endpoint.RuleStorage - // checkerController is used to add the suspect key ranges to the checker when the rule changed. - checkerController atomic.Value - // ruleManager is used to manage the placement rules. - ruleManager atomic.Value - // regionLabeler is used to manage the region label rules. - regionLabeler atomic.Value + // components is used to store the cluster components and protect them with a RWMutex lock. + components struct { + syncutil.RWMutex + // checkerController is used to add the suspect key ranges to the checker when the rule changed. + checkerController *checker.Controller + // ruleManager is used to manage the placement rules. + ruleManager *placement.RuleManager + // regionLabeler is used to manage the region label rules. + regionLabeler *labeler.RegionLabeler + } ruleWatcher *etcdutil.LoopWatcher groupWatcher *etcdutil.LoopWatcher @@ -260,32 +264,33 @@ func (rw *Watcher) SetClusterComponents( sc *checker.Controller, rm *placement.RuleManager, rl *labeler.RegionLabeler, -) { - rw.checkerController.Store(sc) - rw.ruleManager.Store(rm) - rw.regionLabeler.Store(rl) +) error { + rw.components.Lock() + defer rw.components.Unlock() + rw.components.checkerController = sc + rw.components.ruleManager = rm + rw.components.regionLabeler = rl + // Reload the rules to make sure that the rules are consistent with the storage. + if err := rm.Reload(); err != nil { + return err + } + return rl.Reload() } func (rw *Watcher) getCheckerController() *checker.Controller { - cc := rw.checkerController.Load() - if cc == nil { - return nil - } - return cc.(*checker.Controller) + rw.components.RLock() + defer rw.components.RUnlock() + return rw.components.checkerController } func (rw *Watcher) getRuleManager() *placement.RuleManager { - rm := rw.ruleManager.Load() - if rm == nil { - return nil - } - return rm.(*placement.RuleManager) + rw.components.RLock() + defer rw.components.RUnlock() + return rw.components.ruleManager } func (rw *Watcher) getRegionLabeler() *labeler.RegionLabeler { - rl := rw.regionLabeler.Load() - if rl == nil { - return nil - } - return rl.(*labeler.RegionLabeler) + rw.components.RLock() + defer rw.components.RUnlock() + return rw.components.regionLabeler } diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go index 39722b1a038..892ef097405 100644 --- a/pkg/schedule/labeler/labeler.go +++ b/pkg/schedule/labeler/labeler.go @@ -149,6 +149,14 @@ func (l *RegionLabeler) buildRangeList() { l.rangeList = builder.Build() } +// Reload loads rules from storage. +func (l *RegionLabeler) Reload() error { + l.Lock() + defer l.Unlock() + l.labelRules = make(map[string]*LabelRule) + return l.loadRules() +} + // GetSplitKeys returns all split keys in the range (start, end). 
func (l *RegionLabeler) GetSplitKeys(start, end []byte) [][]byte { l.RLock() diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index 909c0fa1078..6532a4083ed 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -135,8 +135,10 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat } func (m *RuleManager) loadRules() error { - var toSave []*Rule - var toDelete []string + var ( + toSave []*Rule + toDelete []string + ) err := m.storage.LoadRules(func(k, v string) { r, err := NewRuleFromJSON([]byte(v)) if err != nil { @@ -261,6 +263,31 @@ func (m *RuleManager) adjustRule(r *Rule, groupID string) (err error) { return nil } +// Reload reloads rules from storage. +func (m *RuleManager) Reload() error { + m.Lock() + defer m.Unlock() + // Only allow to reload when it is initialized. + if !m.initialized { + return nil + } + // Force the rule manager to reload rules from storage. + m.ruleConfig = newRuleConfig() + if err := m.loadRules(); err != nil { + return err + } + if err := m.loadGroups(); err != nil { + return err + } + m.ruleConfig.adjust() + ruleList, err := buildRuleList(m.ruleConfig) + if err != nil { + return err + } + m.ruleList = ruleList + return nil +} + // GetRule returns the Rule with the same (group, id). func (m *RuleManager) GetRule(group, id string) *Rule { m.RLock() From 24dffe4b9782214fd3897789f6742ac421571507 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 17 Oct 2023 14:25:23 +0800 Subject: [PATCH 4/5] Clean up the comment Signed-off-by: JmPotato --- pkg/mcs/scheduling/server/rule/watcher.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 2abc84b9c47..ec8659c1319 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -196,7 +196,6 @@ func (rw *Watcher) initializeGroupWatcher() error { } return rm.DeleteRuleGroup(trimmedKey) } - // Trigger the rule manager to reload the rule groups. postEventFn := func() error { return nil } From a78485a93560c57e421eec4f19689fe24e636dbb Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 17 Oct 2023 16:43:29 +0800 Subject: [PATCH 5/5] Start rule watcher after cluster is created Signed-off-by: JmPotato --- pkg/mcs/scheduling/server/rule/watcher.go | 137 +++++----------------- pkg/mcs/scheduling/server/server.go | 22 ++-- pkg/schedule/labeler/labeler.go | 8 -- pkg/schedule/placement/rule_manager.go | 25 ---- 4 files changed, 44 insertions(+), 148 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index ec8659c1319..912fb9c01e5 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -25,7 +25,6 @@ import ( "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" - "github.com/tikv/pd/pkg/utils/syncutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -53,16 +52,12 @@ type Watcher struct { etcdClient *clientv3.Client ruleStorage endpoint.RuleStorage - // components is used to store the cluster components and protect them with a RWMutex lock. - components struct { - syncutil.RWMutex - // checkerController is used to add the suspect key ranges to the checker when the rule changed. - checkerController *checker.Controller - // ruleManager is used to manage the placement rules. 
- ruleManager *placement.RuleManager - // regionLabeler is used to manage the region label rules. - regionLabeler *labeler.RegionLabeler - } + // checkerController is used to add the suspect key ranges to the checker when the rule changed. + checkerController *checker.Controller + // ruleManager is used to manage the placement rules. + ruleManager *placement.RuleManager + // regionLabeler is used to manage the region label rules. + regionLabeler *labeler.RegionLabeler ruleWatcher *etcdutil.LoopWatcher groupWatcher *etcdutil.LoopWatcher @@ -75,6 +70,9 @@ func NewWatcher( etcdClient *clientv3.Client, clusterID uint64, ruleStorage endpoint.RuleStorage, + checkerController *checker.Controller, + ruleManager *placement.RuleManager, + regionLabeler *labeler.RegionLabeler, ) (*Watcher, error) { ctx, cancel := context.WithCancel(ctx) rw := &Watcher{ @@ -85,6 +83,9 @@ func NewWatcher( regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), etcdClient: etcdClient, ruleStorage: ruleStorage, + checkerController: checkerController, + ruleManager: ruleManager, + regionLabeler: regionLabeler, } err := rw.initializeRuleWatcher() if err != nil { @@ -104,39 +105,22 @@ func NewWatcher( func (rw *Watcher) initializeRuleWatcher() error { prefixToTrim := rw.rulesPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - key, value := string(kv.Key), string(kv.Value) - log.Info("update placement rule", zap.String("key", key), zap.String("value", value)) - rm := rw.getRuleManager() - // If the rule manager is not set, it means that the cluster is not initialized yet, - // we should save the rule to the storage directly first. - if rm == nil { - // Since the PD API server will validate the rule before saving it to etcd, - // so we could directly save the string rule in JSON to the storage here. - return rw.ruleStorage.SaveRuleJSON( - strings.TrimPrefix(key, prefixToTrim), - value, - ) - } + log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) rule, err := placement.NewRuleFromJSON(kv.Value) if err != nil { return err } // Update the suspect key ranges in the checker. 
- rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) - if oldRule := rm.GetRule(rule.GroupID, rule.ID); oldRule != nil { - rw.getCheckerController().AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { + rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) } - return rm.SetRule(rule) + return rw.ruleManager.SetRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) log.Info("delete placement rule", zap.String("key", key)) - rm := rw.getRuleManager() - trimmedKey := strings.TrimPrefix(key, prefixToTrim) - if rm == nil { - return rw.ruleStorage.DeleteRule(trimmedKey) - } - ruleJSON, err := rw.ruleStorage.LoadRule(trimmedKey) + ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim)) if err != nil { return err } @@ -144,8 +128,8 @@ func (rw *Watcher) initializeRuleWatcher() error { if err != nil { return err } - rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) - return rm.DeleteRule(rule.GroupID, rule.ID) + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) } postEventFn := func() error { return nil @@ -164,37 +148,25 @@ func (rw *Watcher) initializeRuleWatcher() error { func (rw *Watcher) initializeGroupWatcher() error { prefixToTrim := rw.ruleGroupPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - key, value := string(kv.Key), string(kv.Value) - log.Info("update placement rule group", zap.String("key", key), zap.String("value", value)) - rm := rw.getRuleManager() - if rm == nil { - return rw.ruleStorage.SaveRuleGroupJSON( - strings.TrimPrefix(key, prefixToTrim), - value, - ) - } + log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) if err != nil { return err } // Add all rule key ranges within the group to the suspect key ranges. 
- for _, rule := range rm.GetRulesByGroup(ruleGroup.ID) { - rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) + for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) } - return rm.SetRuleGroup(ruleGroup) + return rw.ruleManager.SetRuleGroup(ruleGroup) } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) log.Info("delete placement rule group", zap.String("key", key)) - rm := rw.getRuleManager() trimmedKey := strings.TrimPrefix(key, prefixToTrim) - if rm == nil { - return rw.ruleStorage.DeleteRuleGroup(trimmedKey) + for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) } - for _, rule := range rm.GetRulesByGroup(trimmedKey) { - rw.getCheckerController().AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - return rm.DeleteRuleGroup(trimmedKey) + return rw.ruleManager.DeleteRuleGroup(trimmedKey) } postEventFn := func() error { return nil @@ -213,30 +185,17 @@ func (rw *Watcher) initializeGroupWatcher() error { func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - key, value := string(kv.Key), string(kv.Value) - log.Info("update region label rule", zap.String("key", key), zap.String("value", value)) - rl := rw.getRegionLabeler() - if rl == nil { - return rw.ruleStorage.SaveRegionRuleJSON( - strings.TrimPrefix(key, prefixToTrim), - value, - ) - } + log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) rule, err := labeler.NewLabelRuleFromJSON(kv.Value) if err != nil { return err } - return rl.SetLabelRule(rule) + return rw.regionLabeler.SetLabelRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) log.Info("delete region label rule", zap.String("key", key)) - rl := rw.getRegionLabeler() - trimmedKey := strings.TrimPrefix(key, prefixToTrim) - if rl == nil { - return rw.ruleStorage.DeleteRegionRule(trimmedKey) - } - return rl.DeleteLabelRule(trimmedKey) + return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim)) } postEventFn := func() error { return nil @@ -257,39 +216,3 @@ func (rw *Watcher) Close() { rw.cancel() rw.wg.Wait() } - -// SetClusterComponents sets the cluster components for the watcher. -func (rw *Watcher) SetClusterComponents( - sc *checker.Controller, - rm *placement.RuleManager, - rl *labeler.RegionLabeler, -) error { - rw.components.Lock() - defer rw.components.Unlock() - rw.components.checkerController = sc - rw.components.ruleManager = rm - rw.components.regionLabeler = rl - // Reload the rules to make sure that the rules are consistent with the storage. 
- if err := rm.Reload(); err != nil { - return err - } - return rl.Reload() -} - -func (rw *Watcher) getCheckerController() *checker.Controller { - rw.components.RLock() - defer rw.components.RUnlock() - return rw.components.checkerController -} - -func (rw *Watcher) getRuleManager() *placement.RuleManager { - rw.components.RLock() - defer rw.components.RUnlock() - return rw.components.ruleManager -} - -func (rw *Watcher) getRegionLabeler() *labeler.RegionLabeler { - rw.components.RLock() - defer rw.components.RUnlock() - return rw.components.regionLabeler -} diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 3c326783f05..9caae932037 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -455,7 +455,7 @@ func (s *Server) startServer() (err error) { func (s *Server) startCluster(context.Context) error { s.basicCluster = core.NewBasicCluster() s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) - err := s.startWatcher() + err := s.startMetaConfWatcher() if err != nil { return err } @@ -464,12 +464,13 @@ func (s *Server) startCluster(context.Context) error { if err != nil { return err } - // Inject the cluster components into the watchers. + // Inject the cluster components into the config watcher after the scheduler controller is created. s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController()) - s.ruleWatcher.SetClusterComponents( - s.cluster.GetCoordinator().GetCheckerController(), - s.cluster.GetRuleManager(), - s.cluster.GetRegionLabeler()) + // Start the rule watcher after the cluster is created. + err = s.startRuleWatcher() + if err != nil { + return err + } s.cluster.StartBackgroundJobs() return nil } @@ -479,7 +480,7 @@ func (s *Server) stopCluster() { s.stopWatcher() } -func (s *Server) startWatcher() (err error) { +func (s *Server) startMetaConfWatcher() (err error) { s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster) if err != nil { return err @@ -488,7 +489,12 @@ func (s *Server) startWatcher() (err error) { if err != nil { return err } - s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage) + return err +} + +func (s *Server) startRuleWatcher() (err error) { + s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage, + s.cluster.GetCoordinator().GetCheckerController(), s.cluster.GetRuleManager(), s.cluster.GetRegionLabeler()) return err } diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go index 892ef097405..39722b1a038 100644 --- a/pkg/schedule/labeler/labeler.go +++ b/pkg/schedule/labeler/labeler.go @@ -149,14 +149,6 @@ func (l *RegionLabeler) buildRangeList() { l.rangeList = builder.Build() } -// Reload loads rules from storage. -func (l *RegionLabeler) Reload() error { - l.Lock() - defer l.Unlock() - l.labelRules = make(map[string]*LabelRule) - return l.loadRules() -} - // GetSplitKeys returns all split keys in the range (start, end). func (l *RegionLabeler) GetSplitKeys(start, end []byte) [][]byte { l.RLock() diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index 6532a4083ed..bdca4cc1b19 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -263,31 +263,6 @@ func (m *RuleManager) adjustRule(r *Rule, groupID string) (err error) { return nil } -// Reload reloads rules from storage. 
-func (m *RuleManager) Reload() error { - m.Lock() - defer m.Unlock() - // Only allow to reload when it is initialized. - if !m.initialized { - return nil - } - // Force the rule manager to reload rules from storage. - m.ruleConfig = newRuleConfig() - if err := m.loadRules(); err != nil { - return err - } - if err := m.loadGroups(); err != nil { - return err - } - m.ruleConfig.adjust() - ruleList, err := buildRuleList(m.ruleConfig) - if err != nil { - return err - } - m.ruleList = ruleList - return nil -} - // GetRule returns the Rule with the same (group, id). func (m *RuleManager) GetRule(group, id string) *Rule { m.RLock()