mcs: fix rule manager initialize (#8937)
close #8935

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
lhy1024 and ti-chi-bot[bot] authored Dec 20, 2024
1 parent 7889b67 commit 0e5d49f
Showing 14 changed files with 110 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/cluster.go
@@ -113,7 +113,7 @@ func NewCluster(
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
- err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
+ err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel(), true)
if err != nil {
cancel()
return nil, err
2 changes: 1 addition & 1 deletion pkg/mock/mockcluster/mockcluster.go
@@ -205,7 +205,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
func (mc *Cluster) initRuleManager() {
if mc.RuleManager == nil {
mc.RuleManager = placement.NewRuleManager(mc.ctx, mc.GetStorage(), mc, mc.GetSharedConfig())
- mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel)
+ mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel, false)
}
}

12 changes: 11 additions & 1 deletion pkg/schedule/placement/rule_manager.go
@@ -35,6 +35,7 @@ import (
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/rangelist"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
@@ -81,12 +82,21 @@ func NewRuleManager(ctx context.Context, storage endpoint.RuleStorage, storeSetI

// Initialize loads rules from storage. If Placement Rules feature is never enabled, it creates default rule that is
// compatible with previous configuration.
- func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string) error {
+ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string, skipLoadRules bool) error {
m.Lock()
defer m.Unlock()
if m.initialized {
return nil
}
+ // If RuleManager is initialized in micro service,
+ // it will load from etcd watcher and do not modify rule directly.
+ if skipLoadRules {
+ m.ruleList = ruleList{
+ rangeList: rangelist.List{},
+ }
+ m.initialized = true
+ return nil
+ }

if err := m.loadRules(); err != nil {
return err
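
To make the intent of the new skipLoadRules branch above concrete, here is a minimal, self-contained Go sketch of the behaviour; the type, field, and function names below are illustrative stand-ins, not the real placement.RuleManager API.

package main

import "fmt"

// ruleManager mirrors, in miniature, what the patched Initialize above does:
// with skipLoadRules=true (the scheduling microservice case) it marks itself
// initialized with an empty rule list and relies on the etcd watcher to feed
// rules in later; with skipLoadRules=false it loads rules as before.
type ruleManager struct {
	initialized bool
	rules       []string
}

func (m *ruleManager) initialize(skipLoadRules bool) {
	if m.initialized {
		return
	}
	if skipLoadRules {
		m.rules = []string{} // start empty; a watcher populates rules later
		m.initialized = true
		return
	}
	m.rules = []string{"default"} // stand-in for loading persisted/default rules
	m.initialized = true
}

func main() {
	mcs, pd := &ruleManager{}, &ruleManager{}
	mcs.initialize(true)  // pkg/mcs/scheduling/server/cluster.go now passes true
	pd.initialize(false)  // server/cluster/cluster.go and the tests pass false
	fmt.Println(len(mcs.rules), len(pd.rules)) // prints: 0 1
}
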
6 changes: 3 additions & 3 deletions pkg/schedule/placement/rule_manager_test.go
@@ -37,7 +37,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru
var err error
manager := NewRuleManager(context.Background(), store, nil, mockconfig.NewTestOptions())
manager.conf.SetEnableWitness(enableWitness)
- err = manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+ err = manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
re.NoError(err)
return store, manager
}
@@ -160,7 +160,7 @@ func TestSaveLoad(t *testing.T) {
}

m2 := NewRuleManager(context.Background(), store, nil, nil)
- err := m2.Initialize(3, []string{"no", "labels"}, "")
+ err := m2.Initialize(3, []string{"no", "labels"}, "", false)
re.NoError(err)
re.Len(m2.GetAllRules(), 3)
re.Equal(m2.GetRule(DefaultGroupID, DefaultRuleID).String(), rules[0].String())
@@ -178,7 +178,7 @@ func TestSetAfterGet(t *testing.T) {
manager.SetRule(rule)

m2 := NewRuleManager(context.Background(), store, nil, nil)
- err := m2.Initialize(100, []string{}, "")
+ err := m2.Initialize(100, []string{}, "", false)
re.NoError(err)
rule = m2.GetRule(DefaultGroupID, DefaultRuleID)
re.Equal(1, rule.Count)
6 changes: 3 additions & 3 deletions pkg/statistics/region_collection_test.go
@@ -33,7 +33,7 @@ func TestRegionStatistics(t *testing.T) {
re := require.New(t)
store := storage.NewStorageWithMemoryBackend()
manager := placement.NewRuleManager(context.Background(), store, nil, nil)
- err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+ err := manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
re.NoError(err)
opt := mockconfig.NewTestOptions()
opt.SetPlacementRuleEnabled(false)
@@ -122,7 +122,7 @@ func TestRegionStatisticsWithPlacementRule(t *testing.T) {
re := require.New(t)
store := storage.NewStorageWithMemoryBackend()
manager := placement.NewRuleManager(context.Background(), store, nil, nil)
- err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+ err := manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
re.NoError(err)
opt := mockconfig.NewTestOptions()
opt.SetPlacementRuleEnabled(true)
@@ -276,7 +276,7 @@ func BenchmarkObserve(b *testing.B) {
// Setup
store := storage.NewStorageWithMemoryBackend()
manager := placement.NewRuleManager(context.Background(), store, nil, nil)
- manager.Initialize(3, []string{"zone", "rack", "host"}, "")
+ manager.Initialize(3, []string{"zone", "rack", "host"}, "", false)
opt := mockconfig.NewTestOptions()
opt.SetPlacementRuleEnabled(false)
peers := []*metapb.Peer{
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -307,7 +307,7 @@ func (c *RaftCluster) InitCluster(
c.hbstreams = hbstreams
c.ruleManager = placement.NewRuleManager(c.ctx, c.storage, c, c.GetOpts())
if c.opt.IsPlacementRulesEnabled() {
- err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel())
+ err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel(), false)
if err != nil {
return err
}
10 changes: 5 additions & 5 deletions server/cluster/cluster_test.go
@@ -253,7 +253,7 @@ func TestSetOfflineStore(t *testing.T) {
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
- err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+ err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
@@ -450,7 +450,7 @@ func TestUpStore(t *testing.T) {
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
- err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+ err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
@@ -553,7 +553,7 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) {
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
- err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+ err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
@@ -1324,7 +1324,7 @@ func TestOfflineAndMerge(t *testing.T) {
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
- err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+ err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
@@ -2187,7 +2187,7 @@ func newTestRaftCluster(
rc.InitCluster(id, opt, nil, nil)
rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt)
if opt.IsPlacementRulesEnabled() {
- err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
+ err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel(), false)
if err != nil {
panic(err)
}
2 changes: 1 addition & 1 deletion server/server.go
@@ -1047,7 +1047,7 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error {
}
if cfg.EnablePlacementRules {
// initialize rule manager.
- if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil {
+ if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel, false); err != nil {
return err
}
} else {
60 changes: 59 additions & 1 deletion tests/integrations/mcs/scheduling/rule_test.go
@@ -16,11 +16,15 @@ package scheduling

import (
"context"
"encoding/json"
"fmt"
"sort"
"testing"

"github.com/stretchr/testify/suite"

"github.com/pingcap/failpoint"

"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/testutil"
@@ -46,6 +50,7 @@ func TestRule(t *testing.T) {

func (suite *ruleTestSuite) SetupSuite() {
re := suite.Require()
+ re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
@@ -63,12 +68,14 @@ func (suite *ruleTestSuite) SetupSuite() {
func (suite *ruleTestSuite) TearDownSuite() {
suite.cancel()
suite.cluster.Destroy()
+ re := suite.Require()
+ re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
}

func (suite *ruleTestSuite) TestRuleWatch() {
re := suite.Require()

- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoint)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()

@@ -205,3 +212,54 @@
re.Equal(labelRule.Labels, labelRules[1].Labels)
re.Equal(labelRule.RuleType, labelRules[1].RuleType)
}

func (suite *ruleTestSuite) TestSchedulingSwitch() {
re := suite.Require()

tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

// Add a new rule from "" to ""
url := fmt.Sprintf("%s/pd/api/v1/config/placement-rule", suite.pdLeaderServer.GetAddr())
respBundle := make([]placement.GroupBundle, 0)
testutil.Eventually(re, func() bool {
err = testutil.CheckGetJSON(tests.TestDialClient, url, nil,
testutil.StatusOK(re), testutil.ExtractJSON(re, &respBundle))
re.NoError(err)
return len(respBundle) == 1 && len(respBundle[0].Rules) == 1
})

b2 := placement.GroupBundle{
ID: "pd",
Index: 1,
Rules: []*placement.Rule{
{GroupID: "pd", ID: "rule0", Index: 1, Role: placement.Voter, Count: 3},
},
}
data, err := json.Marshal(b2)
re.NoError(err)

err = testutil.CheckPostJSON(tests.TestDialClient, url+"/pd", data, testutil.StatusOK(re))
re.NoError(err)
testutil.Eventually(re, func() bool {
err = testutil.CheckGetJSON(tests.TestDialClient, url, nil,
testutil.StatusOK(re), testutil.ExtractJSON(re, &respBundle))
re.NoError(err)
return len(respBundle) == 1 && len(respBundle[0].Rules) == 1
})

// Switch another server
oldPrimary := tc.GetPrimaryServer()
oldPrimary.Close()
tc.WaitForPrimaryServing(re)
newPrimary := tc.GetPrimaryServer()
re.NotEqual(oldPrimary.GetAddr(), newPrimary.GetAddr())
testutil.Eventually(re, func() bool {
err = testutil.CheckGetJSON(tests.TestDialClient, url, nil,
testutil.StatusOK(re), testutil.ExtractJSON(re, &respBundle))
re.NoError(err)
return len(respBundle) == 1 && len(respBundle[0].Rules) == 1
})
}
24 changes: 12 additions & 12 deletions tests/integrations/mcs/scheduling/server_test.go
@@ -91,7 +91,7 @@ func (suite *serverTestSuite) TearDownSuite() {
func (suite *serverTestSuite) TestAllocID() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -110,7 +110,7 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
err = pd2.Run()
re.NotEmpty(suite.cluster.WaitLeader())
re.NoError(err)
- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -138,7 +138,7 @@

func (suite *serverTestSuite) TestPrimaryChange() {
re := suite.Require()
- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -164,7 +164,7 @@

func (suite *serverTestSuite) TestForwardStoreHeartbeat() {
re := suite.Require()
- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -225,7 +225,7 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})

- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -242,7 +242,7 @@
testutil.Eventually(re, func() bool {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
- tc1, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc1, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc1.Destroy()
tc1.WaitForPrimaryServing(re)
@@ -278,7 +278,7 @@ func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})

- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -302,7 +302,7 @@

func (suite *serverTestSuite) TestSchedulerSync() {
re := suite.Require()
- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -422,7 +422,7 @@ func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller,

func (suite *serverTestSuite) TestForwardRegionHeartbeat() {
re := suite.Require()
- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -499,7 +499,7 @@ func (suite *serverTestSuite) TestForwardRegionHeartbeat() {

func (suite *serverTestSuite) TestStoreLimit() {
re := suite.Require()
- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -660,7 +660,7 @@ func (suite *multipleServerTestSuite) TestReElectLeader() {
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck"))
}()
- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
@@ -692,7 +692,7 @@

func (suite *serverTestSuite) TestOnlineProgress() {
re := suite.Require()
- tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+ tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
