Skip to content

Commit

Permalink
Store rules into the unified storage
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Oct 16, 2023
1 parent d0193d0 commit 5960e88
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 91 deletions.
98 changes: 10 additions & 88 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,80 +27,6 @@ import (
"go.uber.org/zap"
)

// ruleStorage is an in-memory storage for Placement Rules,
// which will implement the `endpoint.RuleStorage` interface.
type ruleStorage struct {
// Rule key -> rule value.
rules sync.Map
// GroupID -> rule group value.
groups sync.Map
// Region rule key -> rule value.
regionRules sync.Map
}

// LoadRules loads Placement Rules from storage.
func (rs *ruleStorage) LoadRules(f func(k, v string)) error {
rs.rules.Range(func(k, v interface{}) bool {
f(k.(string), v.(string))
return true
})
return nil
}

// SaveRule stores a rule cfg to the rulesPathPrefix.
func (rs *ruleStorage) SaveRule(ruleKey string, rule interface{}) error {
rs.rules.Store(ruleKey, rule)
return nil
}

// DeleteRule removes a rule from storage.
func (rs *ruleStorage) DeleteRule(ruleKey string) error {
rs.rules.Delete(ruleKey)
return nil
}

// LoadRuleGroups loads all rule groups from storage.
func (rs *ruleStorage) LoadRuleGroups(f func(k, v string)) error {
rs.groups.Range(func(k, v interface{}) bool {
f(k.(string), v.(string))
return true
})
return nil
}

// SaveRuleGroup stores a rule group config to storage.
func (rs *ruleStorage) SaveRuleGroup(groupID string, group interface{}) error {
rs.groups.Store(groupID, group)
return nil
}

// DeleteRuleGroup removes a rule group from storage.
func (rs *ruleStorage) DeleteRuleGroup(groupID string) error {
rs.groups.Delete(groupID)
return nil
}

// LoadRegionRules loads region rules from storage.
func (rs *ruleStorage) LoadRegionRules(f func(k, v string)) error {
rs.regionRules.Range(func(k, v interface{}) bool {
f(k.(string), v.(string))
return true
})
return nil
}

// SaveRegionRule saves a region rule to the storage.
func (rs *ruleStorage) SaveRegionRule(ruleKey string, rule interface{}) error {
rs.regionRules.Store(ruleKey, rule)
return nil
}

// DeleteRegionRule removes a region rule from storage.
func (rs *ruleStorage) DeleteRegionRule(ruleKey string) error {
rs.regionRules.Delete(ruleKey)
return nil
}

// Watcher is used to watch the PD API server for any Placement Rule changes.
type Watcher struct {
ctx context.Context
Expand All @@ -120,8 +46,8 @@ type Watcher struct {
// - Value: labeler.LabelRule
regionLabelPathPrefix string

etcdClient *clientv3.Client
ruleStore *ruleStorage
etcdClient *clientv3.Client
ruleStorage endpoint.RuleStorage

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
Expand All @@ -134,6 +60,7 @@ func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
clusterID uint64,
ruleStorage endpoint.RuleStorage,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
Expand All @@ -143,7 +70,7 @@ func NewWatcher(
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStore: &ruleStorage{},
ruleStorage: ruleStorage,
}
err := rw.initializeRuleWatcher()
if err != nil {
Expand All @@ -166,14 +93,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRule(
return rw.ruleStorage.SaveRuleJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
return rw.ruleStorage.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
return nil
Expand All @@ -193,14 +120,14 @@ func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRuleGroup(
return rw.ruleStorage.SaveRuleGroupJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule group", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
return rw.ruleStorage.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
return nil
Expand All @@ -220,14 +147,14 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRegionRule(
return rw.ruleStorage.SaveRegionRuleJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete region label rule", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
return rw.ruleStorage.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
return nil
Expand All @@ -248,8 +175,3 @@ func (rw *Watcher) Close() {
rw.cancel()
rw.wg.Wait()
}

// GetRuleStorage returns the rule storage.
func (rw *Watcher) GetRuleStorage() endpoint.RuleStorage {
return rw.ruleStore
}
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (s *Server) startWatcher() (err error) {
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID)
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage)
return err
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/endpoint/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (
type RuleStorage interface {
LoadRules(f func(k, v string)) error
SaveRule(ruleKey string, rule interface{}) error
SaveRuleJSON(ruleKey, rule string) error
DeleteRule(ruleKey string) error
LoadRuleGroups(f func(k, v string)) error
SaveRuleGroup(groupID string, group interface{}) error
SaveRuleGroupJSON(groupID, group string) error
DeleteRuleGroup(groupID string) error
LoadRegionRules(f func(k, v string)) error
SaveRegionRule(ruleKey string, rule interface{}) error
SaveRegionRuleJSON(ruleKey, rule string) error
DeleteRegionRule(ruleKey string) error
}

Expand All @@ -40,6 +43,11 @@ func (se *StorageEndpoint) SaveRule(ruleKey string, rule interface{}) error {
return se.saveJSON(ruleKeyPath(ruleKey), rule)
}

// SaveRuleJSON stores a rule cfg JSON to the rulesPath.
func (se *StorageEndpoint) SaveRuleJSON(ruleKey, rule string) error {
return se.Save(ruleKeyPath(ruleKey), rule)
}

// DeleteRule removes a rule from storage.
func (se *StorageEndpoint) DeleteRule(ruleKey string) error {
return se.Remove(ruleKeyPath(ruleKey))
Expand All @@ -55,6 +63,11 @@ func (se *StorageEndpoint) SaveRuleGroup(groupID string, group interface{}) erro
return se.saveJSON(ruleGroupIDPath(groupID), group)
}

// SaveRuleGroupJSON stores a rule group config JSON to storage.
func (se *StorageEndpoint) SaveRuleGroupJSON(groupID, group string) error {
return se.Save(ruleGroupIDPath(groupID), group)
}

// DeleteRuleGroup removes a rule group from storage.
func (se *StorageEndpoint) DeleteRuleGroup(groupID string) error {
return se.Remove(ruleGroupIDPath(groupID))
Expand All @@ -70,6 +83,11 @@ func (se *StorageEndpoint) SaveRegionRule(ruleKey string, rule interface{}) erro
return se.saveJSON(regionLabelKeyPath(ruleKey), rule)
}

// SaveRegionRuleJSON saves a region rule JSON to the storage.
func (se *StorageEndpoint) SaveRegionRuleJSON(ruleKey, rule string) error {
return se.Save(regionLabelKeyPath(ruleKey), rule)
}

// DeleteRegionRule removes a region rule from storage.
func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error {
return se.Remove(regionLabelKeyPath(ruleKey))
Expand Down
6 changes: 4 additions & 2 deletions tests/integrations/mcs/scheduling/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
)
Expand Down Expand Up @@ -99,14 +100,15 @@ func loadRegionRules(re *require.Assertions, ruleStorage endpoint.RuleStorage) (
func (suite *ruleTestSuite) TestRuleWatch() {
re := suite.Require()

ruleStorage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
// Create a rule watcher.
watcher, err := rule.NewWatcher(
_, err := rule.NewWatcher(
suite.ctx,
suite.pdLeaderServer.GetEtcdClient(),
suite.cluster.GetCluster().GetId(),
ruleStorage,
)
re.NoError(err)
ruleStorage := watcher.GetRuleStorage()
// Check the default rule.
rules := loadRules(re, ruleStorage)
re.Len(rules, 1)
Expand Down

0 comments on commit 5960e88

Please sign in to comment.