Skip to content

Commit

Permalink
scheduler: replace pauseLeader with two flags and add source filter t…
Browse files Browse the repository at this point in the history
…o transferIn (tikv#8623)

close tikv#8621

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Nov 11, 2024
1 parent 49307e2 commit 8bc9749
Show file tree
Hide file tree
Showing 21 changed files with 250 additions and 96 deletions.
9 changes: 7 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,14 @@ error = '''
can not remove store %d since there are no extra up store to store the leader
'''

["PD:core:ErrPauseLeaderTransfer"]
["PD:core:ErrPauseLeaderTransferIn"]
error = '''
store %v is paused for leader transfer
store %v is paused for leader transfer in
'''

["PD:core:ErrPauseLeaderTransferOut"]
error = '''
store %v is paused for leader transfer out
'''

["PD:core:ErrSlowStoreEvicted"]
Expand Down
10 changes: 7 additions & 3 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package core

import "bytes"
import (
"bytes"

"github.com/tikv/pd/pkg/core/constant"
)

// BasicCluster provides basic data member and interface for a tikv cluster.
type BasicCluster struct {
Expand Down Expand Up @@ -137,8 +141,8 @@ type StoreSetInformer interface {

// StoreSetController is used to control stores' status.
type StoreSetController interface {
PauseLeaderTransfer(id uint64) error
ResumeLeaderTransfer(id uint64)
PauseLeaderTransfer(id uint64, d constant.Direction) error
ResumeLeaderTransfer(id uint64, d constant.Direction)

SlowStoreEvicted(id uint64) error
SlowStoreRecovered(id uint64)
Expand Down
21 changes: 21 additions & 0 deletions pkg/core/constant/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,24 @@ func StringToKeyType(input string) KeyType {
panic("invalid key type: " + input)
}
}

// Direction distinguishes different kinds of direction.
type Direction int

const (
// In indicates that the direction is in.
In Direction = iota
// Out indicates that the direction is out.
Out
)

func (d Direction) String() string {
switch d {
case In:
return "in"
case Out:
return "out"
default:
return "unknown"
}
}
69 changes: 42 additions & 27 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,23 @@ const (
type StoreInfo struct {
meta *metapb.Store
*storeStats
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
pauseLeaderTransferIn bool // not allow to be used as target of transfer leader
pauseLeaderTransferOut bool // not allow to be used as source of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
}

// NewStoreInfo creates StoreInfo with meta data.
Expand Down Expand Up @@ -138,10 +139,16 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
return &store
}

// AllowLeaderTransfer returns if the store is allowed to be selected
// as source or target of transfer leader.
func (s *StoreInfo) AllowLeaderTransfer() bool {
return !s.pauseLeaderTransfer
// AllowLeaderTransferIn returns if the store is allowed to be selected
// as target of transfer leader.
func (s *StoreInfo) AllowLeaderTransferIn() bool {
return !s.pauseLeaderTransferIn
}

// AllowLeaderTransferOut returns if the store is allowed to be selected
// as source of transfer leader.
func (s *StoreInfo) AllowLeaderTransferOut() bool {
return !s.pauseLeaderTransferOut
}

// EvictedAsSlowStore returns if the store should be evicted as a slow store.
Expand Down Expand Up @@ -775,24 +782,32 @@ func (s *StoresInfo) ResetStores() {
s.stores = make(map[uint64]*StoreInfo)
}

// PauseLeaderTransfer pauses a StoreInfo with storeID.
func (s *StoresInfo) PauseLeaderTransfer(storeID uint64) error {
// PauseLeaderTransfer pauses a StoreInfo with storeID. The store can not be selected
// as source or target of TransferLeader.
func (s *StoresInfo) PauseLeaderTransfer(storeID uint64, direction constant.Direction) error {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
if !ok {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}
if !store.AllowLeaderTransfer() {
return errs.ErrPauseLeaderTransfer.FastGenByArgs(storeID)
switch direction {
case constant.In:
if !store.AllowLeaderTransferIn() {
return errs.ErrPauseLeaderTransferIn.FastGenByArgs(storeID)
}
case constant.Out:
if !store.AllowLeaderTransferOut() {
return errs.ErrPauseLeaderTransferOut.FastGenByArgs(storeID)
}
}
s.stores[storeID] = store.Clone(PauseLeaderTransfer())
s.stores[storeID] = store.Clone(PauseLeaderTransfer(direction))
return nil
}

// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
// as source or target of TransferLeader again.
func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64, direction constant.Direction) {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
Expand All @@ -801,7 +816,7 @@ func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
zap.Uint64("store-id", storeID))
return
}
s.stores[storeID] = store.Clone(ResumeLeaderTransfer())
s.stores[storeID] = store.Clone(ResumeLeaderTransfer(direction))
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
Expand Down
25 changes: 17 additions & 8 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/utils/typeutil"
)
Expand Down Expand Up @@ -98,19 +99,27 @@ func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCr
}
}

// PauseLeaderTransfer prevents the store from been selected as source or
// target store of TransferLeader.
func PauseLeaderTransfer() StoreCreateOption {
// PauseLeaderTransfer prevents the store from been selected as source or target store of TransferLeader.
func PauseLeaderTransfer(d constant.Direction) StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransfer = true
switch d {
case constant.In:
store.pauseLeaderTransferIn = true
case constant.Out:
store.pauseLeaderTransferOut = true
}
}
}

// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
// as source or target of TransferLeader again.
func ResumeLeaderTransfer() StoreCreateOption {
// ResumeLeaderTransfer cleans a store's pause state. The store can be selected as source or target of TransferLeader again.
func ResumeLeaderTransfer(d constant.Direction) StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransfer = false
switch d {
case constant.In:
store.pauseLeaderTransferIn = false
case constant.Out:
store.pauseLeaderTransferOut = false
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ var (
var (
ErrWrongRangeKeys = errors.Normalize("wrong range keys", errors.RFCCodeText("PD:core:ErrWrongRangeKeys"))
ErrStoreNotFound = errors.Normalize("store %v not found", errors.RFCCodeText("PD:core:ErrStoreNotFound"))
ErrPauseLeaderTransfer = errors.Normalize("store %v is paused for leader transfer", errors.RFCCodeText("PD:core:ErrPauseLeaderTransfer"))
ErrPauseLeaderTransferIn = errors.Normalize("store %v is paused for leader transfer in", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferIn"))
ErrPauseLeaderTransferOut = errors.Normalize("store %v is paused for leader transfer out", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferOut"))
ErrStoreRemoved = errors.Normalize("store %v has been removed", errors.RFCCodeText("PD:core:ErrStoreRemoved"))
ErrStoreDestroyed = errors.Normalize("store %v has been physically destroyed", errors.RFCCodeText("PD:core:ErrStoreDestroyed"))
ErrStoreUnhealthy = errors.Normalize("store %v is unhealthy", errors.RFCCodeText("PD:core:ErrStoreUnhealthy"))
Expand Down
5 changes: 3 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mock/mockid"
Expand Down Expand Up @@ -561,9 +562,9 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
store := mc.GetStore(storeID)
if enableEvictLeader {
mc.PutStore(store.Clone(core.PauseLeaderTransfer()))
mc.PutStore(store.Clone(core.PauseLeaderTransfer(constant.In)))
} else {
mc.PutStore(store.Clone(core.ResumeLeaderTransfer()))
mc.PutStore(store.Clone(core.ResumeLeaderTransfer(constant.In)))
}
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,17 @@ func (f *StoreStateFilter) isRemoving(_ config.SharedConfigProvider, store *core
return statusOK
}

func (f *StoreStateFilter) pauseLeaderTransfer(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
if !store.AllowLeaderTransfer() {
func (f *StoreStateFilter) pauseLeaderTransferIn(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
if !store.AllowLeaderTransferIn() {
f.Reason = storeStatePauseLeader
return statusStoreRejectLeader
}
f.Reason = storeStateOK
return statusOK
}

func (f *StoreStateFilter) pauseLeaderTransferOut(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
if !store.AllowLeaderTransferOut() {
f.Reason = storeStatePauseLeader
return statusStoreRejectLeader
}
Expand Down Expand Up @@ -511,13 +520,13 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, conf config.SharedConfigPr
var funcs []conditionFunc
switch typ {
case leaderSource:
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected}
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransferOut, f.isDisconnected}
case regionSource:
funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
case witnessSource:
funcs = []conditionFunc{f.isBusy}
case leaderTarget:
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransfer,
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransferIn,
f.slowStoreEvicted, f.slowTrendEvicted, f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
case regionTarget:
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.isDisconnected, f.isBusy,
Expand Down
11 changes: 10 additions & 1 deletion pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,17 @@ func (s *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
balanceLeaderNoLeaderRegionCounter.Inc()
return nil
}
finalFilters := s.filters
// Check if the source store is available as a source.
conf := solver.GetSchedulerConfig()
if filter.NewCandidates(s.R, []*core.StoreInfo{solver.Source}).
FilterSource(conf, nil, s.filterCounter, s.filters...).Len() == 0 {
log.Debug("store cannot be used as source", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.Source.GetID()))
balanceLeaderNoSourceStoreCounter.Inc()
return nil
}

// Check if the target store is available as a target.
finalFilters := s.filters
if leaderFilter := filter.NewPlacementLeaderSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil {
finalFilters = append(s.filters, leaderFilter)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, erro
_, exists := conf.StoreIDWithRanges[id]
if exists {
delete(conf.StoreIDWithRanges, id)
conf.cluster.ResumeLeaderTransfer(id)
conf.cluster.ResumeLeaderTransfer(id, constant.In)
return len(conf.StoreIDWithRanges) == 0, nil
}
return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
if err := conf.cluster.PauseLeaderTransfer(id, constant.In); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
Expand Down Expand Up @@ -139,7 +139,7 @@ func (conf *evictLeaderSchedulerConfig) reloadConfig() error {
if err := conf.load(newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
pauseAndResumeLeaderTransfer(conf.cluster, constant.In, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
conf.Batch = newCfg.Batch
return nil
Expand All @@ -150,7 +150,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.Schedul
defer conf.RUnlock()
var res error
for id := range conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransfer(id); err != nil {
if err := cluster.PauseLeaderTransfer(id, constant.In); err != nil {
res = err
}
}
Expand All @@ -161,15 +161,15 @@ func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.Schedu
conf.RLock()
defer conf.RUnlock()
for id := range conf.StoreIDWithRanges {
cluster.ResumeLeaderTransfer(id)
cluster.ResumeLeaderTransfer(id, constant.In)
}
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) {
conf.RLock()
defer conf.RUnlock()
if _, exist := conf.StoreIDWithRanges[id]; !exist {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
if err := conf.cluster.PauseLeaderTransfer(id, constant.In); err != nil {
return exist, err
}
}
Expand All @@ -179,7 +179,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id ui
func (conf *evictLeaderSchedulerConfig) resumeLeaderTransferIfExist(id uint64) {
conf.RLock()
defer conf.RUnlock()
conf.cluster.ResumeLeaderTransfer(id)
conf.cluster.ResumeLeaderTransfer(id, constant.In)
}

func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRange, batch int) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
Expand Down Expand Up @@ -199,7 +200,7 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error {
for _, id := range newCfg.EvictedStores {
new[id] = struct{}{}
}
pauseAndResumeLeaderTransfer(s.conf.cluster, old, new)
pauseAndResumeLeaderTransfer(s.conf.cluster, constant.In, old, new)
s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
s.conf.EvictedStores = newCfg.EvictedStores
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
Expand Down Expand Up @@ -307,7 +308,7 @@ func (s *evictSlowTrendScheduler) ReloadConfig() error {
for _, id := range newCfg.EvictedStores {
new[id] = struct{}{}
}
pauseAndResumeLeaderTransfer(s.conf.cluster, old, new)
pauseAndResumeLeaderTransfer(s.conf.cluster, constant.In, old, new)
s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
s.conf.EvictedStores = newCfg.EvictedStores
return nil
Expand Down
Loading

0 comments on commit 8bc9749

Please sign in to comment.