diff --git a/errors.toml b/errors.toml
index a61c23a6fbd..a7e039564d7 100644
--- a/errors.toml
+++ b/errors.toml
@@ -141,9 +141,14 @@ error = '''
 can not remove store %d since there are no extra up store to store the leader
 '''
 
-["PD:core:ErrPauseLeaderTransfer"]
+["PD:core:ErrPauseLeaderTransferIn"]
 error = '''
-store %v is paused for leader transfer
+store %v is paused for leader transfer in
+'''
+
+["PD:core:ErrPauseLeaderTransferOut"]
+error = '''
+store %v is paused for leader transfer out
 '''
 
 ["PD:core:ErrSlowStoreEvicted"]
diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go
index f0b23bd6434..f7c3c5e93b1 100644
--- a/pkg/core/basic_cluster.go
+++ b/pkg/core/basic_cluster.go
@@ -14,7 +14,11 @@
 
 package core
 
-import "bytes"
+import (
+    "bytes"
+
+    "github.com/tikv/pd/pkg/core/constant"
+)
 
 // BasicCluster provides basic data member and interface for a tikv cluster.
 type BasicCluster struct {
@@ -137,8 +141,8 @@ type StoreSetInformer interface {
 
 // StoreSetController is used to control stores' status.
 type StoreSetController interface {
-    PauseLeaderTransfer(id uint64) error
-    ResumeLeaderTransfer(id uint64)
+    PauseLeaderTransfer(id uint64, d constant.Direction) error
+    ResumeLeaderTransfer(id uint64, d constant.Direction)
 
     SlowStoreEvicted(id uint64) error
     SlowStoreRecovered(id uint64)
diff --git a/pkg/core/constant/kind.go b/pkg/core/constant/kind.go
index d8059a306e7..39c256c4f5d 100644
--- a/pkg/core/constant/kind.go
+++ b/pkg/core/constant/kind.go
@@ -155,3 +155,24 @@ func StringToKeyType(input string) KeyType {
         panic("invalid key type: " + input)
     }
 }
+
+// Direction distinguishes different kinds of direction.
+type Direction int
+
+const (
+    // In indicates that the direction is in.
+    In Direction = iota
+    // Out indicates that the direction is out.
+    Out
+)
+
+func (d Direction) String() string {
+    switch d {
+    case In:
+        return "in"
+    case Out:
+        return "out"
+    default:
+        return "unknown"
+    }
+}
diff --git a/pkg/core/store.go b/pkg/core/store.go
index 5baedafdb05..2cd924ad339 100644
--- a/pkg/core/store.go
+++ b/pkg/core/store.go
@@ -51,22 +51,23 @@ const (
 type StoreInfo struct {
     meta *metapb.Store
     *storeStats
-    pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
-    slowStoreEvicted    bool // this store has been evicted as a slow store, should not transfer leader to it
-    slowTrendEvicted    bool // this store has been evicted as a slow store by trend, should not transfer leader to it
-    leaderCount         int
-    regionCount         int
-    learnerCount        int
-    witnessCount        int
-    leaderSize          int64
-    regionSize          int64
-    pendingPeerCount    int
-    lastPersistTime     time.Time
-    leaderWeight        float64
-    regionWeight        float64
-    limiter             storelimit.StoreLimit
-    minResolvedTS       uint64
-    lastAwakenTime      time.Time
+    pauseLeaderTransferIn  bool // not allow to be used as target of transfer leader
+    pauseLeaderTransferOut bool // not allow to be used as source of transfer leader
+    slowStoreEvicted       bool // this store has been evicted as a slow store, should not transfer leader to it
+    slowTrendEvicted       bool // this store has been evicted as a slow store by trend, should not transfer leader to it
+    leaderCount            int
+    regionCount            int
+    learnerCount           int
+    witnessCount           int
+    leaderSize             int64
+    regionSize             int64
+    pendingPeerCount       int
+    lastPersistTime        time.Time
+    leaderWeight           float64
+    regionWeight           float64
+    limiter                storelimit.StoreLimit
+    minResolvedTS          uint64
+    lastAwakenTime         time.Time
 }
 
 // NewStoreInfo creates StoreInfo with meta data.
@@ -138,10 +139,16 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
     return &store
 }
 
-// AllowLeaderTransfer returns if the store is allowed to be selected
-// as source or target of transfer leader.
-func (s *StoreInfo) AllowLeaderTransfer() bool {
-    return !s.pauseLeaderTransfer
+// AllowLeaderTransferIn returns if the store is allowed to be selected
+// as target of transfer leader.
+func (s *StoreInfo) AllowLeaderTransferIn() bool {
+    return !s.pauseLeaderTransferIn
+}
+
+// AllowLeaderTransferOut returns if the store is allowed to be selected
+// as source of transfer leader.
+func (s *StoreInfo) AllowLeaderTransferOut() bool {
+    return !s.pauseLeaderTransferOut
 }
 
 // EvictedAsSlowStore returns if the store should be evicted as a slow store.
@@ -775,24 +782,32 @@ func (s *StoresInfo) ResetStores() {
     s.stores = make(map[uint64]*StoreInfo)
 }
 
-// PauseLeaderTransfer pauses a StoreInfo with storeID.
-func (s *StoresInfo) PauseLeaderTransfer(storeID uint64) error {
+// PauseLeaderTransfer pauses a StoreInfo with storeID. The store can not be selected
+// as source or target of TransferLeader.
+func (s *StoresInfo) PauseLeaderTransfer(storeID uint64, direction constant.Direction) error {
     s.Lock()
     defer s.Unlock()
     store, ok := s.stores[storeID]
     if !ok {
         return errs.ErrStoreNotFound.FastGenByArgs(storeID)
     }
-    if !store.AllowLeaderTransfer() {
-        return errs.ErrPauseLeaderTransfer.FastGenByArgs(storeID)
+    switch direction {
+    case constant.In:
+        if !store.AllowLeaderTransferIn() {
+            return errs.ErrPauseLeaderTransferIn.FastGenByArgs(storeID)
+        }
+    case constant.Out:
+        if !store.AllowLeaderTransferOut() {
+            return errs.ErrPauseLeaderTransferOut.FastGenByArgs(storeID)
+        }
     }
-    s.stores[storeID] = store.Clone(PauseLeaderTransfer())
+    s.stores[storeID] = store.Clone(PauseLeaderTransfer(direction))
     return nil
 }
 
 // ResumeLeaderTransfer cleans a store's pause state. The store can be selected
 // as source or target of TransferLeader again.
-func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
+func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64, direction constant.Direction) {
     s.Lock()
     defer s.Unlock()
     store, ok := s.stores[storeID]
@@ -801,7 +816,7 @@ func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
             zap.Uint64("store-id", storeID))
         return
     }
-    s.stores[storeID] = store.Clone(ResumeLeaderTransfer())
+    s.stores[storeID] = store.Clone(ResumeLeaderTransfer(direction))
 }
 
 // SlowStoreEvicted marks a store as a slow store and prevents transferring
diff --git a/pkg/core/store_option.go b/pkg/core/store_option.go
index 93b25562731..3d05a0fb6e1 100644
--- a/pkg/core/store_option.go
+++ b/pkg/core/store_option.go
@@ -19,6 +19,7 @@ import (
 
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/kvproto/pkg/pdpb"
+    "github.com/tikv/pd/pkg/core/constant"
     "github.com/tikv/pd/pkg/core/storelimit"
     "github.com/tikv/pd/pkg/utils/typeutil"
 )
@@ -98,19 +99,27 @@ func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCr
     }
 }
 
