Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Dec 19, 2024
1 parent 335b245 commit 667e8c1
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 30 deletions.
3 changes: 1 addition & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ func NewCluster(
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
c.ruleManager.SetInMicroService(true)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel(), true)
if err != nil {
cancel()
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
func (mc *Cluster) initRuleManager() {
if mc.RuleManager == nil {
mc.RuleManager = placement.NewRuleManager(mc.ctx, mc.GetStorage(), mc, mc.GetSharedConfig())
mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel)
mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel, false)
}
}

Expand Down
18 changes: 5 additions & 13 deletions pkg/schedule/placement/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ type RuleManager struct {
ctx context.Context
storage endpoint.RuleStorage
syncutil.RWMutex
initialized bool
inMicroService bool
ruleConfig *ruleConfig
ruleList ruleList
initialized bool
ruleConfig *ruleConfig
ruleList ruleList

// used for rule validation
keyType string
Expand All @@ -83,15 +82,15 @@ func NewRuleManager(ctx context.Context, storage endpoint.RuleStorage, storeSetI

// Initialize loads rules from storage. If Placement Rules feature is never enabled, it creates default rule that is
// compatible with previous configuration.
func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string) error {
func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string, skipLoadRules bool) error {
m.Lock()
defer m.Unlock()
if m.initialized {
return nil
}
// If RuleManager is initialized in micro service,
// it will load from etcd watcher and do not modify rule directly.
if m.inMicroService {
if skipLoadRules {
m.ruleList = ruleList{
rangeList: rangelist.List{},
}
Expand Down Expand Up @@ -842,13 +841,6 @@ func (m *RuleManager) SetKeyType(h string) *RuleManager {
return m
}

// SetInMicroService set whether rule manager is in micro service.
func (m *RuleManager) SetInMicroService(v bool) {
m.Lock()
defer m.Unlock()
m.inMicroService = v
}

func getStoresByRegion(storeSet StoreSet, region *core.RegionInfo) []*core.StoreInfo {
r := make([]*core.StoreInfo, 0, len(region.GetPeers()))
for _, peer := range region.GetPeers() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/placement/rule_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru
var err error
manager := NewRuleManager(context.Background(), store, nil, mockconfig.NewTestOptions())
manager.conf.SetEnableWitness(enableWitness)
err = manager.Initialize(3, []string{"zone", "rack", "host"}, "")
err = manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
re.NoError(err)
return store, manager
}
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestSaveLoad(t *testing.T) {
}

m2 := NewRuleManager(context.Background(), store, nil, nil)
err := m2.Initialize(3, []string{"no", "labels"}, "")
err := m2.Initialize(3, []string{"no", "labels"}, "", false)
re.NoError(err)
re.Len(m2.GetAllRules(), 3)
re.Equal(m2.GetRule(DefaultGroupID, DefaultRuleID).String(), rules[0].String())
Expand All @@ -178,7 +178,7 @@ func TestSetAfterGet(t *testing.T) {
manager.SetRule(rule)

m2 := NewRuleManager(context.Background(), store, nil, nil)
err := m2.Initialize(100, []string{}, "")
err := m2.Initialize(100, []string{}, "", false)
re.NoError(err)
rule = m2.GetRule(DefaultGroupID, DefaultRuleID)
re.Equal(1, rule.Count)
Expand Down
6 changes: 3 additions & 3 deletions pkg/statistics/region_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestRegionStatistics(t *testing.T) {
re := require.New(t)
store := storage.NewStorageWithMemoryBackend()
manager := placement.NewRuleManager(context.Background(), store, nil, nil)
err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
err := manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
re.NoError(err)
opt := mockconfig.NewTestOptions()
opt.SetPlacementRuleEnabled(false)
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestRegionStatisticsWithPlacementRule(t *testing.T) {
re := require.New(t)
store := storage.NewStorageWithMemoryBackend()
manager := placement.NewRuleManager(context.Background(), store, nil, nil)
err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
err := manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
re.NoError(err)
opt := mockconfig.NewTestOptions()
opt.SetPlacementRuleEnabled(true)
Expand Down Expand Up @@ -276,7 +276,7 @@ func BenchmarkObserve(b *testing.B) {
// Setup
store := storage.NewStorageWithMemoryBackend()
manager := placement.NewRuleManager(context.Background(), store, nil, nil)
manager.Initialize(3, []string{"zone", "rack", "host"}, "")
manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
opt := mockconfig.NewTestOptions()
opt.SetPlacementRuleEnabled(false)
peers := []*metapb.Peer{
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (c *RaftCluster) InitCluster(
c.hbstreams = hbstreams
c.ruleManager = placement.NewRuleManager(c.ctx, c.storage, c, c.GetOpts())
if c.opt.IsPlacementRulesEnabled() {
err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel())
err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel(), false)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestSetOfflineStore(t *testing.T) {
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestUpStore(t *testing.T) {
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -553,7 +553,7 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) {
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -1324,7 +1324,7 @@ func TestOfflineAndMerge(t *testing.T) {
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -2187,7 +2187,7 @@ func newTestRaftCluster(
rc.InitCluster(id, opt, nil, nil)
rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt)
if opt.IsPlacementRulesEnabled() {
err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error {
}
if cfg.EnablePlacementRules {
// initialize rule manager.
if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil {
if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel, false); err != nil {
return err
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion tests/server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te
svr.GetRaftCluster().GetOpts().GetMaxReplicas(),
svr.GetRaftCluster().GetOpts().GetLocationLabels(),
svr.GetRaftCluster().GetOpts().GetIsolationLevel(),
)
false)
re.NoError(err)
}
if len(testCase.rules) > 0 {
Expand Down

0 comments on commit 667e8c1

Please sign in to comment.