From ad8addd9cdd77d930475782029ae25dceb08081d Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Thu, 19 Dec 2024 16:05:49 +0800
Subject: [PATCH 1/3] mcs: fix rule manager initialize

Signed-off-by: lhy1024
---
 pkg/mcs/scheduling/server/cluster.go          |  1 +
 pkg/schedule/placement/rule_manager.go        | 24 +++++++-
 .../integrations/mcs/scheduling/rule_test.go  | 60 ++++++++++++++++++-
 .../mcs/scheduling/server_test.go             | 24 ++++----
 tests/scheduling_cluster.go                   | 11 +++-
 tests/testutil.go                             |  2 +-
 6 files changed, 102 insertions(+), 20 deletions(-)

diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 20e5acca379..28b1bffba95 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -113,6 +113,7 @@ func NewCluster(
 		logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
 	}
 	c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
+	c.ruleManager.SetInMicroService(true)
 	err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
 	if err != nil {
 		cancel()
diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go
index f5101f0250c..1a29d48a486 100644
--- a/pkg/schedule/placement/rule_manager.go
+++ b/pkg/schedule/placement/rule_manager.go
@@ -35,6 +35,7 @@ import (
 	"github.com/tikv/pd/pkg/core/constant"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/schedule/config"
+	"github.com/tikv/pd/pkg/schedule/rangelist"
 	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/storage/kv"
@@ -56,9 +57,10 @@ type RuleManager struct {
 	ctx     context.Context
 	storage endpoint.RuleStorage
 	syncutil.RWMutex
-	initialized bool
-	ruleConfig  *ruleConfig
-	ruleList    ruleList
+	initialized    bool
+	inMicroService bool
+	ruleConfig     *ruleConfig
+	ruleList       ruleList
 
 	// used for rule validation
 	keyType string
@@ -87,6 +89,15 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat
 	if m.initialized {
 		return nil
 	}
+	// If RuleManager is initialized in micro service,
+	// it will load from etcd watcher and do not modify rule directly.
+	if m.inMicroService {
+		m.ruleList = ruleList{
+			rangeList: rangelist.List{},
+		}
+		m.initialized = true
+		return nil
+	}
 
 	if err := m.loadRules(); err != nil {
 		return err
@@ -831,6 +842,13 @@ func (m *RuleManager) SetKeyType(h string) *RuleManager {
 	return m
 }
 
+// SetInMicroService set whether rule manager is in micro service.
+func (m *RuleManager) SetInMicroService(v bool) {
+	m.Lock()
+	defer m.Unlock()
+	m.inMicroService = v
+}
+
 func getStoresByRegion(storeSet StoreSet, region *core.RegionInfo) []*core.StoreInfo {
 	r := make([]*core.StoreInfo, 0, len(region.GetPeers()))
 	for _, peer := range region.GetPeers() {
diff --git a/tests/integrations/mcs/scheduling/rule_test.go b/tests/integrations/mcs/scheduling/rule_test.go
index a137619afbf..880dfddbb16 100644
--- a/tests/integrations/mcs/scheduling/rule_test.go
+++ b/tests/integrations/mcs/scheduling/rule_test.go
@@ -16,11 +16,15 @@ package scheduling
 
 import (
 	"context"
+	"encoding/json"
+	"fmt"
 	"sort"
 	"testing"
 
 	"github.com/stretchr/testify/suite"
 
+	"github.com/pingcap/failpoint"
+
 	"github.com/tikv/pd/pkg/schedule/labeler"
 	"github.com/tikv/pd/pkg/schedule/placement"
 	"github.com/tikv/pd/pkg/utils/testutil"
@@ -46,6 +50,7 @@ func TestRule(t *testing.T) {
 
 func (suite *ruleTestSuite) SetupSuite() {
 	re := suite.Require()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
 	var err error
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
@@ -63,12 +68,14 @@ func (suite *ruleTestSuite) SetupSuite() {
 
 func (suite *ruleTestSuite) TearDownSuite() {
 	suite.cancel()
 	suite.cluster.Destroy()
+	re := suite.Require()
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
 }
 
 func (suite *ruleTestSuite) TestRuleWatch() {
 	re := suite.Require()
 
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoint)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
@@ -205,3 +212,54 @@ func (suite *ruleTestSuite) TestRuleWatch() {
 	re.Equal(labelRule.Labels, labelRules[1].Labels)
 	re.Equal(labelRule.RuleType, labelRules[1].RuleType)
 }
+
+func (suite *ruleTestSuite) TestSchedulingSwitch() {
+	re := suite.Require()
+
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.cluster)
+	re.NoError(err)
+	defer tc.Destroy()
+	tc.WaitForPrimaryServing(re)
+
+	// Add a new rule from "" to ""
+	url := fmt.Sprintf("%s/pd/api/v1/config/placement-rule", suite.pdLeaderServer.GetAddr())
+	respBundle := make([]placement.GroupBundle, 0)
+	testutil.Eventually(re, func() bool {
+		err = testutil.CheckGetJSON(tests.TestDialClient, url, nil,
+			testutil.StatusOK(re), testutil.ExtractJSON(re, &respBundle))
+		re.NoError(err)
+		return len(respBundle) == 1 && len(respBundle[0].Rules) == 1
+	})
+
+	b2 := placement.GroupBundle{
+		ID:    "pd",
+		Index: 1,
+		Rules: []*placement.Rule{
+			{GroupID: "pd", ID: "rule0", Index: 1, Role: placement.Voter, Count: 3},
+		},
+	}
+	data, err := json.Marshal(b2)
+	re.NoError(err)
+
+	err = testutil.CheckPostJSON(tests.TestDialClient, url+"/pd", data, testutil.StatusOK(re))
+	re.NoError(err)
+	testutil.Eventually(re, func() bool {
+		err = testutil.CheckGetJSON(tests.TestDialClient, url, nil,
+			testutil.StatusOK(re), testutil.ExtractJSON(re, &respBundle))
+		re.NoError(err)
+		return len(respBundle) == 1 && len(respBundle[0].Rules) == 1
+	})
+
+	// Switch another server
+	oldPrimary := tc.GetPrimaryServer()
+	oldPrimary.Close()
+	tc.WaitForPrimaryServing(re)
+	newPrimary := tc.GetPrimaryServer()
+	re.NotEqual(oldPrimary.GetAddr(), newPrimary.GetAddr())
+	testutil.Eventually(re, func() bool {
+		err = testutil.CheckGetJSON(tests.TestDialClient, url, nil,
+			testutil.StatusOK(re), testutil.ExtractJSON(re, &respBundle))
+		re.NoError(err)
+		return len(respBundle) == 1 && len(respBundle[0].Rules) == 1
+	})
+}
diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go
index d3850e4667c..ea1e9df0b50 100644
--- a/tests/integrations/mcs/scheduling/server_test.go
+++ b/tests/integrations/mcs/scheduling/server_test.go
@@ -91,7 +91,7 @@ func (suite *serverTestSuite) TearDownSuite() {
 func (suite *serverTestSuite) TestAllocID() {
 	re := suite.Require()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -110,7 +110,7 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
 	err = pd2.Run()
 	re.NotEmpty(suite.cluster.WaitLeader())
 	re.NoError(err)
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -138,7 +138,7 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
 
 func (suite *serverTestSuite) TestPrimaryChange() {
 	re := suite.Require()
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -164,7 +164,7 @@ func (suite *serverTestSuite) TestPrimaryChange() {
 
 func (suite *serverTestSuite) TestForwardStoreHeartbeat() {
 	re := suite.Require()
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -225,7 +225,7 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() {
 		return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
 	})
 
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -242,7 +242,7 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() {
 	testutil.Eventually(re, func() bool {
 		return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
 	})
-	tc1, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc1, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc1.Destroy()
 	tc1.WaitForPrimaryServing(re)
@@ -278,7 +278,7 @@ func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() {
 		return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
 	})
 
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -302,7 +302,7 @@
 
 func (suite *serverTestSuite) TestSchedulerSync() {
 	re := suite.Require()
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -422,7 +422,7 @@ func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller,
 
 func (suite *serverTestSuite) TestForwardRegionHeartbeat() {
 	re := suite.Require()
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -499,7 +499,7 @@ func (suite *serverTestSuite) TestForwardRegionHeartbeat() {
 
 func (suite *serverTestSuite) TestStoreLimit() {
 	re := suite.Require()
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -660,7 +660,7 @@ func (suite *multipleServerTestSuite) TestReElectLeader() {
 	defer func() {
 		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck"))
 	}()
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
@@ -692,7 +692,7 @@ func (suite *multipleServerTestSuite) TestReElectLeader() {
 
 func (suite *serverTestSuite) TestOnlineProgress() {
 	re := suite.Require()
-	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
diff --git a/tests/scheduling_cluster.go b/tests/scheduling_cluster.go
index b5fc2429043..5aa1e220be9 100644
--- a/tests/scheduling_cluster.go
+++ b/tests/scheduling_cluster.go
@@ -22,6 +22,7 @@ import (
 
 	scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server"
 	sc "github.com/tikv/pd/pkg/mcs/scheduling/server/config"
+	"github.com/tikv/pd/pkg/mcs/utils/constant"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
 	"github.com/tikv/pd/pkg/utils/tempurl"
 	"github.com/tikv/pd/pkg/utils/testutil"
@@ -31,17 +32,19 @@ type TestSchedulingCluster struct {
 	ctx context.Context
 
+	pd               *TestCluster
 	backendEndpoints string
 	servers          map[string]*scheduling.Server
 	cleanupFuncs     map[string]testutil.CleanupFunc
 }
 
 // NewTestSchedulingCluster creates a new scheduling test cluster.
-func NewTestSchedulingCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestSchedulingCluster, err error) {
+func NewTestSchedulingCluster(ctx context.Context, initialServerCount int, pd *TestCluster) (tc *TestSchedulingCluster, err error) {
 	schedulers.Register()
 	tc = &TestSchedulingCluster{
 		ctx:              ctx,
-		backendEndpoints: backendEndpoints,
+		pd:               pd,
+		backendEndpoints: pd.GetLeaderServer().GetAddr(),
 		servers:          make(map[string]*scheduling.Server, initialServerCount),
 		cleanupFuncs:     make(map[string]testutil.CleanupFunc, initialServerCount),
 	}
@@ -115,7 +118,9 @@ func (tc *TestSchedulingCluster) WaitForPrimaryServing(re *require.Assertions) *
 		}
 		return false
 	}, testutil.WithWaitFor(10*time.Second), testutil.WithTickInterval(50*time.Millisecond))
-
+	testutil.Eventually(re, func() bool {
+		return tc.pd.GetLeaderServer().GetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName)
+	})
 	return primary
 }
 
diff --git a/tests/testutil.go b/tests/testutil.go
index 4f2a6beb261..5e99b3dbeda 100644
--- a/tests/testutil.go
+++ b/tests/testutil.go
@@ -389,7 +389,7 @@ func (s *SchedulingTestEnvironment) startCluster(m SchedulerMode) {
 	re.NoError(leaderServer.BootstrapCluster())
 	leaderServer.GetRaftCluster().SetPrepared()
 	// start scheduling cluster
-	tc, err := NewTestSchedulingCluster(ctx, 1, leaderServer.GetAddr())
+	tc, err := NewTestSchedulingCluster(ctx, 1, cluster)
 	re.NoError(err)
 	tc.WaitForPrimaryServing(re)
 	tc.GetPrimaryServer().GetCluster().SetPrepared()

From 335b245e8f54bebee6f57d92a638bc7443028e88 Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Thu, 19 Dec 2024 16:18:41 +0800
Subject: [PATCH 2/3] fix test

Signed-off-by: lhy1024
---
 tools/pd-ctl/tests/config/config_test.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go
index cf9e4163457..b6c58fe2bc6 100644
--- a/tools/pd-ctl/tests/config/config_test.go
+++ b/tools/pd-ctl/tests/config/config_test.go
@@ -740,7 +740,9 @@ func (suite *configTestSuite) checkPlacementRuleBundle(cluster *pdTests.TestClus
 	output, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "placement-rules", "rule-bundle", "get", placement.DefaultGroupID)
 	re.NoError(err)
 	re.NoError(json.Unmarshal(output, &bundle))
-	re.Equal(placement.GroupBundle{ID: placement.DefaultGroupID, Index: 0, Override: false, Rules: []*placement.Rule{{GroupID: placement.DefaultGroupID, ID: placement.DefaultRuleID, Role: placement.Voter, Count: 3}}}, bundle)
+	expect := placement.GroupBundle{ID: placement.DefaultGroupID, Index: 0, Override: false, Rules: []*placement.Rule{{GroupID: placement.DefaultGroupID, ID: placement.DefaultRuleID, Role: placement.Voter, Count: 3}}}
+	expect.Rules[0].CreateTimestamp = bundle.Rules[0].CreateTimestamp // skip create timestamp in mcs
+	re.Equal(expect, bundle)
 
 	f, err := os.CreateTemp("", "pd_tests")
 	re.NoError(err)

From 667e8c19e1f798174b1c4b62a199c1b0510f2d4e Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Thu, 19 Dec 2024 16:50:08 +0800
Subject: [PATCH 3/3] address comments

Signed-off-by: lhy1024
---
 pkg/mcs/scheduling/server/cluster.go        |  3 +--
 pkg/mock/mockcluster/mockcluster.go         |  2 +-
 pkg/schedule/placement/rule_manager.go      | 18 +++++-------------
 pkg/schedule/placement/rule_manager_test.go |  6 +++---
 pkg/statistics/region_collection_test.go    |  6 +++---
 server/cluster/cluster.go                   |  2 +-
 server/cluster/cluster_test.go              | 10 +++++-----
 server/server.go                            |  2 +-
 tests/server/api/operator_test.go           |  2 +-
 9 files changed, 21 insertions(+), 30 deletions(-)

diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 28b1bffba95..5c7166fba09 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -113,8 +113,7 @@ func NewCluster(
 		logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
 	}
 	c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
-	c.ruleManager.SetInMicroService(true)
-	err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
+	err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel(), true)
 	if err != nil {
 		cancel()
 		return nil, err
diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 45d8e35a0bc..62498bff84f 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -205,7 +205,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
 
 func (mc *Cluster) initRuleManager() {
 	if mc.RuleManager == nil {
 		mc.RuleManager = placement.NewRuleManager(mc.ctx, mc.GetStorage(), mc, mc.GetSharedConfig())
-		mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel)
+		mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel, false)
 	}
 }
diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go
index 1a29d48a486..4470ff28424 100644
--- a/pkg/schedule/placement/rule_manager.go
+++ b/pkg/schedule/placement/rule_manager.go
@@ -57,10 +57,9 @@ type RuleManager struct {
 	ctx     context.Context
 	storage endpoint.RuleStorage
 	syncutil.RWMutex
-	initialized    bool
-	inMicroService bool
-	ruleConfig     *ruleConfig
-	ruleList       ruleList
+	initialized bool
+	ruleConfig  *ruleConfig
+	ruleList    ruleList
 
 	// used for rule validation
 	keyType string
@@ -83,7 +82,7 @@ func NewRuleManager(ctx context.Context, storage endpoint.RuleStorage, storeSetI
 
 // Initialize loads rules from storage. If Placement Rules feature is never enabled, it creates default rule that is
 // compatible with previous configuration.
-func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string) error {
+func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string, skipLoadRules bool) error {
 	m.Lock()
 	defer m.Unlock()
 	if m.initialized {
@@ -91,7 +90,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat
 	}
 	// If RuleManager is initialized in micro service,
 	// it will load from etcd watcher and do not modify rule directly.
-	if m.inMicroService {
+	if skipLoadRules {
 		m.ruleList = ruleList{
 			rangeList: rangelist.List{},
 		}
@@ -842,13 +841,6 @@ func (m *RuleManager) SetKeyType(h string) *RuleManager {
 	return m
 }
 
-// SetInMicroService set whether rule manager is in micro service.
-func (m *RuleManager) SetInMicroService(v bool) {
-	m.Lock()
-	defer m.Unlock()
-	m.inMicroService = v
-}
-
 func getStoresByRegion(storeSet StoreSet, region *core.RegionInfo) []*core.StoreInfo {
 	r := make([]*core.StoreInfo, 0, len(region.GetPeers()))
 	for _, peer := range region.GetPeers() {
diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go
index 5f5f457da13..29d862ddb5e 100644
--- a/pkg/schedule/placement/rule_manager_test.go
+++ b/pkg/schedule/placement/rule_manager_test.go
@@ -37,7 +37,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru
 	var err error
 	manager := NewRuleManager(context.Background(), store, nil, mockconfig.NewTestOptions())
 	manager.conf.SetEnableWitness(enableWitness)
-	err = manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+	err = manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
 	re.NoError(err)
 	return store, manager
 }
@@ -160,7 +160,7 @@ func TestSaveLoad(t *testing.T) {
 	}
 
 	m2 := NewRuleManager(context.Background(), store, nil, nil)
-	err := m2.Initialize(3, []string{"no", "labels"}, "")
+	err := m2.Initialize(3, []string{"no", "labels"}, "", false)
 	re.NoError(err)
 	re.Len(m2.GetAllRules(), 3)
 	re.Equal(m2.GetRule(DefaultGroupID, DefaultRuleID).String(), rules[0].String())
@@ -178,7 +178,7 @@ func TestSetAfterGet(t *testing.T) {
 	manager.SetRule(rule)
 
 	m2 := NewRuleManager(context.Background(), store, nil, nil)
-	err := m2.Initialize(100, []string{}, "")
+	err := m2.Initialize(100, []string{}, "", false)
 	re.NoError(err)
 	rule = m2.GetRule(DefaultGroupID, DefaultRuleID)
 	re.Equal(1, rule.Count)
diff --git a/pkg/statistics/region_collection_test.go b/pkg/statistics/region_collection_test.go
index f97dffa893d..64442423c02 100644
--- a/pkg/statistics/region_collection_test.go
+++ b/pkg/statistics/region_collection_test.go
@@ -33,7 +33,7 @@ func TestRegionStatistics(t *testing.T) {
 	re := require.New(t)
 	store := storage.NewStorageWithMemoryBackend()
 	manager := placement.NewRuleManager(context.Background(), store, nil, nil)
-	err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+	err := manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
 	re.NoError(err)
 	opt := mockconfig.NewTestOptions()
 	opt.SetPlacementRuleEnabled(false)
@@ -122,7 +122,7 @@ func TestRegionStatisticsWithPlacementRule(t *testing.T) {
 	re := require.New(t)
 	store := storage.NewStorageWithMemoryBackend()
 	manager := placement.NewRuleManager(context.Background(), store, nil, nil)
-	err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+	err := manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
 	re.NoError(err)
 	opt := mockconfig.NewTestOptions()
 	opt.SetPlacementRuleEnabled(true)
@@ -276,7 +276,7 @@ func BenchmarkObserve(b *testing.B) {
 	// Setup
 	store := storage.NewStorageWithMemoryBackend()
 	manager := placement.NewRuleManager(context.Background(), store, nil, nil)
-	manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+	manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
 	opt := mockconfig.NewTestOptions()
 	opt.SetPlacementRuleEnabled(false)
 	peers := []*metapb.Peer{
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 5ecd787956d..699b43e7901 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -307,7 +307,7 @@ func (c *RaftCluster) InitCluster(
 	c.hbstreams = hbstreams
 	c.ruleManager = placement.NewRuleManager(c.ctx, c.storage, c, c.GetOpts())
 	if c.opt.IsPlacementRulesEnabled() {
-		err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel())
+		err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel(), false)
 		if err != nil {
 			return err
 		}
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 11b5743fa13..2f6d04bbf52 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -253,7 +253,7 @@ func TestSetOfflineStore(t *testing.T) {
 	cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
 	cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
 	if opt.IsPlacementRulesEnabled() {
-		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
@@ -450,7 +450,7 @@ func TestUpStore(t *testing.T) {
 	cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
 	cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
 	if opt.IsPlacementRulesEnabled() {
-		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
@@ -553,7 +553,7 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) {
 	cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
 	cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
 	if opt.IsPlacementRulesEnabled() {
-		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
@@ -1324,7 +1324,7 @@ func TestOfflineAndMerge(t *testing.T) {
 	cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
 	cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
 	if opt.IsPlacementRulesEnabled() {
-		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
@@ -2187,7 +2187,7 @@ func newTestRaftCluster(
 	rc.InitCluster(id, opt, nil, nil)
 	rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt)
 	if opt.IsPlacementRulesEnabled() {
-		err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+		err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
 		if err != nil {
 			panic(err)
 		}
diff --git a/server/server.go b/server/server.go
index a2cc32db9dd..3f397da4d0b 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1047,7 +1047,7 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error {
 	}
 	if cfg.EnablePlacementRules {
 		// initialize rule manager.
-		if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil {
+		if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel, false); err != nil {
 			return err
 		}
 	} else {
diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go
index fd08a5ed556..54abdf2d236 100644
--- a/tests/server/api/operator_test.go
+++ b/tests/server/api/operator_test.go
@@ -472,7 +472,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te
 			svr.GetRaftCluster().GetOpts().GetMaxReplicas(),
 			svr.GetRaftCluster().GetOpts().GetLocationLabels(),
 			svr.GetRaftCluster().GetOpts().GetIsolationLevel(),
-		)
+			false)
 		re.NoError(err)
 	}
 	if len(testCase.rules) > 0 {
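Usage sketch (illustrative, not part of the patch series): after PATCH 3/3, callers select the micro-service behaviour through the new skipLoadRules argument of RuleManager.Initialize instead of a separate SetInMicroService setter. The snippet below only mirrors the call sites visible in the diffs above (NewRuleManager with a memory-backed storage, nil informer/config as in the updated tests); the standalone main wrapper, the variable names, and a go.mod dependency on tikv/pd are assumptions for the sake of the example.

package main

import (
	"context"

	"github.com/tikv/pd/pkg/schedule/placement"
	"github.com/tikv/pd/pkg/storage"
)

func main() {
	ctx := context.Background()
	store := storage.NewStorageWithMemoryBackend()

	// PD server path (and most tests): skipLoadRules=false, so Initialize loads
	// persisted rules or creates the default rule, exactly as before this series.
	m := placement.NewRuleManager(ctx, store, nil, nil)
	if err := m.Initialize(3, []string{"zone", "rack", "host"}, "", false); err != nil {
		panic(err)
	}

	// Scheduling micro-service path: skipLoadRules=true, so the manager starts
	// with an empty rule list and is expected to be populated by the etcd rule
	// watcher rather than by loading from storage here.
	watcherFed := placement.NewRuleManager(ctx, store, nil, nil)
	if err := watcherFed.Initialize(3, nil, "", true); err != nil {
		panic(err)
	}
}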