From 667e8c19e1f798174b1c4b62a199c1b0510f2d4e Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Thu, 19 Dec 2024 16:50:08 +0800
Subject: [PATCH] address comments

Replace RuleManager.SetInMicroService with a skipLoadRules parameter on
Initialize, so each caller decides at initialization time whether rules are
loaded from storage or left empty for the etcd watcher to populate.

Signed-off-by: lhy1024
---
 pkg/mcs/scheduling/server/cluster.go        |  3 +--
 pkg/mock/mockcluster/mockcluster.go         |  2 +-
 pkg/schedule/placement/rule_manager.go      | 18 +++++-------------
 pkg/schedule/placement/rule_manager_test.go |  6 +++---
 pkg/statistics/region_collection_test.go    |  6 +++---
 server/cluster/cluster.go                   |  2 +-
 server/cluster/cluster_test.go              | 10 +++++-----
 server/server.go                            |  2 +-
 tests/server/api/operator_test.go           |  2 +-
 9 files changed, 21 insertions(+), 30 deletions(-)

diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 28b1bffba95..5c7166fba09 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -113,8 +113,7 @@ func NewCluster(
 		logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
 	}
 	c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
-	c.ruleManager.SetInMicroService(true)
-	err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
+	err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel(), true)
 	if err != nil {
 		cancel()
 		return nil, err
diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 45d8e35a0bc..62498bff84f 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -205,7 +205,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
 func (mc *Cluster) initRuleManager() {
 	if mc.RuleManager == nil {
 		mc.RuleManager = placement.NewRuleManager(mc.ctx, mc.GetStorage(), mc, mc.GetSharedConfig())
-		mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel)
+		mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel, false)
 	}
 }
diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go
index 1a29d48a486..4470ff28424 100644
--- a/pkg/schedule/placement/rule_manager.go
+++ b/pkg/schedule/placement/rule_manager.go
@@ -57,10 +57,9 @@ type RuleManager struct {
 	ctx     context.Context
 	storage endpoint.RuleStorage
 	syncutil.RWMutex
-	initialized    bool
-	inMicroService bool
-	ruleConfig     *ruleConfig
-	ruleList       ruleList
+	initialized bool
+	ruleConfig  *ruleConfig
+	ruleList    ruleList
 
 	// used for rule validation
 	keyType string
@@ -83,7 +82,7 @@ func NewRuleManager(ctx context.Context, storage endpoint.RuleStorage, storeSetI
 
 // Initialize loads rules from storage. If Placement Rules feature is never enabled, it creates default rule that is
 // compatible with previous configuration.
-func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string) error {
+func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string, skipLoadRules bool) error {
 	m.Lock()
 	defer m.Unlock()
 	if m.initialized {
@@ -91,7 +90,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat
 	}
 	// If RuleManager is initialized in micro service,
 	// it will load from etcd watcher and do not modify rule directly.
-	if m.inMicroService {
+	if skipLoadRules {
 		m.ruleList = ruleList{
 			rangeList: rangelist.List{},
 		}
@@ -842,13 +841,6 @@ func (m *RuleManager) SetKeyType(h string) *RuleManager {
 	return m
 }
 
-// SetInMicroService set whether rule manager is in micro service.
-func (m *RuleManager) SetInMicroService(v bool) {
-	m.Lock()
-	defer m.Unlock()
-	m.inMicroService = v
-}
-
 func getStoresByRegion(storeSet StoreSet, region *core.RegionInfo) []*core.StoreInfo {
 	r := make([]*core.StoreInfo, 0, len(region.GetPeers()))
 	for _, peer := range region.GetPeers() {
diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go
index 5f5f457da13..29d862ddb5e 100644
--- a/pkg/schedule/placement/rule_manager_test.go
+++ b/pkg/schedule/placement/rule_manager_test.go
@@ -37,7 +37,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru
 	var err error
 	manager := NewRuleManager(context.Background(), store, nil, mockconfig.NewTestOptions())
 	manager.conf.SetEnableWitness(enableWitness)
-	err = manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+	err = manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
 	re.NoError(err)
 	return store, manager
 }
@@ -160,7 +160,7 @@ func TestSaveLoad(t *testing.T) {
 	}
 
 	m2 := NewRuleManager(context.Background(), store, nil, nil)
-	err := m2.Initialize(3, []string{"no", "labels"}, "")
+	err := m2.Initialize(3, []string{"no", "labels"}, "", false)
 	re.NoError(err)
 	re.Len(m2.GetAllRules(), 3)
 	re.Equal(m2.GetRule(DefaultGroupID, DefaultRuleID).String(), rules[0].String())
@@ -178,7 +178,7 @@ func TestSetAfterGet(t *testing.T) {
 	manager.SetRule(rule)
 
 	m2 := NewRuleManager(context.Background(), store, nil, nil)
-	err := m2.Initialize(100, []string{}, "")
+	err := m2.Initialize(100, []string{}, "", false)
 	re.NoError(err)
 	rule = m2.GetRule(DefaultGroupID, DefaultRuleID)
 	re.Equal(1, rule.Count)
diff --git a/pkg/statistics/region_collection_test.go b/pkg/statistics/region_collection_test.go
index f97dffa893d..64442423c02 100644
--- a/pkg/statistics/region_collection_test.go
+++ b/pkg/statistics/region_collection_test.go
@@ -33,7 +33,7 @@ func TestRegionStatistics(t *testing.T) {
 	re := require.New(t)
 	store := storage.NewStorageWithMemoryBackend()
 	manager := placement.NewRuleManager(context.Background(), store, nil, nil)
-	err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+	err := manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
 	re.NoError(err)
 	opt := mockconfig.NewTestOptions()
 	opt.SetPlacementRuleEnabled(false)
@@ -122,7 +122,7 @@ func TestRegionStatisticsWithPlacementRule(t *testing.T) {
 	re := require.New(t)
 	store := storage.NewStorageWithMemoryBackend()
 	manager := placement.NewRuleManager(context.Background(), store, nil, nil)
-	err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+	err := manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
 	re.NoError(err)
 	opt := mockconfig.NewTestOptions()
 	opt.SetPlacementRuleEnabled(true)
@@ -276,7 +276,7 @@ func BenchmarkObserve(b *testing.B) {
 	// Setup
 	store := storage.NewStorageWithMemoryBackend()
 	manager := placement.NewRuleManager(context.Background(), store, nil, nil)
-	manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+	manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
 	opt := mockconfig.NewTestOptions()
 	opt.SetPlacementRuleEnabled(false)
 	peers := []*metapb.Peer{
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 5ecd787956d..699b43e7901 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -307,7 +307,7 @@ func (c *RaftCluster) InitCluster(
 	c.hbstreams = hbstreams
 	c.ruleManager = placement.NewRuleManager(c.ctx, c.storage, c, c.GetOpts())
 	if c.opt.IsPlacementRulesEnabled() {
-		err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel())
+		err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel(), false)
 		if err != nil {
 			return err
 		}
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 11b5743fa13..2f6d04bbf52 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -253,7 +253,7 @@ func TestSetOfflineStore(t *testing.T) {
 	cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
 	cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
 	if opt.IsPlacementRulesEnabled() {
-		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
@@ -450,7 +450,7 @@ func TestUpStore(t *testing.T) {
 	cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
 	cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
 	if opt.IsPlacementRulesEnabled() {
-		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
@@ -553,7 +553,7 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) {
 	cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
 	cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
 	if opt.IsPlacementRulesEnabled() {
-		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
@@ -1324,7 +1324,7 @@ func TestOfflineAndMerge(t *testing.T) {
 	cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
 	cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
 	if opt.IsPlacementRulesEnabled() {
-		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
@@ -2187,7 +2187,7 @@ func newTestRaftCluster(
 	rc.InitCluster(id, opt, nil, nil)
 	rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt)
 	if opt.IsPlacementRulesEnabled() {
-		err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
diff --git a/server/server.go b/server/server.go
index a2cc32db9dd..3f397da4d0b 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1047,7 +1047,7 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error {
 	}
 	if cfg.EnablePlacementRules {
 		// initialize rule manager.
-		if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil {
+		if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel, false); err != nil {
 			return err
 		}
 	} else {
diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go
index fd08a5ed556..54abdf2d236 100644
--- a/tests/server/api/operator_test.go
+++ b/tests/server/api/operator_test.go
@@ -472,7 +472,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te
 			svr.GetRaftCluster().GetOpts().GetMaxReplicas(),
 			svr.GetRaftCluster().GetOpts().GetLocationLabels(),
 			svr.GetRaftCluster().GetOpts().GetIsolationLevel(),
-		)
+			false)
 		re.NoError(err)
 	}
 	if len(testCase.rules) > 0 {
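
Note (editor's sketch, not part of the patch): after this change, callers pick
the rule-loading behavior through the new fourth Initialize argument instead of
calling the removed SetInMicroService setter. A minimal, self-contained sketch
of the two call patterns follows; the identifiers mirror the diff and the test
files above, while the package-main wrapping and import paths are assumptions
based on the repository layout shown in the file headers.

  package main

  import (
  	"context"

  	"github.com/tikv/pd/pkg/schedule/placement"
  	"github.com/tikv/pd/pkg/storage"
  )

  func main() {
  	ctx := context.Background()

  	// PD server path (skipLoadRules=false): Initialize loads rules from
  	// storage, creating the default rule if none were ever saved. The nil
  	// store-set informer and config arguments mirror the unit tests.
  	m := placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), nil, nil)
  	if err := m.Initialize(3, []string{"zone", "rack", "host"}, "", false); err != nil {
  		panic(err)
  	}

  	// Scheduling micro service path (skipLoadRules=true): Initialize keeps
  	// an empty rule list; per the comment in rule_manager.go, the etcd
  	// watcher populates the rules instead of storage.
  	m2 := placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), nil, nil)
  	if err := m2.Initialize(3, nil, "", true); err != nil {
  		panic(err)
  	}
  }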