From 5ba4db7d0cabd767e0369f1a03d08f85d6351932 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Sun, 29 Sep 2024 15:34:27 +0800 Subject: [PATCH 1/4] pd-ctl: reduce duplicate params in grant hot region scheduler (#8673) close tikv/pd#8672 Signed-off-by: lhy1024 --- pkg/schedule/schedulers/grant_hot_region.go | 18 ++---- pkg/schedule/schedulers/init.go | 4 +- tools/pd-ctl/pdctl/command/scheduler.go | 2 +- .../pd-ctl/tests/scheduler/scheduler_test.go | 58 ++++++++++++++++--- 4 files changed, 59 insertions(+), 23 deletions(-) diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 61bfb82162a..88b9f5c6c93 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -49,17 +49,14 @@ type grantHotRegionSchedulerConfig struct { StoreLeaderID uint64 `json:"store-leader-id"` } -func (conf *grantHotRegionSchedulerConfig) setStore(leaderID uint64, peers []uint64) bool { +func (conf *grantHotRegionSchedulerConfig) setStore(leaderID uint64, peers []uint64) { conf.Lock() defer conf.Unlock() - ret := slice.AnyOf(peers, func(i int) bool { - return leaderID == peers[i] - }) - if ret { - conf.StoreLeaderID = leaderID - conf.StoreIDs = peers + if !slice.Contains(peers, leaderID) { + peers = append(peers, leaderID) } - return ret + conf.StoreLeaderID = leaderID + conf.StoreIDs = peers } func (conf *grantHotRegionSchedulerConfig) getStoreLeaderID() uint64 { @@ -194,10 +191,7 @@ func (handler *grantHotRegionHandler) updateConfig(w http.ResponseWriter, r *htt handler.rd.JSON(w, http.StatusBadRequest, errs.ErrBytesToUint64) return } - if !handler.config.setStore(leaderID, storeIDs) { - handler.rd.JSON(w, http.StatusBadRequest, errs.ErrSchedulerConfig) - return - } + handler.config.setStore(leaderID, storeIDs) if err = handler.config.persist(); err != nil { handler.config.setStoreLeaderID(0) diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index f124182ece3..48b4f1c4239 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -216,9 +216,7 @@ func schedulersRegister() { } storeIDs = append(storeIDs, storeID) } - if !conf.setStore(leaderID, storeIDs) { - return errs.ErrSchedulerConfig - } + conf.setStore(leaderID, storeIDs) return nil } }) diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index d03fdbbed94..b8e05604b16 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -355,7 +355,7 @@ func NewSplitBucketSchedulerCommand() *cobra.Command { // NewGrantHotRegionSchedulerCommand returns a command to add a grant-hot-region-scheduler. func NewGrantHotRegionSchedulerCommand() *cobra.Command { c := &cobra.Command{ - Use: "grant-hot-region-scheduler ", + Use: "grant-hot-region-scheduler ", Short: `add a scheduler to grant hot region to fixed stores. 
Note: If there is balance-hot-region-scheduler running, please remove it first, otherwise grant-hot-region-scheduler will not work.`, Run: addSchedulerForGrantHotRegionCommandFunc, diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index 332fe79614e..314f9e3a762 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -559,11 +559,11 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust re.Contains(echo, "Success!") } -func (suite *schedulerTestSuite) TestGrantLeaderScheduler() { - suite.env.RunTestBasedOnMode(suite.checkGrantLeaderScheduler) +func (suite *schedulerTestSuite) TestGrantHotRegionScheduler() { + suite.env.RunTestBasedOnMode(suite.checkGrantHotRegionScheduler) } -func (suite *schedulerTestSuite) checkGrantLeaderScheduler(cluster *pdTests.TestCluster) { +func (suite *schedulerTestSuite) checkGrantHotRegionScheduler(cluster *pdTests.TestCluster) { re := suite.Require() pdAddr := cluster.GetConfig().GetClientURL() cmd := ctl.GetRootCmd() @@ -617,7 +617,7 @@ func (suite *schedulerTestSuite) checkGrantLeaderScheduler(cluster *pdTests.Test "evict-slow-store-scheduler": true, }) - checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "1", "1,2,3"}, map[string]bool{ + checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "1", "2,3"}, map[string]bool{ "balance-region-scheduler": true, "balance-leader-scheduler": true, "grant-hot-region-scheduler": true, @@ -631,14 +631,33 @@ func (suite *schedulerTestSuite) checkGrantLeaderScheduler(cluster *pdTests.Test "store-leader-id": float64(1), } mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) - re.Equal(expected3, conf3) + re.True(compareGrantHotRegionSchedulerConfig(expected3, conf3)) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,3"}, nil) re.Contains(echo, "Success!") expected3["store-leader-id"] = float64(2) testutil.Eventually(re, func() bool { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) - return reflect.DeepEqual(expected3, conf3) + return compareGrantHotRegionSchedulerConfig(expected3, conf3) + }) + + checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "remove", "grant-hot-region-scheduler"}, map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "evict-slow-store-scheduler": true, + }) + + // use duplicate store id + checkSchedulerCommand(re, cmd, pdAddr, []string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "3", "1,2,3"}, map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "grant-hot-region-scheduler": true, + "evict-slow-store-scheduler": true, + }) + expected3["store-leader-id"] = float64(3) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + return compareGrantHotRegionSchedulerConfig(expected3, conf3) }) // case 5: remove grant-hot-region-scheduler @@ -980,3 +999,28 @@ func checkSchedulerCommand(re *require.Assertions, cmd *cobra.Command, pdAddr st return 
true }) } + +func compareGrantHotRegionSchedulerConfig(expect, actual map[string]any) bool { + if expect["store-leader-id"] != actual["store-leader-id"] { + return false + } + expectStoreID := expect["store-id"].([]any) + actualStoreID := actual["store-id"].([]any) + if len(expectStoreID) != len(actualStoreID) { + return false + } + count := map[float64]any{} + for _, id := range expectStoreID { + // check if the store id is duplicated + if _, ok := count[id.(float64)]; ok { + return false + } + count[id.(float64)] = nil + } + for _, id := range actualStoreID { + if _, ok := count[id.(float64)]; !ok { + return false + } + } + return true +} From f8a0d802ae93adb4cebdc2675b356d32a322bea3 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Sun, 29 Sep 2024 17:10:58 +0800 Subject: [PATCH 2/4] *: remove tidb security test files (#8676) ref tikv/pd#6910 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tests/integrations/client/cert_opt.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integrations/client/cert_opt.sh b/tests/integrations/client/cert_opt.sh index 02f72249db7..b39750ce318 100755 --- a/tests/integrations/client/cert_opt.sh +++ b/tests/integrations/client/cert_opt.sh @@ -49,6 +49,7 @@ function cleanup_certs() { rm -f ca.pem ca-key.pem ca.srl rm -f pd-server.pem pd-server-key.pem pd-server.csr rm -f client.pem client-key.pem client.csr + rm -f tidb-client.pem tidb-client-key.pem tidb-client.csr } if [[ "$1" == "generate" ]]; then From 25dedabf528edcd0e274c955ce26703d5fd252e0 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Sun, 29 Sep 2024 17:41:39 +0800 Subject: [PATCH 3/4] *: reduce rand NewSource (#8675) close tikv/pd#8674 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/checker/replica_checker.go | 99 ++++++++++--------- pkg/schedule/checker/replica_strategy.go | 7 +- pkg/schedule/checker/rule_checker.go | 12 ++- pkg/schedule/filter/candidates.go | 5 +- pkg/schedule/filter/candidates_test.go | 4 +- pkg/schedule/schedulers/balance_leader.go | 2 +- pkg/schedule/schedulers/balance_region.go | 2 +- pkg/schedule/schedulers/base_scheduler.go | 4 +- pkg/schedule/schedulers/evict_leader.go | 11 ++- pkg/schedule/schedulers/evict_slow_store.go | 2 +- pkg/schedule/schedulers/evict_slow_trend.go | 2 +- pkg/schedule/schedulers/label.go | 2 +- pkg/schedule/schedulers/random_merge.go | 2 +- pkg/schedule/schedulers/shuffle_leader.go | 2 +- pkg/schedule/schedulers/shuffle_region.go | 4 +- .../schedulers/transfer_witness_leader.go | 8 +- plugin/scheduler_example/evict_leader.go | 2 +- 17 files changed, 94 insertions(+), 76 deletions(-) diff --git a/pkg/schedule/checker/replica_checker.go b/pkg/schedule/checker/replica_checker.go index a21e19b3d66..fabee683c92 100644 --- a/pkg/schedule/checker/replica_checker.go +++ b/pkg/schedule/checker/replica_checker.go @@ -16,6 +16,8 @@ package checker import ( "fmt" + "math/rand" + "time" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -45,6 +47,7 @@ type ReplicaChecker struct { cluster sche.CheckerCluster conf config.CheckerConfigProvider pendingProcessedRegions *cache.TTLUint64 + r *rand.Rand } // NewReplicaChecker creates a replica checker. 
@@ -53,6 +56,7 @@ func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigPro cluster: cluster, conf: conf, pendingProcessedRegions: pendingProcessedRegions, + r: rand.New(rand.NewSource(time.Now().UnixNano())), } } @@ -67,40 +71,40 @@ func (*ReplicaChecker) GetType() types.CheckerSchedulerType { } // Check verifies a region's replicas, creating an operator.Operator if need. -func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator { +func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator { replicaCheckerCounter.Inc() - if r.IsPaused() { + if c.IsPaused() { replicaCheckerPausedCounter.Inc() return nil } - if op := r.checkDownPeer(region); op != nil { + if op := c.checkDownPeer(region); op != nil { replicaCheckerNewOpCounter.Inc() op.SetPriorityLevel(constant.High) return op } - if op := r.checkOfflinePeer(region); op != nil { + if op := c.checkOfflinePeer(region); op != nil { replicaCheckerNewOpCounter.Inc() op.SetPriorityLevel(constant.High) return op } - if op := r.checkMakeUpReplica(region); op != nil { + if op := c.checkMakeUpReplica(region); op != nil { replicaCheckerNewOpCounter.Inc() op.SetPriorityLevel(constant.High) return op } - if op := r.checkRemoveExtraReplica(region); op != nil { + if op := c.checkRemoveExtraReplica(region); op != nil { replicaCheckerNewOpCounter.Inc() return op } - if op := r.checkLocationReplacement(region); op != nil { + if op := c.checkLocationReplacement(region); op != nil { replicaCheckerNewOpCounter.Inc() return op } return nil } -func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsRemoveDownReplicaEnabled() { +func (c *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsRemoveDownReplicaEnabled() { return nil } @@ -110,22 +114,22 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat continue } storeID := peer.GetStoreId() - store := r.cluster.GetStore(storeID) + store := c.cluster.GetStore(storeID) if store == nil { log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID)) return nil } // Only consider the state of the Store, not `stats.DownSeconds`. 
- if store.DownTime() < r.conf.GetMaxStoreDownTime() { + if store.DownTime() < c.conf.GetMaxStoreDownTime() { continue } - return r.fixPeer(region, storeID, downStatus) + return c.fixPeer(region, storeID, downStatus) } return nil } -func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsReplaceOfflineReplicaEnabled() { +func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsReplaceOfflineReplicaEnabled() { return nil } @@ -136,7 +140,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope for _, peer := range region.GetPeers() { storeID := peer.GetStoreId() - store := r.cluster.GetStore(storeID) + store := c.cluster.GetStore(storeID) if store == nil { log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID)) return nil @@ -145,32 +149,32 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope continue } - return r.fixPeer(region, storeID, offlineStatus) + return c.fixPeer(region, storeID, offlineStatus) } return nil } -func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsMakeUpReplicaEnabled() { +func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsMakeUpReplicaEnabled() { return nil } - if len(region.GetPeers()) >= r.conf.GetMaxReplicas() { + if len(region.GetPeers()) >= c.conf.GetMaxReplicas() { return nil } log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers()))) - regionStores := r.cluster.GetRegionStores(region) - target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores) + regionStores := c.cluster.GetRegionStores(region) + target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores) if target == 0 { log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID())) replicaCheckerNoTargetStoreCounter.Inc() if filterByTempState { - r.pendingProcessedRegions.Put(region.GetID(), nil) + c.pendingProcessedRegions.Put(region.GetID(), nil) } return nil } newPeer := &metapb.Peer{StoreId: target} - op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica) + op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica) if err != nil { log.Debug("create make-up-replica operator fail", errs.ZapError(err)) return nil @@ -178,24 +182,24 @@ func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.O return op } -func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsRemoveExtraReplicaEnabled() { +func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsRemoveExtraReplicaEnabled() { return nil } // when add learner peer, the number of peer will exceed max replicas for a while, // just comparing the the number of voters to avoid too many cancel add operator log. 
- if len(region.GetVoters()) <= r.conf.GetMaxReplicas() { + if len(region.GetVoters()) <= c.conf.GetMaxReplicas() { return nil } log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers()))) - regionStores := r.cluster.GetRegionStores(region) - old := r.strategy(region).SelectStoreToRemove(regionStores) + regionStores := c.cluster.GetRegionStores(region) + old := c.strategy(c.r, region).SelectStoreToRemove(regionStores) if old == 0 { replicaCheckerNoWorstPeerCounter.Inc() - r.pendingProcessedRegions.Put(region.GetID(), nil) + c.pendingProcessedRegions.Put(region.GetID(), nil) return nil } - op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old) + op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old) if err != nil { replicaCheckerCreateOpFailedCounter.Inc() return nil @@ -203,13 +207,13 @@ func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *opera return op } -func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator { - if !r.conf.IsLocationReplacementEnabled() { +func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator { + if !c.conf.IsLocationReplacementEnabled() { return nil } - strategy := r.strategy(region) - regionStores := r.cluster.GetRegionStores(region) + strategy := c.strategy(c.r, region) + regionStores := c.cluster.GetRegionStores(region) oldStore := strategy.SelectStoreToRemove(regionStores) if oldStore == 0 { replicaCheckerAllRightCounter.Inc() @@ -223,7 +227,7 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper } newPeer := &metapb.Peer{StoreId: newStore} - op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer) + op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer) if err != nil { replicaCheckerCreateOpFailedCounter.Inc() return nil @@ -231,11 +235,11 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper return op } -func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator { +func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator { // Check the number of replicas first. 
- if len(region.GetVoters()) > r.conf.GetMaxReplicas() { + if len(region.GetVoters()) > c.conf.GetMaxReplicas() { removeExtra := fmt.Sprintf("remove-extra-%s-replica", status) - op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID) + op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID) if err != nil { if status == offlineStatus { replicaCheckerRemoveExtraOfflineFailedCounter.Inc() @@ -247,8 +251,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status return op } - regionStores := r.cluster.GetRegionStores(region) - target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID) + regionStores := c.cluster.GetRegionStores(region) + target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID) if target == 0 { if status == offlineStatus { replicaCheckerNoStoreOfflineCounter.Inc() @@ -257,13 +261,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status } log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID())) if filterByTempState { - r.pendingProcessedRegions.Put(region.GetID(), nil) + c.pendingProcessedRegions.Put(region.GetID(), nil) } return nil } newPeer := &metapb.Peer{StoreId: target} replace := fmt.Sprintf("replace-%s-replica", status) - op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer) + op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer) if err != nil { if status == offlineStatus { replicaCheckerReplaceOfflineFailedCounter.Inc() @@ -275,12 +279,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status return op } -func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy { +func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy { return &ReplicaStrategy{ - checkerName: r.Name(), - cluster: r.cluster, - locationLabels: r.conf.GetLocationLabels(), - isolationLevel: r.conf.GetIsolationLevel(), + checkerName: c.Name(), + cluster: c.cluster, + locationLabels: c.conf.GetLocationLabels(), + isolationLevel: c.conf.GetIsolationLevel(), region: region, + r: r, } } diff --git a/pkg/schedule/checker/replica_strategy.go b/pkg/schedule/checker/replica_strategy.go index e234189fe96..ffb25f90ad5 100644 --- a/pkg/schedule/checker/replica_strategy.go +++ b/pkg/schedule/checker/replica_strategy.go @@ -15,6 +15,8 @@ package checker import ( + "math/rand" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" @@ -26,6 +28,7 @@ import ( // ReplicaStrategy collects some utilities to manipulate region peers. It // exists to allow replica_checker and rule_checker to reuse common logics. type ReplicaStrategy struct { + r *rand.Rand checkerName string // replica-checker / rule-checker cluster sche.CheckerCluster locationLabels []string @@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores) strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level} - targetCandidate := filter.NewCandidates(s.cluster.GetStores()). + targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()). 
FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...). KeepTheTopStores(isolationComparer, false) // greater isolation score is better if targetCandidate.Len() == 0 { @@ -143,7 +146,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo if s.fastFailover { level = constant.Urgent } - source := filter.NewCandidates(coLocationStores). + source := filter.NewCandidates(s.r, coLocationStores). FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}). KeepTheTopStores(isolationComparer, true). PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), false) diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 82807441bf8..6b3f50d6d14 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -18,6 +18,7 @@ import ( "context" "errors" "math" + "math/rand" "time" "github.com/pingcap/kvproto/pkg/metapb" @@ -56,6 +57,7 @@ type RuleChecker struct { pendingList cache.Cache switchWitnessCache *cache.TTLUint64 record *recorder + r *rand.Rand } // NewRuleChecker creates a checker instance. @@ -67,6 +69,7 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage pendingList: cache.NewDefaultCache(maxPendingListLen), switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()), record: newRecord(), + r: rand.New(rand.NewSource(time.Now().UnixNano())), } } @@ -201,7 +204,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region ruleStores := c.getRuleFitStores(rf) isWitness := rf.Rule.IsWitness && c.isWitnessEnabled() // If the peer to be added is a witness, since no snapshot is needed, we also reuse the fast failover logic. - store, filterByTempState := c.strategy(region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores) + store, filterByTempState := c.strategy(c.r, region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores) if store == 0 { ruleCheckerNoStoreAddCounter.Inc() c.handleFilterState(region, filterByTempState) @@ -252,7 +255,7 @@ func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *pla fastFailover = false } ruleStores := c.getRuleFitStores(rf) - store, filterByTempState := c.strategy(region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId()) + store, filterByTempState := c.strategy(c.r, region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId()) if store == 0 { ruleCheckerNoStoreReplaceCounter.Inc() c.handleFilterState(region, filterByTempState) @@ -393,7 +396,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R isWitness := rf.Rule.IsWitness && c.isWitnessEnabled() // If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic. - strategy := c.strategy(region, rf.Rule, isWitness) + strategy := c.strategy(c.r, region, rf.Rule, isWitness) ruleStores := c.getRuleFitStores(rf) oldStore := strategy.SelectStoreToRemove(ruleStores) if oldStore == 0 { @@ -618,7 +621,7 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb. 
return nil, false } -func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy { +func (c *RuleChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy { return &ReplicaStrategy{ checkerName: c.Name(), cluster: c.cluster, @@ -627,6 +630,7 @@ func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fa region: region, extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.Name(), rule.LabelConstraints)}, fastFailover: fastFailover, + r: r, } } diff --git a/pkg/schedule/filter/candidates.go b/pkg/schedule/filter/candidates.go index 35ca4a2e284..9877bbaa0ac 100644 --- a/pkg/schedule/filter/candidates.go +++ b/pkg/schedule/filter/candidates.go @@ -17,7 +17,6 @@ package filter import ( "math/rand" "sort" - "time" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/config" @@ -32,8 +31,8 @@ type StoreCandidates struct { } // NewCandidates creates StoreCandidates with store list. -func NewCandidates(stores []*core.StoreInfo) *StoreCandidates { - return &StoreCandidates{r: rand.New(rand.NewSource(time.Now().UnixNano())), Stores: stores} +func NewCandidates(r *rand.Rand, stores []*core.StoreInfo) *StoreCandidates { + return &StoreCandidates{r: r, Stores: stores} } // FilterSource keeps stores that can pass all source filters. diff --git a/pkg/schedule/filter/candidates_test.go b/pkg/schedule/filter/candidates_test.go index 0d805312ba7..2e81c747353 100644 --- a/pkg/schedule/filter/candidates_test.go +++ b/pkg/schedule/filter/candidates_test.go @@ -15,7 +15,9 @@ package filter import ( + "math/rand" "testing" + "time" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" @@ -112,7 +114,7 @@ func newTestCandidates(ids ...uint64) *StoreCandidates { for _, id := range ids { stores = append(stores, core.NewStoreInfo(&metapb.Store{Id: id})) } - return NewCandidates(stores) + return NewCandidates(rand.New(rand.NewSource(time.Now().UnixNano())), stores) } func check(re *require.Assertions, candidates *StoreCandidates, ids ...uint64) { diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 7617f8e5b1d..e1412c43b23 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -498,7 +498,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { finalFilters = append(l.filters, leaderFilter) } - target := filter.NewCandidates([]*core.StoreInfo{solver.Target}). + target := filter.NewCandidates(l.R, []*core.StoreInfo{solver.Target}). FilterTarget(conf, nil, l.filterCounter, finalFilters...). PickFirst() if target == nil { diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index 31d7a0488bf..5fdfa29d96d 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -215,7 +215,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co filter.NewPlacementSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, solver.fit), } - candidates := filter.NewCandidates(dstStores).FilterTarget(conf, collector, s.filterCounter, filters...) 
+ candidates := filter.NewCandidates(s.R, dstStores).FilterTarget(conf, collector, s.filterCounter, filters...) if len(candidates.Stores) != 0 { solver.Step++ } diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index 26042eaf023..ef5c6bf7ae7 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -16,6 +16,7 @@ package schedulers import ( "fmt" + "math/rand" "net/http" "time" @@ -62,6 +63,7 @@ func intervalGrow(x time.Duration, maxInterval time.Duration, typ intervalGrowth // BaseScheduler is a basic scheduler for all other complex scheduler type BaseScheduler struct { OpController *operator.Controller + R *rand.Rand name string tp types.CheckerSchedulerType @@ -74,7 +76,7 @@ func NewBaseScheduler( tp types.CheckerSchedulerType, conf schedulerConfig, ) *BaseScheduler { - return &BaseScheduler{OpController: opController, tp: tp, conf: conf} + return &BaseScheduler{OpController: opController, tp: tp, conf: conf, R: rand.New(rand.NewSource(time.Now().UnixNano()))} } func (*BaseScheduler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index d3c9e1ebff2..eb74ae09a12 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -15,6 +15,7 @@ package schedulers import ( + "math/rand" "net/http" "strconv" @@ -285,7 +286,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) // Schedule implements the Scheduler interface. func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { evictLeaderCounter.Inc() - return scheduleEvictLeaderBatch(s.GetName(), cluster, s.conf), nil + return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf), nil } func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator { @@ -309,11 +310,11 @@ type evictLeaderStoresConf interface { getBatch() int } -func scheduleEvictLeaderBatch(name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { +func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { var ops []*operator.Operator batchSize := conf.getBatch() for i := 0; i < batchSize; i++ { - once := scheduleEvictLeaderOnce(name, cluster, conf) + once := scheduleEvictLeaderOnce(r, name, cluster, conf) // no more regions if len(once) == 0 { break @@ -327,7 +328,7 @@ func scheduleEvictLeaderBatch(name string, cluster sche.SchedulerCluster, conf e return ops } -func scheduleEvictLeaderOnce(name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { +func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { stores := conf.getStores() ops := make([]*operator.Operator, 0, len(stores)) for _, storeID := range stores { @@ -358,7 +359,7 @@ func scheduleEvictLeaderOnce(name string, cluster sche.SchedulerCluster, conf ev } filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) - candidates := filter.NewCandidates(cluster.GetFollowerStores(region)). + candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) 
// Compatible with old TiKV transfer leader logic. target := candidates.RandomPick() diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index 8e50efb90dd..93bd5e48395 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -241,7 +241,7 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerClust } func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { - return scheduleEvictLeaderBatch(s.GetName(), cluster, s.conf) + return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf) } // IsScheduleAllowed implements the Scheduler interface. diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 6682b10dd35..6e3caa2e8fe 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -354,7 +354,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerClus return nil } storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1) - return scheduleEvictLeaderBatch(s.GetName(), cluster, s.conf) + return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf) } // IsScheduleAllowed implements the Scheduler interface. diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index 5ba3ad962fc..a27ea29687e 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -91,7 +91,7 @@ func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*ope } f := filter.NewExcludedFilter(s.GetName(), nil, excludeStores) - target := filter.NewCandidates(cluster.GetFollowerStores(region)). + target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium}, f). RandomPick() if target == nil { diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index 50ff6175ca0..7cd9954ce4b 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -69,7 +69,7 @@ func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *randomMergeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { randomMergeCounter.Inc() - store := filter.NewCandidates(cluster.GetStores()). + store := filter.NewCandidates(s.R, cluster.GetStores()). FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true, OperatorLevel: constant.Low}). RandomPick() if store == nil { diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index 71f44e49fbb..e2a256af7a7 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -74,7 +74,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) // 1. random select a valid store. // 2. transfer a leader to the store. shuffleLeaderCounter.Inc() - targetStore := filter.NewCandidates(cluster.GetStores()). + targetStore := filter.NewCandidates(s.R, cluster.GetStores()). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, s.filters...). 
RandomPick() if targetStore == nil { diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index 33eea1d638c..1fbc1f08e67 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -106,7 +106,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) } func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluster) (*core.RegionInfo, *metapb.Peer) { - candidates := filter.NewCandidates(cluster.GetStores()). + candidates := filter.NewCandidates(s.R, cluster.GetStores()). FilterSource(cluster.GetSchedulerConfig(), nil, nil, s.filters...). Shuffle() @@ -146,7 +146,7 @@ func (s *shuffleRegionScheduler) scheduleAddPeer(cluster sche.SchedulerCluster, scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster.GetSchedulerConfig(), cluster.GetBasicCluster(), cluster.GetRuleManager(), region, store, nil) excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIDs()) - target := filter.NewCandidates(cluster.GetStores()). + target := filter.NewCandidates(s.R, cluster.GetStores()). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, append(s.filters, scoreGuard, excludedFilter)...). RandomPick() if target == nil { diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go index 52cd875719c..994559843a7 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader.go +++ b/pkg/schedule/schedulers/transfer_witness_leader.go @@ -15,6 +15,8 @@ package schedulers import ( + "math/rand" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" @@ -67,7 +69,7 @@ batchLoop: for i := 0; i < batchSize; i++ { select { case region := <-s.regions: - op, err := scheduleTransferWitnessLeader(name, cluster, region) + op, err := scheduleTransferWitnessLeader(s.R, name, cluster, region) if err != nil { log.Debug("fail to create transfer leader operator", errs.ZapError(err)) continue @@ -84,7 +86,7 @@ batchLoop: return ops } -func scheduleTransferWitnessLeader(name string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) { +func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) { var filters []filter.Filter unhealthyPeerStores := make(map[uint64]struct{}) for _, peer := range region.GetDownPeers() { @@ -95,7 +97,7 @@ func scheduleTransferWitnessLeader(name string, cluster sche.SchedulerCluster, r } filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent}) - candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) + candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...) // Compatible with old TiKV transfer leader logic. 
target := candidates.RandomPick() targets := candidates.PickAll() diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index 901dd94c983..2f55b2d8ecb 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -222,7 +222,7 @@ func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ( if region == nil { continue } - target := filter.NewCandidates(cluster.GetFollowerStores(region)). + target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)). FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true, OperatorLevel: constant.Urgent}). RandomPick() if target == nil { From 26ced22044e45ba2e315fe639c3378880d61a208 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Sun, 29 Sep 2024 18:11:30 +0800 Subject: [PATCH 4/4] *: unify the receiver naming style (#8677) ref tikv/pd#8379 Signed-off-by: Ryan Leung --- pkg/schedule/checker/learner_checker.go | 6 +- pkg/schedule/checker/merge_checker.go | 50 +++---- pkg/schedule/schedulers/balance_leader.go | 116 ++++++++-------- pkg/schedule/schedulers/balance_witness.go | 98 ++++++------- pkg/schedule/schedulers/hot_region.go | 152 ++++++++++----------- pkg/schedule/schedulers/scatter_range.go | 58 ++++---- 6 files changed, 240 insertions(+), 240 deletions(-) diff --git a/pkg/schedule/checker/learner_checker.go b/pkg/schedule/checker/learner_checker.go index f9c4f7efb2b..8590904a760 100644 --- a/pkg/schedule/checker/learner_checker.go +++ b/pkg/schedule/checker/learner_checker.go @@ -36,13 +36,13 @@ func NewLearnerChecker(cluster sche.CheckerCluster) *LearnerChecker { } // Check verifies a region's role, creating an Operator if need. -func (l *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator { - if l.IsPaused() { +func (c *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator { + if c.IsPaused() { learnerCheckerPausedCounter.Inc() return nil } for _, p := range region.GetLearners() { - op, err := operator.CreatePromoteLearnerOperator("promote-learner", l.cluster, region, p) + op, err := operator.CreatePromoteLearnerOperator("promote-learner", c.cluster, region, p) if err != nil { log.Debug("fail to create promote learner operator", errs.ZapError(err)) continue diff --git a/pkg/schedule/checker/merge_checker.go b/pkg/schedule/checker/merge_checker.go index d5a39da83ae..bf7fe4f2496 100644 --- a/pkg/schedule/checker/merge_checker.go +++ b/pkg/schedule/checker/merge_checker.go @@ -76,32 +76,32 @@ func (*MergeChecker) GetType() types.CheckerSchedulerType { // RecordRegionSplit put the recently split region into cache. MergeChecker // will skip check it for a while. -func (m *MergeChecker) RecordRegionSplit(regionIDs []uint64) { +func (c *MergeChecker) RecordRegionSplit(regionIDs []uint64) { for _, regionID := range regionIDs { - m.splitCache.PutWithTTL(regionID, nil, m.conf.GetSplitMergeInterval()) + c.splitCache.PutWithTTL(regionID, nil, c.conf.GetSplitMergeInterval()) } } // Check verifies a region's replicas, creating an Operator if need. -func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { +func (c *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { mergeCheckerCounter.Inc() - if m.IsPaused() { + if c.IsPaused() { mergeCheckerPausedCounter.Inc() return nil } // update the split cache. // It must be called before the following merge checker logic. 
- m.splitCache.UpdateTTL(m.conf.GetSplitMergeInterval()) + c.splitCache.UpdateTTL(c.conf.GetSplitMergeInterval()) - expireTime := m.startTime.Add(m.conf.GetSplitMergeInterval()) + expireTime := c.startTime.Add(c.conf.GetSplitMergeInterval()) if time.Now().Before(expireTime) { mergeCheckerRecentlyStartCounter.Inc() return nil } - if m.splitCache.Exists(region.GetID()) { + if c.splitCache.Exists(region.GetID()) { mergeCheckerRecentlySplitCounter.Inc() return nil } @@ -113,7 +113,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { } // region is not small enough - if !region.NeedMerge(int64(m.conf.GetMaxMergeRegionSize()), int64(m.conf.GetMaxMergeRegionKeys())) { + if !region.NeedMerge(int64(c.conf.GetMaxMergeRegionSize()), int64(c.conf.GetMaxMergeRegionKeys())) { mergeCheckerNoNeedCounter.Inc() return nil } @@ -124,24 +124,24 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { return nil } - if !filter.IsRegionReplicated(m.cluster, region) { + if !filter.IsRegionReplicated(c.cluster, region) { mergeCheckerAbnormalReplicaCounter.Inc() return nil } // skip hot region - if m.cluster.IsRegionHot(region) { + if c.cluster.IsRegionHot(region) { mergeCheckerHotRegionCounter.Inc() return nil } - prev, next := m.cluster.GetAdjacentRegions(region) + prev, next := c.cluster.GetAdjacentRegions(region) var target *core.RegionInfo - if m.checkTarget(region, next) { + if c.checkTarget(region, next) { target = next } - if !m.conf.IsOneWayMergeEnabled() && m.checkTarget(region, prev) { // allow a region can be merged by two ways. + if !c.conf.IsOneWayMergeEnabled() && c.checkTarget(region, prev) { // allow a region can be merged by two ways. if target == nil || prev.GetApproximateSize() < next.GetApproximateSize() { // pick smaller target = prev } @@ -152,7 +152,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { return nil } - regionMaxSize := m.cluster.GetStoreConfig().GetRegionMaxSize() + regionMaxSize := c.cluster.GetStoreConfig().GetRegionMaxSize() maxTargetRegionSizeThreshold := int64(float64(regionMaxSize) * float64(maxTargetRegionFactor)) if maxTargetRegionSizeThreshold < maxTargetRegionSize { maxTargetRegionSizeThreshold = maxTargetRegionSize @@ -161,14 +161,14 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { mergeCheckerTargetTooLargeCounter.Inc() return nil } - if err := m.cluster.GetStoreConfig().CheckRegionSize(uint64(target.GetApproximateSize()+region.GetApproximateSize()), - m.conf.GetMaxMergeRegionSize()); err != nil { + if err := c.cluster.GetStoreConfig().CheckRegionSize(uint64(target.GetApproximateSize()+region.GetApproximateSize()), + c.conf.GetMaxMergeRegionSize()); err != nil { mergeCheckerSplitSizeAfterMergeCounter.Inc() return nil } - if err := m.cluster.GetStoreConfig().CheckRegionKeys(uint64(target.GetApproximateKeys()+region.GetApproximateKeys()), - m.conf.GetMaxMergeRegionKeys()); err != nil { + if err := c.cluster.GetStoreConfig().CheckRegionKeys(uint64(target.GetApproximateKeys()+region.GetApproximateKeys()), + c.conf.GetMaxMergeRegionKeys()); err != nil { mergeCheckerSplitKeysAfterMergeCounter.Inc() return nil } @@ -176,7 +176,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { log.Debug("try to merge region", logutil.ZapRedactStringer("from", core.RegionToHexMeta(region.GetMeta())), logutil.ZapRedactStringer("to", core.RegionToHexMeta(target.GetMeta()))) - ops, err := operator.CreateMergeRegionOperator("merge-region", m.cluster, 
region, target, operator.OpMerge) + ops, err := operator.CreateMergeRegionOperator("merge-region", c.cluster, region, target, operator.OpMerge) if err != nil { log.Warn("create merge region operator failed", errs.ZapError(err)) return nil @@ -189,28 +189,28 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { return ops } -func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool { +func (c *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool { if adjacent == nil { mergeCheckerAdjNotExistCounter.Inc() return false } - if m.splitCache.Exists(adjacent.GetID()) { + if c.splitCache.Exists(adjacent.GetID()) { mergeCheckerAdjRecentlySplitCounter.Inc() return false } - if m.cluster.IsRegionHot(adjacent) { + if c.cluster.IsRegionHot(adjacent) { mergeCheckerAdjRegionHotCounter.Inc() return false } - if !AllowMerge(m.cluster, region, adjacent) { + if !AllowMerge(c.cluster, region, adjacent) { mergeCheckerAdjDisallowMergeCounter.Inc() return false } - if !checkPeerStore(m.cluster, region, adjacent) { + if !checkPeerStore(c.cluster, region, adjacent) { mergeCheckerAdjAbnormalPeerStoreCounter.Inc() return false } @@ -220,7 +220,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool { return false } - if !filter.IsRegionReplicated(m.cluster, adjacent) { + if !filter.IsRegionReplicated(c.cluster, adjacent) { mergeCheckerAdjAbnormalReplicaCounter.Inc() return false } diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index e1412c43b23..44605f9c5b8 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -184,8 +184,8 @@ func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceL } // ServeHTTP implements the http.Handler interface. -func (l *balanceLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - l.handler.ServeHTTP(w, r) +func (s *balanceLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } // BalanceLeaderCreateOption is used to create a scheduler with an option. @@ -199,31 +199,31 @@ func WithBalanceLeaderName(name string) BalanceLeaderCreateOption { } // EncodeConfig implements the Scheduler interface. -func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { - l.conf.RLock() - defer l.conf.RUnlock() - return EncodeConfig(l.conf) +func (s *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { + s.conf.RLock() + defer s.conf.RUnlock() + return EncodeConfig(s.conf) } // ReloadConfig implements the Scheduler interface. -func (l *balanceLeaderScheduler) ReloadConfig() error { - l.conf.Lock() - defer l.conf.Unlock() +func (s *balanceLeaderScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() newCfg := &balanceLeaderSchedulerConfig{} - if err := l.conf.load(newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } - l.conf.Ranges = newCfg.Ranges - l.conf.Batch = newCfg.Batch + s.conf.Ranges = newCfg.Ranges + s.conf.Batch = newCfg.Batch return nil } // IsScheduleAllowed implements the Scheduler interface. 
-func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() +func (s *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(l.GetType(), operator.OpLeader) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpLeader) } return allowed } @@ -321,18 +321,18 @@ func (cs *candidateStores) resortStoreWithPos(pos int) { } // Schedule implements the Scheduler interface. -func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { collector = plan.NewCollector(basePlan) } - defer l.filterCounter.Flush() - batch := l.conf.getBatch() + defer s.filterCounter.Flush() + batch := s.conf.getBatch() balanceLeaderScheduleCounter.Inc() leaderSchedulePolicy := cluster.GetSchedulerConfig().GetLeaderSchedulePolicy() - opInfluence := l.OpController.GetOpInfluence(cluster.GetBasicCluster()) + opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster()) kind := constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy) solver := newSolver(basePlan, kind, cluster, opInfluence) @@ -340,15 +340,15 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun scoreFunc := func(store *core.StoreInfo) float64 { return store.LeaderScore(solver.kind.Policy, solver.getOpInfluence(store.GetID())) } - sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetSchedulerConfig(), collector, l.filterCounter), false, scoreFunc) - targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetSchedulerConfig(), nil, l.filterCounter), true, scoreFunc) + sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, s.filters, cluster.GetSchedulerConfig(), collector, s.filterCounter), false, scoreFunc) + targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, s.filters, cluster.GetSchedulerConfig(), nil, s.filterCounter), true, scoreFunc) usedRegions := make(map[uint64]struct{}) result := make([]*operator.Operator, 0, batch) for sourceCandidate.hasStore() || targetCandidate.hasStore() { // first choose source if sourceCandidate.hasStore() { - op := createTransferLeaderOperator(sourceCandidate, transferOut, l, solver, usedRegions, collector) + op := createTransferLeaderOperator(sourceCandidate, transferOut, s, solver, usedRegions, collector) if op != nil { result = append(result, op) if len(result) >= batch { @@ -359,7 +359,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun } // next choose target if targetCandidate.hasStore() { - op := createTransferLeaderOperator(targetCandidate, transferIn, l, solver, usedRegions, nil) + op := createTransferLeaderOperator(targetCandidate, transferIn, s, solver, usedRegions, nil) if op != nil { result = append(result, op) if len(result) >= batch { @@ -369,24 +369,24 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun } } } - l.retryQuota.gc(append(sourceCandidate.stores, 
targetCandidate.stores...)) + s.retryQuota.gc(append(sourceCandidate.stores, targetCandidate.stores...)) return result, collector.GetPlans() } -func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLeaderScheduler, +func createTransferLeaderOperator(cs *candidateStores, dir string, s *balanceLeaderScheduler, ssolver *solver, usedRegions map[uint64]struct{}, collector *plan.Collector) *operator.Operator { store := cs.getStore() ssolver.Step++ defer func() { ssolver.Step-- }() - retryLimit := l.retryQuota.getLimit(store) + retryLimit := s.retryQuota.getLimit(store) var creator func(*solver, *plan.Collector) *operator.Operator switch dir { case transferOut: ssolver.Source, ssolver.Target = store, nil - creator = l.transferLeaderOut + creator = s.transferLeaderOut case transferIn: ssolver.Source, ssolver.Target = nil, store - creator = l.transferLeaderIn + creator = s.transferLeaderIn } var op *operator.Operator for i := 0; i < retryLimit; i++ { @@ -398,10 +398,10 @@ func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLea } } if op != nil { - l.retryQuota.resetLimit(store) + s.retryQuota.resetLimit(store) } else { - l.attenuate(store) - log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64(dir, store.GetID())) + s.attenuate(store) + log.Debug("no operator created for selected stores", zap.String("scheduler", s.GetName()), zap.Uint64(dir, store.GetID())) cs.next() } return op @@ -425,16 +425,16 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s // transferLeaderOut transfers leader from the source store. // It randomly selects a health region from the source store, then picks // the best follower peer and transfers the leader. 
-func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.sourceStoreID(), l.conf.getRanges()), +func (s *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *plan.Collector) *operator.Operator { + solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.sourceStoreID(), s.conf.getRanges()), collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { - log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) + log.Debug("store has no leader", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) balanceLeaderNoLeaderRegionCounter.Inc() return nil } if solver.IsRegionHot(solver.Region) { - log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + log.Debug("region is hot region, ignore it", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) if collector != nil { collector.Collect(plan.SetResource(solver.Region), plan.SetStatus(plan.NewStatus(plan.StatusRegionHot))) } @@ -444,12 +444,12 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl solver.Step++ defer func() { solver.Step-- }() targets := solver.GetFollowerStores(solver.Region) - finalFilters := l.filters + finalFilters := s.filters conf := solver.GetSchedulerConfig() - if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { - finalFilters = append(l.filters, leaderFilter) + if leaderFilter := filter.NewPlacementLeaderSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { + finalFilters = append(s.filters, leaderFilter) } - targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, l.filterCounter) + targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, s.filterCounter) leaderSchedulePolicy := conf.GetLeaderSchedulePolicy() sort.Slice(targets, func(i, j int) bool { iOp := solver.getOpInfluence(targets[i].GetID()) @@ -457,11 +457,11 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl return targets[i].LeaderScore(leaderSchedulePolicy, iOp) < targets[j].LeaderScore(leaderSchedulePolicy, jOp) }) for _, solver.Target = range targets { - if op := l.createOperator(solver, collector); op != nil { + if op := s.createOperator(solver, collector); op != nil { return op } } - log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + log.Debug("region has no target store", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) balanceLeaderNoTargetStoreCounter.Inc() return nil } @@ -469,16 +469,16 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl // transferLeaderIn transfers leader to the target store. // It randomly selects a health region from the target store, then picks // the worst follower peer and transfers the leader. 
-func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.targetStoreID(), l.conf.getRanges()), +func (s *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator { + solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.targetStoreID(), s.conf.getRanges()), nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { - log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.targetStoreID())) + log.Debug("store has no follower", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.targetStoreID())) balanceLeaderNoFollowerRegionCounter.Inc() return nil } if solver.IsRegionHot(solver.Region) { - log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + log.Debug("region is hot region, ignore it", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) balanceLeaderRegionHotCounter.Inc() return nil } @@ -486,38 +486,38 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla solver.Source = solver.GetStore(leaderStoreID) if solver.Source == nil { log.Debug("region has no leader or leader store cannot be found", - zap.String("scheduler", l.GetName()), + zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID()), zap.Uint64("store-id", leaderStoreID), ) balanceLeaderNoLeaderRegionCounter.Inc() return nil } - finalFilters := l.filters + finalFilters := s.filters conf := solver.GetSchedulerConfig() - if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { - finalFilters = append(l.filters, leaderFilter) + if leaderFilter := filter.NewPlacementLeaderSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { + finalFilters = append(s.filters, leaderFilter) } - target := filter.NewCandidates(l.R, []*core.StoreInfo{solver.Target}). - FilterTarget(conf, nil, l.filterCounter, finalFilters...). + target := filter.NewCandidates(s.R, []*core.StoreInfo{solver.Target}). + FilterTarget(conf, nil, s.filterCounter, finalFilters...). PickFirst() if target == nil { - log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + log.Debug("region has no target store", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) balanceLeaderNoTargetStoreCounter.Inc() return nil } - return l.createOperator(solver, collector) + return s.createOperator(solver, collector) } // createOperator creates the operator according to the source and target store. // If the region is hot or the difference between the two stores is tolerable, then // no new operator need to be created, otherwise create an operator that transfers // the leader from the source store to the target store for the region. 
-func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator { +func (s *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator { solver.Step++ defer func() { solver.Step-- }() - solver.sourceScore, solver.targetScore = solver.sourceStoreScore(l.GetName()), solver.targetStoreScore(l.GetName()) - if !solver.shouldBalance(l.GetName()) { + solver.sourceScore, solver.targetScore = solver.sourceStoreScore(s.GetName()), solver.targetStoreScore(s.GetName()) + if !solver.shouldBalance(s.GetName()) { balanceLeaderSkipCounter.Inc() if collector != nil { collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed))) @@ -526,7 +526,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan. } solver.Step++ defer func() { solver.Step-- }() - op, err := operator.CreateTransferLeaderOperator(l.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader) + op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader) if err != nil { log.Debug("fail to create balance leader operator", errs.ZapError(err)) if collector != nil { @@ -538,7 +538,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan. balanceLeaderNewOpCounter, ) op.FinishedCounters = append(op.FinishedCounters, - balanceDirectionCounter.WithLabelValues(l.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), + balanceDirectionCounter.WithLabelValues(s.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), ) op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)) op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64)) diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index 6953c7f7634..1fedb2769ee 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -180,8 +180,8 @@ func newBalanceWitnessScheduler(opController *operator.Controller, conf *balance } // ServeHTTP implements the http.Handler interface. -func (b *balanceWitnessScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - b.handler.ServeHTTP(w, r) +func (s *balanceWitnessScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } // BalanceWitnessCreateOption is used to create a scheduler with an option. @@ -195,46 +195,46 @@ func WithBalanceWitnessCounter(counter *prometheus.CounterVec) BalanceWitnessCre } // EncodeConfig implements the Scheduler interface. -func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { - b.conf.RLock() - defer b.conf.RUnlock() - return EncodeConfig(b.conf) +func (s *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { + s.conf.RLock() + defer s.conf.RUnlock() + return EncodeConfig(s.conf) } // ReloadConfig implements the Scheduler interface. 
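The hunks above and below only rename the Go method receiver: l, b, and h all become s, and no logic changes. A receiver name is an ordinary local identifier, so the rename is behavior-preserving and fully compile-checked. A minimal standalone sketch of the same kind of change, using an invented exampleScheduler type rather than any of PD's real schedulers:

package main

import "fmt"

type exampleScheduler struct {
	name string
}

// Before a rename like this the receiver might have been called l; afterwards
// it is s. Only the identifier changes; the method set and behavior stay the same.
func (s *exampleScheduler) GetName() string {
	return s.name
}

func main() {
	s := &exampleScheduler{name: "balance-example-scheduler"}
	fmt.Println(s.GetName()) // balance-example-scheduler
}

Because every renamed method is re-checked by the compiler, a rename of this kind cannot silently change scheduling behavior.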
-func (b *balanceWitnessScheduler) ReloadConfig() error { - b.conf.Lock() - defer b.conf.Unlock() +func (s *balanceWitnessScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() newCfg := &balanceWitnessSchedulerConfig{} - if err := b.conf.load(newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } - b.conf.Ranges = newCfg.Ranges - b.conf.Batch = newCfg.Batch + s.conf.Ranges = newCfg.Ranges + s.conf.Batch = newCfg.Batch return nil } // IsScheduleAllowed implements the Scheduler interface. -func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - allowed := b.OpController.OperatorCount(operator.OpWitness) < cluster.GetSchedulerConfig().GetWitnessScheduleLimit() +func (s *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpWitness) < cluster.GetSchedulerConfig().GetWitnessScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(b.GetType(), operator.OpWitness) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpWitness) } return allowed } // Schedule implements the Scheduler interface. -func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { collector = plan.NewCollector(basePlan) } - batch := b.conf.getBatch() - schedulerCounter.WithLabelValues(b.GetName(), "schedule").Inc() + batch := s.conf.getBatch() + schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() - opInfluence := b.OpController.GetOpInfluence(cluster.GetBasicCluster()) + opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster()) kind := constant.NewScheduleKind(constant.WitnessKind, constant.ByCount) solver := newSolver(basePlan, kind, cluster, opInfluence) @@ -242,12 +242,12 @@ func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun scoreFunc := func(store *core.StoreInfo) float64 { return store.WitnessScore(solver.getOpInfluence(store.GetID())) } - sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, b.filters, cluster.GetSchedulerConfig(), collector, b.filterCounter), false, scoreFunc) + sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, s.filters, cluster.GetSchedulerConfig(), collector, s.filterCounter), false, scoreFunc) usedRegions := make(map[uint64]struct{}) result := make([]*operator.Operator, 0, batch) if sourceCandidate.hasStore() { - op := createTransferWitnessOperator(sourceCandidate, b, solver, usedRegions, collector) + op := createTransferWitnessOperator(sourceCandidate, s, solver, usedRegions, collector) if op != nil { result = append(result, op) if len(result) >= batch { @@ -256,21 +256,21 @@ func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun makeInfluence(op, solver, usedRegions, sourceCandidate) } } - b.retryQuota.gc(sourceCandidate.stores) + s.retryQuota.gc(sourceCandidate.stores) return result, collector.GetPlans() } -func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessScheduler, +func createTransferWitnessOperator(cs *candidateStores, s *balanceWitnessScheduler, ssolver *solver, usedRegions map[uint64]struct{}, collector *plan.Collector) *operator.Operator { store := cs.getStore() ssolver.Step++ defer func() { 
ssolver.Step-- }() - retryLimit := b.retryQuota.getLimit(store) + retryLimit := s.retryQuota.getLimit(store) ssolver.Source, ssolver.Target = store, nil var op *operator.Operator for i := 0; i < retryLimit; i++ { - schedulerCounter.WithLabelValues(b.GetName(), "total").Inc() - if op = b.transferWitnessOut(ssolver, collector); op != nil { + schedulerCounter.WithLabelValues(s.GetName(), "total").Inc() + if op = s.transferWitnessOut(ssolver, collector); op != nil { if _, ok := usedRegions[op.RegionID()]; !ok { break } @@ -278,10 +278,10 @@ func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessSchedul } } if op != nil { - b.retryQuota.resetLimit(store) + s.retryQuota.resetLimit(store) } else { - b.attenuate(store) - log.Debug("no operator created for selected stores", zap.String("scheduler", b.GetName()), zap.Uint64("transfer-out", store.GetID())) + s.attenuate(store) + log.Debug("no operator created for selected stores", zap.String("scheduler", s.GetName()), zap.Uint64("transfer-out", store.GetID())) cs.next() } return op @@ -290,35 +290,35 @@ func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessSchedul // transferWitnessOut transfers witness from the source store. // It randomly selects a health region from the source store, then picks // the best follower peer and transfers the witness. -func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.sourceStoreID(), b.conf.getRanges()), +func (s *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *plan.Collector) *operator.Operator { + solver.Region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.sourceStoreID(), s.conf.getRanges()), collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { - log.Debug("store has no witness", zap.String("scheduler", b.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) - schedulerCounter.WithLabelValues(b.GetName(), "no-witness-region").Inc() + log.Debug("store has no witness", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) + schedulerCounter.WithLabelValues(s.GetName(), "no-witness-region").Inc() return nil } solver.Step++ defer func() { solver.Step-- }() targets := solver.GetNonWitnessVoterStores(solver.Region) - finalFilters := b.filters + finalFilters := s.filters conf := solver.GetSchedulerConfig() - if witnessFilter := filter.NewPlacementWitnessSafeguard(b.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, solver.fit); witnessFilter != nil { - finalFilters = append(b.filters, witnessFilter) + if witnessFilter := filter.NewPlacementWitnessSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, solver.fit); witnessFilter != nil { + finalFilters = append(s.filters, witnessFilter) } - targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, b.filterCounter) + targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, s.filterCounter) sort.Slice(targets, func(i, j int) bool { iOp := solver.getOpInfluence(targets[i].GetID()) jOp := solver.getOpInfluence(targets[j].GetID()) return targets[i].WitnessScore(iOp) < targets[j].WitnessScore(jOp) }) for _, solver.Target = range targets { - if op := b.createOperator(solver, collector); op != nil { + if op := s.createOperator(solver, collector); op != 
nil { return op } } - log.Debug("region has no target store", zap.String("scheduler", b.GetName()), zap.Uint64("region-id", solver.Region.GetID())) - schedulerCounter.WithLabelValues(b.GetName(), "no-target-store").Inc() + log.Debug("region has no target store", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + schedulerCounter.WithLabelValues(s.GetName(), "no-target-store").Inc() return nil } @@ -326,12 +326,12 @@ func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector * // If the region is hot or the difference between the two stores is tolerable, then // no new operator need to be created, otherwise create an operator that transfers // the witness from the source store to the target store for the region. -func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator { +func (s *balanceWitnessScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator { solver.Step++ defer func() { solver.Step-- }() - solver.sourceScore, solver.targetScore = solver.sourceStoreScore(b.GetName()), solver.targetStoreScore(b.GetName()) - if !solver.shouldBalance(b.GetName()) { - schedulerCounter.WithLabelValues(b.GetName(), "skip").Inc() + solver.sourceScore, solver.targetScore = solver.sourceStoreScore(s.GetName()), solver.targetStoreScore(s.GetName()) + if !solver.shouldBalance(s.GetName()) { + schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc() if collector != nil { collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed))) } @@ -339,18 +339,18 @@ func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan } solver.Step++ defer func() { solver.Step-- }() - op, err := operator.CreateMoveWitnessOperator(b.GetName(), solver, solver.Region, solver.sourceStoreID(), solver.targetStoreID()) + op, err := operator.CreateMoveWitnessOperator(s.GetName(), solver, solver.Region, solver.sourceStoreID(), solver.targetStoreID()) if err != nil { log.Debug("fail to create balance witness operator", errs.ZapError(err)) return nil } op.Counters = append(op.Counters, - schedulerCounter.WithLabelValues(b.GetName(), "new-operator"), + schedulerCounter.WithLabelValues(s.GetName(), "new-operator"), ) op.FinishedCounters = append(op.FinishedCounters, - balanceDirectionCounter.WithLabelValues(b.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), - b.counter.WithLabelValues("move-witness", solver.sourceMetricLabel()+"-out"), - b.counter.WithLabelValues("move-witness", solver.targetMetricLabel()+"-in"), + balanceDirectionCounter.WithLabelValues(s.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), + s.counter.WithLabelValues("move-witness", solver.sourceMetricLabel()+"-out"), + s.counter.WithLabelValues("move-witness", solver.targetMetricLabel()+"-in"), ) op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)) op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64)) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index e9e369b68d4..eedbcfe4625 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -109,18 +109,18 @@ func newBaseHotScheduler( // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for // each store, only update read or write load detail -func (h *baseHotScheduler) prepareForBalance(typ resourceType, 
cluster sche.SchedulerCluster) { +func (s *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.SchedulerCluster) { storeInfos := statistics.SummaryStoreInfos(cluster.GetStores()) - h.summaryPendingInfluence(storeInfos) + s.summaryPendingInfluence(storeInfos) storesLoads := cluster.GetStoresLoads() isTraceRegionFlow := cluster.GetSchedulerConfig().IsTraceRegionFlow() prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, rw utils.RWType, resource constant.ResourceKind) { ty := buildResourceType(rw, resource) - h.stLoadInfos[ty] = statistics.SummaryStoresLoad( + s.stLoadInfos[ty] = statistics.SummaryStoresLoad( storeInfos, storesLoads, - h.stHistoryLoads, + s.stHistoryLoads, regionStats, isTraceRegionFlow, rw, resource) @@ -129,35 +129,35 @@ func (h *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche case readLeader, readPeer: // update read statistics // avoid to update read statistics frequently - if time.Since(h.updateReadTime) >= statisticsInterval { + if time.Since(s.updateReadTime) >= statisticsInterval { regionRead := cluster.RegionReadStats() prepare(regionRead, utils.Read, constant.LeaderKind) prepare(regionRead, utils.Read, constant.RegionKind) - h.updateReadTime = time.Now() + s.updateReadTime = time.Now() } case writeLeader, writePeer: // update write statistics // avoid to update write statistics frequently - if time.Since(h.updateWriteTime) >= statisticsInterval { + if time.Since(s.updateWriteTime) >= statisticsInterval { regionWrite := cluster.RegionWriteStats() prepare(regionWrite, utils.Write, constant.LeaderKind) prepare(regionWrite, utils.Write, constant.RegionKind) - h.updateWriteTime = time.Now() + s.updateWriteTime = time.Now() } default: log.Error("invalid resource type", zap.String("type", typ.String())) } } -func (h *baseHotScheduler) updateHistoryLoadConfig(sampleDuration, sampleInterval time.Duration) { - h.stHistoryLoads = h.stHistoryLoads.UpdateConfig(sampleDuration, sampleInterval) +func (s *baseHotScheduler) updateHistoryLoadConfig(sampleDuration, sampleInterval time.Duration) { + s.stHistoryLoads = s.stHistoryLoads.UpdateConfig(sampleDuration, sampleInterval) } // summaryPendingInfluence calculate the summary of pending Influence for each store // and clean the region from regionInfluence if they have ended operator. // It makes each dim rate or count become `weight` times to the origin value. -func (h *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statistics.StoreSummaryInfo) { - for id, p := range h.regionPendings { +func (s *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statistics.StoreSummaryInfo) { + for id, p := range s.regionPendings { for _, from := range p.froms { from := storeInfos[from] to := storeInfos[p.to] @@ -165,7 +165,7 @@ func (h *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statis weight, needGC := calcPendingInfluence(p.op, maxZombieDur) if needGC { - delete(h.regionPendings, id) + delete(s.regionPendings, id) continue } @@ -188,8 +188,8 @@ func (h *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statis } } -func (h *baseHotScheduler) randomType() resourceType { - return h.types[h.r.Int()%len(h.types)] +func (s *baseHotScheduler) randomType() resourceType { + return s.types[s.r.Int()%len(s.types)] } type hotScheduler struct { @@ -214,48 +214,48 @@ func newHotScheduler(opController *operator.Controller, conf *hotRegionScheduler } // EncodeConfig implements the Scheduler interface. 
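Beyond the receiver rename, prepareForBalance above keeps its time-gated refresh: read or write statistics are only re-summarized once statisticsInterval has elapsed since the previous update. A minimal standalone sketch of that pattern, with invented names (statsCache, get, compute) rather than PD's actual types:

package main

import (
	"fmt"
	"time"
)

// statsCache refreshes an expensive summary at most once per interval,
// mirroring how prepareForBalance gates the read/write region statistics.
type statsCache struct {
	lastUpdate time.Time
	interval   time.Duration
	summary    int
}

func (c *statsCache) get(compute func() int) int {
	if time.Since(c.lastUpdate) >= c.interval {
		c.summary = compute()
		c.lastUpdate = time.Now()
	}
	return c.summary
}

func main() {
	c := &statsCache{interval: 100 * time.Millisecond}
	calls := 0
	compute := func() int { calls++; return calls }

	fmt.Println(c.get(compute)) // 1: first call computes
	fmt.Println(c.get(compute)) // 1: within the interval, the cached value is reused
	time.Sleep(120 * time.Millisecond)
	fmt.Println(c.get(compute)) // 2: interval elapsed, recomputed
}

The same idea is applied twice in the hunk, once for updateReadTime and once for updateWriteTime.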
-func (h *hotScheduler) EncodeConfig() ([]byte, error) { - return h.conf.encodeConfig() +func (s *hotScheduler) EncodeConfig() ([]byte, error) { + return s.conf.encodeConfig() } // ReloadConfig impl -func (h *hotScheduler) ReloadConfig() error { - h.conf.Lock() - defer h.conf.Unlock() +func (s *hotScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() newCfg := &hotRegionSchedulerConfig{} - if err := h.conf.load(newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } - h.conf.MinHotByteRate = newCfg.MinHotByteRate - h.conf.MinHotKeyRate = newCfg.MinHotKeyRate - h.conf.MinHotQueryRate = newCfg.MinHotQueryRate - h.conf.MaxZombieRounds = newCfg.MaxZombieRounds - h.conf.MaxPeerNum = newCfg.MaxPeerNum - h.conf.ByteRateRankStepRatio = newCfg.ByteRateRankStepRatio - h.conf.KeyRateRankStepRatio = newCfg.KeyRateRankStepRatio - h.conf.QueryRateRankStepRatio = newCfg.QueryRateRankStepRatio - h.conf.CountRankStepRatio = newCfg.CountRankStepRatio - h.conf.GreatDecRatio = newCfg.GreatDecRatio - h.conf.MinorDecRatio = newCfg.MinorDecRatio - h.conf.SrcToleranceRatio = newCfg.SrcToleranceRatio - h.conf.DstToleranceRatio = newCfg.DstToleranceRatio - h.conf.WriteLeaderPriorities = newCfg.WriteLeaderPriorities - h.conf.WritePeerPriorities = newCfg.WritePeerPriorities - h.conf.ReadPriorities = newCfg.ReadPriorities - h.conf.StrictPickingStore = newCfg.StrictPickingStore - h.conf.EnableForTiFlash = newCfg.EnableForTiFlash - h.conf.RankFormulaVersion = newCfg.RankFormulaVersion - h.conf.ForbidRWType = newCfg.ForbidRWType - h.conf.SplitThresholds = newCfg.SplitThresholds - h.conf.HistorySampleDuration = newCfg.HistorySampleDuration - h.conf.HistorySampleInterval = newCfg.HistorySampleInterval + s.conf.MinHotByteRate = newCfg.MinHotByteRate + s.conf.MinHotKeyRate = newCfg.MinHotKeyRate + s.conf.MinHotQueryRate = newCfg.MinHotQueryRate + s.conf.MaxZombieRounds = newCfg.MaxZombieRounds + s.conf.MaxPeerNum = newCfg.MaxPeerNum + s.conf.ByteRateRankStepRatio = newCfg.ByteRateRankStepRatio + s.conf.KeyRateRankStepRatio = newCfg.KeyRateRankStepRatio + s.conf.QueryRateRankStepRatio = newCfg.QueryRateRankStepRatio + s.conf.CountRankStepRatio = newCfg.CountRankStepRatio + s.conf.GreatDecRatio = newCfg.GreatDecRatio + s.conf.MinorDecRatio = newCfg.MinorDecRatio + s.conf.SrcToleranceRatio = newCfg.SrcToleranceRatio + s.conf.DstToleranceRatio = newCfg.DstToleranceRatio + s.conf.WriteLeaderPriorities = newCfg.WriteLeaderPriorities + s.conf.WritePeerPriorities = newCfg.WritePeerPriorities + s.conf.ReadPriorities = newCfg.ReadPriorities + s.conf.StrictPickingStore = newCfg.StrictPickingStore + s.conf.EnableForTiFlash = newCfg.EnableForTiFlash + s.conf.RankFormulaVersion = newCfg.RankFormulaVersion + s.conf.ForbidRWType = newCfg.ForbidRWType + s.conf.SplitThresholds = newCfg.SplitThresholds + s.conf.HistorySampleDuration = newCfg.HistorySampleDuration + s.conf.HistorySampleInterval = newCfg.HistorySampleInterval return nil } // ServeHTTP implements the http.Handler interface. -func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.conf.ServeHTTP(w, r) +func (s *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.conf.ServeHTTP(w, r) } // GetMinInterval implements the Scheduler interface. @@ -264,73 +264,73 @@ func (*hotScheduler) GetMinInterval() time.Duration { } // GetNextInterval implements the Scheduler interface. 
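ReloadConfig above decodes the persisted configuration into a fresh struct and then copies each field across while holding the write lock, rather than swapping out the whole config object; that way whatever the live config embeds (its lock and, in PD's case, its persistence helpers) stays in place. A condensed standalone sketch of the pattern, with a made-up twoFieldConfig standing in for hotRegionSchedulerConfig:

package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// twoFieldConfig stands in for a scheduler config: it embeds its own lock
// and is updated in place instead of being replaced.
type twoFieldConfig struct {
	sync.RWMutex
	Batch  int      `json:"batch"`
	Ranges []string `json:"ranges"`
}

// reload decodes freshly persisted bytes into a throwaway struct and copies
// the fields across under the write lock, like the ReloadConfig methods above.
func (c *twoFieldConfig) reload(persisted []byte) error {
	c.Lock()
	defer c.Unlock()
	newCfg := &twoFieldConfig{}
	if err := json.Unmarshal(persisted, newCfg); err != nil {
		return err
	}
	c.Batch = newCfg.Batch
	c.Ranges = newCfg.Ranges
	return nil
}

func main() {
	cfg := &twoFieldConfig{Batch: 4}
	if err := cfg.reload([]byte(`{"batch":8,"ranges":["a","b"]}`)); err != nil {
		fmt.Println("reload failed:", err)
		return
	}
	fmt.Println(cfg.Batch, cfg.Ranges) // 8 [a b]
}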
-func (h *hotScheduler) GetNextInterval(time.Duration) time.Duration { - return intervalGrow(h.GetMinInterval(), maxHotScheduleInterval, exponentialGrowth) +func (s *hotScheduler) GetNextInterval(time.Duration) time.Duration { + return intervalGrow(s.GetMinInterval(), maxHotScheduleInterval, exponentialGrowth) } // IsScheduleAllowed implements the Scheduler interface. -func (h *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - allowed := h.OpController.OperatorCount(operator.OpHotRegion) < cluster.GetSchedulerConfig().GetHotRegionScheduleLimit() +func (s *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpHotRegion) < cluster.GetSchedulerConfig().GetHotRegionScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(h.GetType(), operator.OpHotRegion) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpHotRegion) } return allowed } // Schedule implements the Scheduler interface. -func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { +func (s *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { hotSchedulerCounter.Inc() - typ := h.randomType() - return h.dispatch(typ, cluster), nil + typ := s.randomType() + return s.dispatch(typ, cluster), nil } -func (h *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator { - h.Lock() - defer h.Unlock() - h.updateHistoryLoadConfig(h.conf.getHistorySampleDuration(), h.conf.getHistorySampleInterval()) - h.prepareForBalance(typ, cluster) +func (s *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator { + s.Lock() + defer s.Unlock() + s.updateHistoryLoadConfig(s.conf.getHistorySampleDuration(), s.conf.getHistorySampleInterval()) + s.prepareForBalance(typ, cluster) // isForbidRWType can not be move earlier to support to use api and metrics. 
switch typ { case readLeader, readPeer: - if h.conf.isForbidRWType(utils.Read) { + if s.conf.isForbidRWType(utils.Read) { return nil } - return h.balanceHotReadRegions(cluster) + return s.balanceHotReadRegions(cluster) case writePeer: - if h.conf.isForbidRWType(utils.Write) { + if s.conf.isForbidRWType(utils.Write) { return nil } - return h.balanceHotWritePeers(cluster) + return s.balanceHotWritePeers(cluster) case writeLeader: - if h.conf.isForbidRWType(utils.Write) { + if s.conf.isForbidRWType(utils.Write) { return nil } - return h.balanceHotWriteLeaders(cluster) + return s.balanceHotWriteLeaders(cluster) } return nil } -func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool { +func (s *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool { regionID := op.RegionID() - _, ok := h.regionPendings[regionID] + _, ok := s.regionPendings[regionID] if ok { pendingOpFailsStoreCounter.Inc() return false } influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur) - h.regionPendings[regionID] = influence + s.regionPendings[regionID] = influence utils.ForeachRegionStats(func(rwTy utils.RWType, dim int, kind utils.RegionStatKind) { - hotPeerHist.WithLabelValues(h.GetName(), rwTy.String(), utils.DimToString(dim)).Observe(infl.Loads[kind]) + hotPeerHist.WithLabelValues(s.GetName(), rwTy.String(), utils.DimToString(dim)).Observe(infl.Loads[kind]) }) return true } -func (h *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*operator.Operator { - leaderSolver := newBalanceSolver(h, cluster, utils.Read, transferLeader) +func (s *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*operator.Operator { + leaderSolver := newBalanceSolver(s, cluster, utils.Read, transferLeader) leaderOps := leaderSolver.solve() - peerSolver := newBalanceSolver(h, cluster, utils.Read, movePeer) + peerSolver := newBalanceSolver(s, cluster, utils.Read, movePeer) peerOps := peerSolver.solve() if len(leaderOps) == 0 && len(peerOps) == 0 { hotSchedulerSkipCounter.Inc() @@ -370,8 +370,8 @@ func (h *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*o return nil } -func (h *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*operator.Operator { - peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer) +func (s *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*operator.Operator { + peerSolver := newBalanceSolver(s, cluster, utils.Write, movePeer) ops := peerSolver.solve() if len(ops) > 0 && peerSolver.tryAddPendingInfluence() { return ops @@ -379,8 +379,8 @@ func (h *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*op return nil } -func (h *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*operator.Operator { - leaderSolver := newBalanceSolver(h, cluster, utils.Write, transferLeader) +func (s *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*operator.Operator { + leaderSolver := newBalanceSolver(s, cluster, utils.Write, transferLeader) ops := leaderSolver.solve() if len(ops) > 0 && leaderSolver.tryAddPendingInfluence() { return ops diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index 2e062126fea..5ba303ad05a 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ 
b/pkg/schedule/schedulers/scatter_range.go @@ -131,62 +131,62 @@ func newScatterRangeScheduler(opController *operator.Controller, config *scatter } // ServeHTTP implements the http.Handler interface. -func (l *scatterRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - l.handler.ServeHTTP(w, r) +func (s *scatterRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } // EncodeConfig implements the Scheduler interface. -func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { - l.config.RLock() - defer l.config.RUnlock() - return EncodeConfig(l.config) +func (s *scatterRangeScheduler) EncodeConfig() ([]byte, error) { + s.config.RLock() + defer s.config.RUnlock() + return EncodeConfig(s.config) } // ReloadConfig implements the Scheduler interface. -func (l *scatterRangeScheduler) ReloadConfig() error { - l.config.Lock() - defer l.config.Unlock() +func (s *scatterRangeScheduler) ReloadConfig() error { + s.config.Lock() + defer s.config.Unlock() newCfg := &scatterRangeSchedulerConfig{} - if err := l.config.load(newCfg); err != nil { + if err := s.config.load(newCfg); err != nil { return err } - l.config.RangeName = newCfg.RangeName - l.config.StartKey = newCfg.StartKey - l.config.EndKey = newCfg.EndKey + s.config.RangeName = newCfg.RangeName + s.config.StartKey = newCfg.StartKey + s.config.EndKey = newCfg.EndKey return nil } // IsScheduleAllowed implements the Scheduler interface. -func (l *scatterRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - return l.allowBalanceLeader(cluster) || l.allowBalanceRegion(cluster) +func (s *scatterRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + return s.allowBalanceLeader(cluster) || s.allowBalanceRegion(cluster) } -func (l *scatterRangeScheduler) allowBalanceLeader(cluster sche.SchedulerCluster) bool { - allowed := l.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() +func (s *scatterRangeScheduler) allowBalanceLeader(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(l.GetType(), operator.OpLeader) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpLeader) } return allowed } -func (l *scatterRangeScheduler) allowBalanceRegion(cluster sche.SchedulerCluster) bool { - allowed := l.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() +func (s *scatterRangeScheduler) allowBalanceRegion(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(l.GetType(), operator.OpRegion) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpRegion) } return allowed } // Schedule implements the Scheduler interface. 
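allowBalanceLeader and allowBalanceRegion above gate scheduling on how many OpRange operators are already in flight versus the configured leader and region schedule limits, bumping a limit counter whenever the check fails. A minimal standalone sketch of that admission pattern (limitGate, inflight, and limit are illustrative names, not PD's API):

package main

import "fmt"

// limitGate mimics the IsScheduleAllowed checks: scheduling is allowed only
// while the number of in-flight operators stays below the configured limit,
// and every rejection is counted so it can be surfaced in metrics.
type limitGate struct {
	inflight uint64
	limit    uint64
	rejected uint64
}

func (g *limitGate) allow() bool {
	allowed := g.inflight < g.limit
	if !allowed {
		g.rejected++ // in PD this is a Prometheus counter, e.g. via IncOperatorLimitCounter
	}
	return allowed
}

func main() {
	g := &limitGate{inflight: 4, limit: 4}
	fmt.Println(g.allow(), g.rejected) // false 1: at the limit, rejected and counted
	g.inflight = 3
	fmt.Println(g.allow(), g.rejected) // true 1: below the limit again
}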
-func (l *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { +func (s *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { scatterRangeCounter.Inc() // isolate a new cluster according to the key range - c := genRangeCluster(cluster, l.config.getStartKey(), l.config.getEndKey()) + c := genRangeCluster(cluster, s.config.getStartKey(), s.config.getEndKey()) c.SetTolerantSizeRatio(2) - if l.allowBalanceLeader(cluster) { - ops, _ := l.balanceLeader.Schedule(c, false) + if s.allowBalanceLeader(cluster) { + ops, _ := s.balanceLeader.Schedule(c, false) if len(ops) > 0 { - ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", l.config.getRangeName())) + ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", s.config.getRangeName())) ops[0].AttachKind(operator.OpRange) ops[0].Counters = append(ops[0].Counters, scatterRangeNewOperatorCounter, @@ -195,10 +195,10 @@ func (l *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) } scatterRangeNoNeedBalanceLeaderCounter.Inc() } - if l.allowBalanceRegion(cluster) { - ops, _ := l.balanceRegion.Schedule(c, false) + if s.allowBalanceRegion(cluster) { + ops, _ := s.balanceRegion.Schedule(c, false) if len(ops) > 0 { - ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", l.config.getRangeName())) + ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", s.config.getRangeName())) ops[0].AttachKind(operator.OpRange) ops[0].Counters = append(ops[0].Counters, scatterRangeNewOperatorCounter,