-// PauseLeaderTransfer prevents the store from been selected as source or
-// target store of TransferLeader.
-func PauseLeaderTransfer() StoreCreateOption {
+// PauseLeaderTransfer prevents the store from been selected as source or target store of TransferLeader.
+func PauseLeaderTransfer(d constant.Direction) StoreCreateOption {
     return func(store *StoreInfo) {
-        store.pauseLeaderTransfer = true
+        switch d {
+        case constant.In:
+            store.pauseLeaderTransferIn = true
+        case constant.Out:
+            store.pauseLeaderTransferOut = true
+        }
     }
 }
 
-// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
-// as source or target of TransferLeader again.
-func ResumeLeaderTransfer() StoreCreateOption {
+// ResumeLeaderTransfer cleans a store's pause state. The store can be selected as source or target of TransferLeader again.
+func ResumeLeaderTransfer(d constant.Direction) StoreCreateOption {
     return func(store *StoreInfo) {
-        store.pauseLeaderTransfer = false
+        switch d {
+        case constant.In:
+            store.pauseLeaderTransferIn = false
+        case constant.Out:
+            store.pauseLeaderTransferOut = false
+        }
     }
 }
 
diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go
index 1f56a821032..29065c7c13d 100644
--- a/pkg/errs/errno.go
+++ b/pkg/errs/errno.go
@@ -71,7 +71,8 @@ var (
     ErrWrongRangeKeys = errors.Normalize("wrong range keys", errors.RFCCodeText("PD:core:ErrWrongRangeKeys"))
     ErrStoreNotFound = errors.Normalize("store %v not found", errors.RFCCodeText("PD:core:ErrStoreNotFound"))
-    ErrPauseLeaderTransfer = errors.Normalize("store %v is paused for leader transfer", errors.RFCCodeText("PD:core:ErrPauseLeaderTransfer"))
+    ErrPauseLeaderTransferIn = errors.Normalize("store %v is paused for leader transfer in", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferIn"))
+    ErrPauseLeaderTransferOut = errors.Normalize("store %v is paused for leader transfer out", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferOut"))
     ErrStoreRemoved = errors.Normalize("store %v has been removed", errors.RFCCodeText("PD:core:ErrStoreRemoved"))
     ErrStoreDestroyed = errors.Normalize("store %v has been physically destroyed", errors.RFCCodeText("PD:core:ErrStoreDestroyed"))
     ErrStoreUnhealthy = errors.Normalize("store %v is unhealthy", errors.RFCCodeText("PD:core:ErrStoreUnhealthy"))
diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 2200b34aec7..5fe17e5e723 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -26,6 +26,7 @@ import (
     "github.com/pingcap/kvproto/pkg/pdpb"
     "github.com/pingcap/log"
     "github.com/tikv/pd/pkg/core"
+    "github.com/tikv/pd/pkg/core/constant"
     "github.com/tikv/pd/pkg/core/storelimit"
     "github.com/tikv/pd/pkg/errs"
     "github.com/tikv/pd/pkg/mock/mockid"
@@ -561,9 +562,9 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
 func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
     store := mc.GetStore(storeID)
     if enableEvictLeader {
-        mc.PutStore(store.Clone(core.PauseLeaderTransfer()))
+        mc.PutStore(store.Clone(core.PauseLeaderTransfer(constant.In)))
     } else {
-        mc.PutStore(store.Clone(core.ResumeLeaderTransfer()))
+        mc.PutStore(store.Clone(core.ResumeLeaderTransfer(constant.In)))
     }
 }
diff --git a/pkg/schedule/filter/filters.go b/pkg/schedule/filter/filters.go
index e2846e6c9a6..4ea59935109 100644
--- a/pkg/schedule/filter/filters.go
+++ b/pkg/schedule/filter/filters.go
@@ -390,8 +390,17 @@ func (f *StoreStateFilter) isRemoving(_ config.SharedConfigProvider, store *core
     return statusOK
 }
 
-func (f *StoreStateFilter) pauseLeaderTransfer(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
-    if !store.AllowLeaderTransfer() {
+func (f *StoreStateFilter) pauseLeaderTransferIn(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
+    if !store.AllowLeaderTransferIn() {
+        f.Reason = storeStatePauseLeader
+        return statusStoreRejectLeader
+    }
+    f.Reason = storeStateOK
+    return statusOK
+}
+
+func (f *StoreStateFilter) pauseLeaderTransferOut(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
+    if !store.AllowLeaderTransferOut() {
         f.Reason = storeStatePauseLeader
         return statusStoreRejectLeader
     }
     f.Reason = storeStateOK
     return statusOK
 }
@@ -511,13 +520,13 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, conf config.SharedConfigPr
     var funcs []conditionFunc
     switch typ {
     case leaderSource:
-        funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected}
+        funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransferOut, f.isDisconnected}
     case regionSource:
         funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
     case witnessSource:
         funcs = []conditionFunc{f.isBusy}
     case leaderTarget:
-        funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransfer,
+        funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransferIn,
             f.slowStoreEvicted, f.slowTrendEvicted, f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
     case regionTarget:
         funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.isDisconnected, f.isBusy,
diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go
index 8788be98797..03f02002c74 100644
--- a/pkg/schedule/schedulers/balance_leader.go
+++ b/pkg/schedule/schedulers/balance_leader.go
@@ -493,8 +493,17 @@ func (s *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
         balanceLeaderNoLeaderRegionCounter.Inc()
         return nil
     }
-    finalFilters := s.filters
+    // Check if the source store is available as a source.
     conf := solver.GetSchedulerConfig()
+    if filter.NewCandidates(s.R, []*core.StoreInfo{solver.Source}).
+        FilterSource(conf, nil, s.filterCounter, s.filters...).Len() == 0 {
+        log.Debug("store cannot be used as source", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.Source.GetID()))
+        balanceLeaderNoSourceStoreCounter.Inc()
+        return nil
+    }
+
+    // Check if the target store is available as a target.
+    finalFilters := s.filters
     if leaderFilter := filter.NewPlacementLeaderSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil {
         finalFilters = append(s.filters, leaderFilter)
     }
diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go
index 73256b6102f..defc65846ae 100644
--- a/pkg/schedule/schedulers/evict_leader.go
+++ b/pkg/schedule/schedulers/evict_leader.go
@@ -98,14 +98,14 @@ func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, erro
     _, exists := conf.StoreIDWithRanges[id]
     if exists {
         delete(conf.StoreIDWithRanges, id)
-        conf.cluster.ResumeLeaderTransfer(id)
+        conf.cluster.ResumeLeaderTransfer(id, constant.In)
         return len(conf.StoreIDWithRanges) == 0, nil
     }
     return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
 }
 
 func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
-    if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
+    if err := conf.cluster.PauseLeaderTransfer(id, constant.In); err != nil {
         log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
     }
     conf.StoreIDWithRanges[id] = keyRange
@@ -139,7 +139,7 @@ func (conf *evictLeaderSchedulerConfig) reloadConfig() error {
     if err := conf.load(newCfg); err != nil {
         return err
     }
-    pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
+    pauseAndResumeLeaderTransfer(conf.cluster, constant.In, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
     conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
     conf.Batch = newCfg.Batch
     return nil
@@ -150,7 +150,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.Schedul
     defer conf.RUnlock()
     var res error
     for id := range conf.StoreIDWithRanges {
-        if err := cluster.PauseLeaderTransfer(id); err != nil {
+        if err := cluster.PauseLeaderTransfer(id, constant.In); err != nil {
             res = err
         }
     }
@@ -161,7 +161,7 @@ func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.Schedu
     conf.RLock()
     defer conf.RUnlock()
     for id := range conf.StoreIDWithRanges {
-        cluster.ResumeLeaderTransfer(id)
+        cluster.ResumeLeaderTransfer(id, constant.In)
     }
 }
 
@@ -169,7 +169,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id ui
     conf.RLock()
     defer conf.RUnlock()
     if _, exist := conf.StoreIDWithRanges[id]; !exist {
-        if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
+        if err := conf.cluster.PauseLeaderTransfer(id, constant.In); err != nil {
             return exist, err
         }
     }
@@ -179,7 +179,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id ui
 func (conf *evictLeaderSchedulerConfig) resumeLeaderTransferIfExist(id uint64) {
     conf.RLock()
     defer conf.RUnlock()
-    conf.cluster.ResumeLeaderTransfer(id)
+    conf.cluster.ResumeLeaderTransfer(id, constant.In)
 }
 
 func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRange, batch int) error {
diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go
index 93bd5e48395..b1960ceca8e 100644
--- a/pkg/schedule/schedulers/evict_slow_store.go
+++ b/pkg/schedule/schedulers/evict_slow_store.go
@@ -23,6 +23,7 @@ import (
     "github.com/pingcap/failpoint"
     "github.com/pingcap/log"
     "github.com/tikv/pd/pkg/core"
+    "github.com/tikv/pd/pkg/core/constant"
     sche "github.com/tikv/pd/pkg/schedule/core"
     "github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan" @@ -199,7 +200,7 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error { for _, id := range newCfg.EvictedStores { new[id] = struct{}{} } - pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) + pauseAndResumeLeaderTransfer(s.conf.cluster, constant.In, old, new) s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap s.conf.EvictedStores = newCfg.EvictedStores return nil diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 6e3caa2e8fe..92805587f72 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -307,7 +308,7 @@ func (s *evictSlowTrendScheduler) ReloadConfig() error { for _, id := range newCfg.EvictedStores { new[id] = struct{}{} } - pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) + pauseAndResumeLeaderTransfer(s.conf.cluster, constant.In, old, new) s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap s.conf.EvictedStores = newCfg.EvictedStores return nil diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 3fb8225be39..f83c2396ace 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -99,7 +99,7 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last succ, last = false, false if exists { delete(conf.StoreIDWithRanges, id) - conf.cluster.ResumeLeaderTransfer(id) + conf.cluster.ResumeLeaderTransfer(id, constant.Out) succ = true last = len(conf.StoreIDWithRanges) == 0 } @@ -109,7 +109,7 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last func (conf *grantLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { conf.Lock() defer conf.Unlock() - if err := conf.cluster.PauseLeaderTransfer(id); err != nil { + if err := conf.cluster.PauseLeaderTransfer(id, constant.Out); err != nil { log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) } conf.StoreIDWithRanges[id] = keyRange @@ -171,7 +171,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error { if err := s.conf.load(newCfg); err != nil { return err } - pauseAndResumeLeaderTransfer(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) + pauseAndResumeLeaderTransfer(s.conf.cluster, constant.Out, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges return nil } @@ -182,7 +182,7 @@ func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro defer s.conf.RUnlock() var res error for id := range s.conf.StoreIDWithRanges { - if err := cluster.PauseLeaderTransfer(id); err != nil { + if err := cluster.PauseLeaderTransfer(id, constant.Out); err != nil { res = err } } @@ -194,7 +194,7 @@ func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.RLock() defer s.conf.RUnlock() for id := range s.conf.StoreIDWithRanges { - cluster.ResumeLeaderTransfer(id) + cluster.ResumeLeaderTransfer(id, constant.Out) } } @@ -252,7 +252,7 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R id = (uint64)(idFloat) handler.config.RLock() if _, exists = handler.config.StoreIDWithRanges[id]; 
-            if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
+            if err := handler.config.cluster.PauseLeaderTransfer(id, constant.Out); err != nil {
                 handler.config.RUnlock()
                 handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
                 return
@@ -272,7 +272,7 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
     err := handler.config.buildWithArgs(args)
     if err != nil {
         handler.config.Lock()
-        handler.config.cluster.ResumeLeaderTransfer(id)
+        handler.config.cluster.ResumeLeaderTransfer(id, constant.Out)
         handler.config.Unlock()
         handler.rd.JSON(w, http.StatusBadRequest, err.Error())
         return
diff --git a/pkg/schedule/schedulers/grant_leader_test.go b/pkg/schedule/schedulers/grant_leader_test.go
new file mode 100644
index 00000000000..38bac3c961a
--- /dev/null
+++ b/pkg/schedule/schedulers/grant_leader_test.go
@@ -0,0 +1,61 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedulers
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/require"
+    "github.com/tikv/pd/pkg/schedule/operator"
+    "github.com/tikv/pd/pkg/schedule/types"
+    "github.com/tikv/pd/pkg/storage"
+    "github.com/tikv/pd/pkg/utils/operatorutil"
+)
+
+func TestGrantLeaderScheduler(t *testing.T) {
+    re := require.New(t)
+    cancel, _, tc, oc := prepareSchedulersTest()
+    defer cancel()
+
+    tc.AddLeaderStore(1, 1)
+    tc.AddLeaderStore(2, 2)
+    tc.AddLeaderStore(3, 16)
+    tc.AddLeaderRegion(1, 3, 1, 2)
+    tc.AddLeaderRegion(2, 3, 1, 2)
+    tc.AddLeaderRegion(3, 1, 2, 3)
+    tc.AddLeaderRegion(4, 2, 1, 3)
+    storage := storage.NewStorageWithMemoryBackend()
+
+    // balance leader scheduler should add operator from store 3 to store 1
+    bls, err := CreateScheduler(types.BalanceLeaderScheduler, oc, storage, ConfigSliceDecoder(types.BalanceLeaderScheduler, []string{}))
+    re.NoError(err)
+    ops, _ := bls.Schedule(tc, false)
+    re.NotEmpty(ops)
+    operatorutil.CheckTransferLeader(re, ops[0], operator.OpKind(0), 3, 1)
+
+    // add grant leader scheduler for store 3
+    re.True(tc.GetStore(3).AllowLeaderTransferOut())
+    gls, err := CreateScheduler(types.GrantLeaderScheduler, oc, storage, ConfigSliceDecoder(types.GrantLeaderScheduler, []string{"3"}), nil)
+    re.NoError(err)
+    re.True(gls.IsScheduleAllowed(tc))
+    re.NoError(gls.PrepareConfig(tc))
+    ops, _ = gls.Schedule(tc, false)
+    operatorutil.CheckMultiSourceTransferLeader(re, ops[0], operator.OpLeader, []uint64{1, 2})
+    re.False(tc.GetStore(3).AllowLeaderTransferOut())
+
+    // balance leader scheduler should not add operator from store 3 to store 1
+    ops, _ = bls.Schedule(tc, false)
+    re.Empty(ops)
+}
diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go
index a518a167af7..4afc4605f52 100644
--- a/pkg/schedule/schedulers/metrics.go
+++ b/pkg/schedule/schedulers/metrics.go
@@ -226,6 +226,7 @@ var (
     balanceLeaderScheduleCounter = balanceLeaderCounterWithEvent("schedule")
     balanceLeaderNoLeaderRegionCounter = balanceLeaderCounterWithEvent("no-leader-region")
balanceLeaderRegionHotCounter = balanceLeaderCounterWithEvent("region-hot") + balanceLeaderNoSourceStoreCounter = balanceLeaderCounterWithEvent("no-source-store") balanceLeaderNoTargetStoreCounter = balanceLeaderCounterWithEvent("no-target-store") balanceLeaderNoFollowerRegionCounter = balanceLeaderCounterWithEvent("no-follower-region") balanceLeaderSkipCounter = balanceLeaderCounterWithEvent("skip") diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go index 7cbfe714aa9..9c1c7fe3854 100644 --- a/pkg/schedule/schedulers/utils.go +++ b/pkg/schedule/schedulers/utils.go @@ -391,20 +391,20 @@ func (q *retryQuota) gc(keepStores []*core.StoreInfo) { } } -// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer. -func pauseAndResumeLeaderTransfer[T any](cluster *core.BasicCluster, old, new map[uint64]T) { +// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer in or out. +func pauseAndResumeLeaderTransfer[T any](cluster *core.BasicCluster, direction constant.Direction, old, new map[uint64]T) { for id := range old { if _, ok := new[id]; ok { continue } - cluster.ResumeLeaderTransfer(id) + cluster.ResumeLeaderTransfer(id, direction) } for id := range new { if _, ok := old[id]; ok { continue } - if err := cluster.PauseLeaderTransfer(id); err != nil { - log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) + if err := cluster.PauseLeaderTransfer(id, direction); err != nil { + log.Error("pause leader transfer failed", zap.Uint64("store-id", id), zap.String("direction", direction.String()), errs.ZapError(err)) } } } diff --git a/pkg/statistics/collector.go b/pkg/statistics/collector.go index 4e3e2fa2c7a..d1a4ebf5a97 100644 --- a/pkg/statistics/collector.go +++ b/pkg/statistics/collector.go @@ -46,7 +46,7 @@ func (tikvCollector) filter(info *StoreSummaryInfo, kind constant.ResourceKind) } switch kind { case constant.LeaderKind: - return info.AllowLeaderTransfer() + return info.AllowLeaderTransferIn() && info.AllowLeaderTransferOut() case constant.RegionKind: return true } diff --git a/pkg/utils/operatorutil/operator_check.go b/pkg/utils/operatorutil/operator_check.go index ef0f6af37b2..78b9e9d9bab 100644 --- a/pkg/utils/operatorutil/operator_check.go +++ b/pkg/utils/operatorutil/operator_check.go @@ -37,6 +37,15 @@ func CheckTransferLeaderFrom(re *require.Assertions, op *operator.Operator, kind re.Equal(kind, op.Kind()&kind) } +// CheckMultiSourceTransferLeader checks if the operator is to transfer leader out of one of the source stores. +func CheckMultiSourceTransferLeader(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceIDs []uint64) { + re.NotNil(op) + re.Equal(1, op.Len()) + re.Contains(sourceIDs, op.Step(0).(operator.TransferLeader).FromStore) + kind |= operator.OpLeader + re.Equal(kind, op.Kind()&kind) +} + // CheckMultiTargetTransferLeader checks if the operator is to transfer leader from the specified source to one of the target stores. 
 func CheckMultiTargetTransferLeader(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID uint64, targetIDs []uint64) {
     re.NotNil(op)
diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go
index 2f55b2d8ecb..7164f36b1bb 100644
--- a/plugin/scheduler_example/evict_leader.go
+++ b/plugin/scheduler_example/evict_leader.go
@@ -185,7 +185,7 @@ func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro
     defer s.conf.mu.RUnlock()
     var res error
     for id := range s.conf.StoreIDWitRanges {
-        if err := cluster.PauseLeaderTransfer(id); err != nil {
+        if err := cluster.PauseLeaderTransfer(id, constant.In); err != nil {
             res = err
         }
     }
@@ -197,7 +197,7 @@ func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
     s.conf.mu.RLock()
     defer s.conf.mu.RUnlock()
     for id := range s.conf.StoreIDWitRanges {
-        cluster.ResumeLeaderTransfer(id)
+        cluster.ResumeLeaderTransfer(id, constant.In)
     }
 }
 
@@ -258,7 +258,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
     if ok {
         id = (uint64)(idFloat)
         if _, exists = handler.config.StoreIDWitRanges[id]; !exists {
-            if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
+            if err := handler.config.cluster.PauseLeaderTransfer(id, constant.In); err != nil {
                 handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
                 return
             }
@@ -276,7 +276,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
     err := handler.config.BuildWithArgs(args)
     if err != nil {
         handler.config.mu.Lock()
-        handler.config.cluster.ResumeLeaderTransfer(id)
+        handler.config.cluster.ResumeLeaderTransfer(id, constant.In)
         handler.config.mu.Unlock()
         handler.rd.JSON(w, http.StatusBadRequest, err.Error())
         return
@@ -285,7 +285,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
     if err != nil {
         handler.config.mu.Lock()
         delete(handler.config.StoreIDWitRanges, id)
-        handler.config.cluster.ResumeLeaderTransfer(id)
+        handler.config.cluster.ResumeLeaderTransfer(id, constant.In)
         handler.config.mu.Unlock()
         handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
         return
@@ -314,11 +314,11 @@ func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.R
         return
     }
     delete(handler.config.StoreIDWitRanges, id)
-    handler.config.cluster.ResumeLeaderTransfer(id)
+    handler.config.cluster.ResumeLeaderTransfer(id, constant.In)
 
     if err := handler.config.Persist(); err != nil {
         handler.config.StoreIDWitRanges[id] = ranges
-        _ = handler.config.cluster.PauseLeaderTransfer(id)
+        _ = handler.config.cluster.PauseLeaderTransfer(id, constant.In)
         handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
         return
     }
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 9d3a3d44590..99bb60b5558 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -1850,15 +1850,15 @@ func TestStores(t *testing.T) {
     for i, store := range stores {
         id := store.GetID()
         re.Nil(cache.GetStore(id))
-        re.Error(cache.PauseLeaderTransfer(id))
+        re.Error(cache.PauseLeaderTransfer(id, constant.In))
         cache.PutStore(store)
         re.Equal(store, cache.GetStore(id))
         re.Equal(i+1, cache.GetStoreCount())
-        re.NoError(cache.PauseLeaderTransfer(id))
-        re.False(cache.GetStore(id).AllowLeaderTransfer())
-        re.Error(cache.PauseLeaderTransfer(id))
-        cache.ResumeLeaderTransfer(id)
-        re.True(cache.GetStore(id).AllowLeaderTransfer())
+        re.NoError(cache.PauseLeaderTransfer(id, constant.In))
+        re.False(cache.GetStore(id).AllowLeaderTransferIn())
+        re.Error(cache.PauseLeaderTransfer(id, constant.In))
+        cache.ResumeLeaderTransfer(id, constant.In)
+        re.True(cache.GetStore(id).AllowLeaderTransferIn())
     }
 
     re.Equal(int(n), cache.GetStoreCount())
diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go
index 314f9e3a762..3450e59d178 100644
--- a/tools/pd-ctl/tests/scheduler/scheduler_test.go
+++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go
@@ -175,19 +175,19 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) {
     schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler", "evict-leader-scheduler", "grant-leader-scheduler"}
 
     checkStorePause := func(changedStores []uint64, schedulerName string) {
-        status := func() string {
-            switch schedulerName {
-            case "evict-leader-scheduler":
-                return "paused"
-            case "grant-leader-scheduler":
-                return "resumed"
-            default:
-                re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName))
-                return ""
-            }
-        }()
         for _, store := range stores {
-            isStorePaused := !cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()).AllowLeaderTransfer()
+            storeInfo := cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId())
+            status, isStorePaused := func() (string, bool) {
+                switch schedulerName {
+                case "evict-leader-scheduler":
+                    return "paused", !storeInfo.AllowLeaderTransferIn()
+                case "grant-leader-scheduler":
+                    return "paused", !storeInfo.AllowLeaderTransferOut()
+                default:
+                    re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName))
+                    return "", false
+                }
+            }()
             if slice.AnyOf(changedStores, func(i int) bool {
                 return store.GetId() == changedStores[i]
             }) {
@@ -198,7 +198,14 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) {
                     fmt.Sprintf("store %d should not be %s with %s", store.GetId(), status, schedulerName))
             }
             if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
-                re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransfer())
+                switch schedulerName {
+                case "evict-leader-scheduler":
+                    re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransferIn())
+                case "grant-leader-scheduler":
+                    re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransferOut())
+                default:
+                    re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName))
+                }
             }
         }
     }