schedule: support patrol region concurrency (#8094)
close #7963

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
lhy1024 and ti-chi-bot[bot] authored Oct 30, 2024
1 parent de92fc5 commit 649393a
Showing 10 changed files with 247 additions and 35 deletions.
5 changes: 5 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -400,6 +400,11 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int {
return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
}

// GetPatrolRegionWorkerCount returns the worker count of the patrol.
func (o *PersistConfig) GetPatrolRegionWorkerCount() int {
return o.GetScheduleConfig().PatrolRegionWorkerCount
}

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 {
return o.GetScheduleConfig().MaxMovableHotPeerSize
106 changes: 98 additions & 8 deletions pkg/schedule/checker/checker_controller.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"strconv"
"sync"
"time"

"github.com/pingcap/failpoint"
@@ -31,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/keyutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)
@@ -47,6 +49,7 @@ const (
// MaxPatrolScanRegionLimit is the max limit of regions to scan for a batch.
MaxPatrolScanRegionLimit = 8192
patrolRegionPartition = 1024
patrolRegionChanLen = MaxPatrolScanRegionLimit
)

var (
@@ -71,6 +74,7 @@ type Controller struct {
priorityInspector *PriorityInspector
pendingProcessedRegions *cache.TTLUint64
suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix
patrolRegionContext *PatrolRegionContext

// duration is the duration of the last patrol round.
// It's exported, so it should be protected by a mutex.
@@ -82,6 +86,8 @@ type Controller struct {
// It's used to update the ticker, so we need to
// record it to avoid updating the ticker frequently.
interval time.Duration
// workerCount is the count of workers to patrol regions.
workerCount int
// patrolRegionScanLimit is the limit of regions to scan.
// It is calculated by the number of regions.
patrolRegionScanLimit int
@@ -104,6 +110,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
priorityInspector: NewPriorityInspector(cluster, conf),
pendingProcessedRegions: pendingProcessedRegions,
suspectKeyRanges: cache.NewStringTTL(ctx, time.Minute, 3*time.Minute),
patrolRegionContext: &PatrolRegionContext{},
interval: cluster.GetCheckerConfig().GetPatrolRegionInterval(),
patrolRegionScanLimit: calculateScanLimit(cluster),
}
@@ -112,6 +119,9 @@
// PatrolRegions is used to scan regions.
// The checkers will check these regions to decide if they need to do some operations.
func (c *Controller) PatrolRegions() {
c.patrolRegionContext.init(c.ctx)
c.patrolRegionContext.startPatrolRegionWorkers(c)
defer c.patrolRegionContext.stop()
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
start := time.Now()
@@ -123,11 +133,20 @@
select {
case <-ticker.C:
c.updateTickerIfNeeded(ticker)
c.updatePatrolWorkersIfNeeded()
if c.cluster.IsSchedulingHalted() {
for len(c.patrolRegionContext.regionChan) > 0 {
<-c.patrolRegionContext.regionChan
}
log.Debug("skip patrol regions due to scheduling is halted")
continue
}

// wait for the regionChan to be drained
if len(c.patrolRegionContext.regionChan) > 0 {
continue
}

// Check priority regions first.
c.checkPriorityRegions()
// Check pending processed regions first.
@@ -150,6 +169,9 @@
start = time.Now()
}
failpoint.Inject("breakPatrol", func() {
for !c.IsPatrolRegionChanEmpty() {
time.Sleep(time.Millisecond * 10)
}
failpoint.Return()
})
case <-c.ctx.Done():
@@ -160,6 +182,32 @@
}
}
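
When scheduling is halted, the loop above does not hand the scanned regions to the workers; it simply empties regionChan by receiving while len(regionChan) > 0, which never blocks. Below is a minimal, standalone sketch of that drain idiom; it is illustration only, not PD code, and the drain helper and channel are hypothetical.

package main

import "fmt"

// drain empties whatever is currently buffered in ch without blocking.
// With a single consumer this never blocks; if other goroutines also
// receive from ch, they could race with the length check.
func drain[T any](ch chan T) {
	for len(ch) > 0 {
		<-ch
	}
}

func main() {
	ch := make(chan int, 8)
	for i := 0; i < 5; i++ {
		ch <- i
	}
	drain(ch)
	fmt.Println("buffered after drain:", len(ch)) // 0
}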

func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
// Note: we reset the ticker here to support updating configuration dynamically.
newInterval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval()
if c.interval != newInterval {
c.interval = newInterval
ticker.Reset(newInterval)
log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", newInterval))
}
}

func (c *Controller) updatePatrolWorkersIfNeeded() {
newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionWorkerCount()
if c.workerCount != newWorkersCount {
oldWorkersCount := c.workerCount
c.workerCount = newWorkersCount
// Stop the old workers and start the new workers.
c.patrolRegionContext.workersCancel()
c.patrolRegionContext.wg.Wait()
c.patrolRegionContext.workersCtx, c.patrolRegionContext.workersCancel = context.WithCancel(c.ctx)
c.patrolRegionContext.startPatrolRegionWorkers(c)
log.Info("checkers starts patrol regions with new workers count",
zap.Int("old-workers-count", oldWorkersCount),
zap.Int("new-workers-count", newWorkersCount))
}
}

// GetPatrolRegionsDuration returns the duration of the last patrol region round.
func (c *Controller) GetPatrolRegionsDuration() time.Duration {
c.mu.RLock()
@@ -182,7 +230,7 @@ func (c *Controller) checkRegions(startKey []byte) (key []byte, regions []*core.
}

for _, region := range regions {
c.tryAddOperators(region)
c.patrolRegionContext.regionChan <- region
key = region.GetEndKey()
}
return
@@ -446,13 +494,55 @@ func (c *Controller) GetPauseController(name string) (*PauseController, error) {
}
}

// IsPatrolRegionChanEmpty returns whether the patrol region channel is empty.
func (c *Controller) IsPatrolRegionChanEmpty() bool {
if c.patrolRegionContext == nil {
return true
}
return len(c.patrolRegionContext.regionChan) == 0
}

// PatrolRegionContext is used to store the context of patrol regions.
type PatrolRegionContext struct {
workersCtx context.Context
workersCancel context.CancelFunc
regionChan chan *core.RegionInfo
wg sync.WaitGroup
}

func (p *PatrolRegionContext) init(ctx context.Context) {
p.regionChan = make(chan *core.RegionInfo, patrolRegionChanLen)
p.workersCtx, p.workersCancel = context.WithCancel(ctx)
}

func (p *PatrolRegionContext) stop() {
log.Debug("closing patrol region workers")
close(p.regionChan)
p.workersCancel()
p.wg.Wait()
log.Debug("patrol region workers are closed")
}

func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Controller) {
for i := range c.workerCount {
p.wg.Add(1)
go func(i int) {
defer logutil.LogPanic()
defer p.wg.Done()
for {
select {
case region, ok := <-p.regionChan:
if !ok {
log.Debug("region channel is closed", zap.Int("worker-id", i))
return
}
c.tryAddOperators(region)
case <-p.workersCtx.Done():
log.Debug("region worker is closed", zap.Int("worker-id", i))
return
}
}
}(i)
}
}
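
Taken together, the pieces above form a small producer/consumer pipeline: PatrolRegions scans regions and sends them into the buffered regionChan (capacity patrolRegionChanLen), the workerCount goroutines started by startPatrolRegionWorkers receive from the channel and call tryAddOperators concurrently, and stop closes the channel, cancels the workers' context, and waits for them to exit. The following is a minimal standalone sketch of that bounded-channel worker-pool shape, for illustration only; the process function and all other names are placeholders, not PD APIs.

package main

import (
	"context"
	"fmt"
	"sync"
)

// process stands in for the per-region work (tryAddOperators in the real code).
func process(workerID, region int) {
	fmt.Printf("worker %d handled region %d\n", workerID, region)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	regionCh := make(chan int, 16) // bounded buffer, like regionChan above
	var wg sync.WaitGroup

	const workers = 3
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case region, ok := <-regionCh:
					if !ok { // channel closed: nothing more to do
						return
					}
					process(id, region)
				case <-ctx.Done(): // external shutdown
					return
				}
			}
		}(i)
	}

	// Producer side: hand the scanned "regions" to the workers.
	for r := 1; r <= 10; r++ {
		regionCh <- r
	}
	close(regionCh) // let the workers drain the buffer and exit
	wg.Wait()
}

The buffered channel bounds how far the scanner can run ahead of the checkers, which is why PatrolRegions above waits for regionChan to drain before scanning the next batch.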

33 changes: 23 additions & 10 deletions pkg/schedule/checker/priority_inspector.go
@@ -22,6 +22,7 @@ import (
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// defaultPriorityQueueSize is the default value of priority queue size.
@@ -31,16 +32,20 @@ const defaultPriorityQueueSize = 1280
type PriorityInspector struct {
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
queue *cache.PriorityQueue
mu struct {
syncutil.RWMutex
queue *cache.PriorityQueue
}
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *PriorityInspector {
return &PriorityInspector{
res := &PriorityInspector{
cluster: cluster,
conf: conf,
queue: cache.NewPriorityQueue(defaultPriorityQueueSize),
}
res.mu.queue = cache.NewPriorityQueue(defaultPriorityQueueSize)
return res
}

// RegionPriorityEntry records region priority info.
Expand Down Expand Up @@ -99,24 +104,28 @@ func (p *PriorityInspector) inspectRegionInReplica(region *core.RegionInfo) (mak
// The region is removed if its priority equals 0.
// Its Attempt count is increased if the region's priority equals the last one.
func (p *PriorityInspector) addOrRemoveRegion(priority int, regionID uint64) {
p.mu.Lock()
defer p.mu.Unlock()
if priority < 0 {
if entry := p.queue.Get(regionID); entry != nil && entry.Priority == priority {
if entry := p.mu.queue.Get(regionID); entry != nil && entry.Priority == priority {
e := entry.Value.(*RegionPriorityEntry)
e.Attempt++
e.Last = time.Now()
p.queue.Put(priority, e)
p.mu.queue.Put(priority, e)
} else {
entry := NewRegionEntry(regionID)
p.queue.Put(priority, entry)
p.mu.queue.Put(priority, entry)
}
} else {
p.queue.Remove(regionID)
p.mu.queue.Remove(regionID)
}
}

// GetPriorityRegions returns all regions in the priority queue that need a rerun.
func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) {
entries := p.queue.Elems()
p.mu.RLock()
defer p.mu.RUnlock()
entries := p.mu.queue.Elems()
for _, e := range entries {
re := e.Value.(*RegionPriorityEntry)
// avoid having some priority regions occupy the checker; such regions don't need to be checked again within the next check interval
@@ -130,11 +139,15 @@ func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) {

// RemovePriorityRegion removes priority region from priority queue.
func (p *PriorityInspector) RemovePriorityRegion(regionID uint64) {
p.queue.Remove(regionID)
p.mu.Lock()
defer p.mu.Unlock()
p.mu.queue.Remove(regionID)
}

// getQueueLen returns the length of priority queue.
// it's only used for test.
func (p *PriorityInspector) getQueueLen() int {
return p.queue.Len()
p.mu.RLock()
defer p.mu.RUnlock()
return p.mu.queue.Len()
}
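
The queue is now wrapped in an anonymous mu struct together with a syncutil.RWMutex, and every accessor takes the lock — presumably because the checkers that feed the priority inspector can now run in several patrol workers at once; the recorder in rule_checker.go below gets the same treatment with an embedded RWMutex. The sketch below shows the same lock-next-to-data layout using the standard library's sync.RWMutex; it is illustration only, and counterSet and its methods are made up for the example.

package main

import (
	"fmt"
	"sync"
)

// counterSet groups a map with the mutex that guards it, mirroring the
// mu struct { syncutil.RWMutex; queue ... } layout used above.
type counterSet struct {
	mu struct {
		sync.RWMutex
		counts map[string]int
	}
}

func newCounterSet() *counterSet {
	c := &counterSet{}
	c.mu.counts = make(map[string]int)
	return c
}

// inc is a read-modify-write, so it needs the exclusive lock.
func (c *counterSet) inc(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.mu.counts[key]++
}

// get only reads, so concurrent callers can share the read lock.
func (c *counterSet) get(key string) int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.mu.counts[key]
}

func main() {
	c := newCounterSet()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.inc("regions")
		}()
	}
	wg.Wait()
	fmt.Println(c.get("regions")) // 8
}
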
6 changes: 6 additions & 0 deletions pkg/schedule/checker/rule_checker.go
@@ -33,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.uber.org/zap"
)
@@ -654,6 +655,7 @@ func (c *RuleChecker) handleFilterState(region *core.RegionInfo, filterByTempSta
}

type recorder struct {
syncutil.RWMutex
offlineLeaderCounter map[uint64]uint64
lastUpdateTime time.Time
}
@@ -666,10 +668,14 @@ func newRecord() *recorder {
}

func (o *recorder) getOfflineLeaderCount(storeID uint64) uint64 {
o.RLock()
defer o.RUnlock()
return o.offlineLeaderCounter[storeID]
}

func (o *recorder) incOfflineLeaderCount(storeID uint64) {
o.Lock()
defer o.Unlock()
o.offlineLeaderCounter[storeID] += 1
o.lastUpdateTime = time.Now()
}
12 changes: 12 additions & 0 deletions pkg/schedule/config/config.go
@@ -67,6 +67,9 @@
defaultRegionScoreFormulaVersion = "v2"
defaultLeaderSchedulePolicy = "count"
defaultStoreLimitVersion = "v1"
defaultPatrolRegionWorkerCount = 1
maxPatrolRegionWorkerCount = 8

// DefaultSplitMergeInterval is the default value of config split merge interval.
DefaultSplitMergeInterval = time.Hour
defaultSwitchWitnessInterval = time.Hour
@@ -306,6 +309,9 @@ type ScheduleConfig struct {
// HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling,
// and any other scheduling configs will be ignored.
HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"`

// PatrolRegionWorkerCount is the number of workers that patrol regions.
PatrolRegionWorkerCount int `toml:"patrol-region-worker-count" json:"patrol-region-worker-count"`
}

// Clone returns a cloned scheduling configuration.
@@ -374,6 +380,9 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool)
if !meta.IsDefined("store-limit-version") {
configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion)
}
if !meta.IsDefined("patrol-region-worker-count") {
configutil.AdjustInt(&c.PatrolRegionWorkerCount, defaultPatrolRegionWorkerCount)
}

if !meta.IsDefined("enable-joint-consensus") {
c.EnableJointConsensus = defaultEnableJointConsensus
@@ -518,6 +527,9 @@ func (c *ScheduleConfig) Validate() error {
if c.SlowStoreEvictingAffectedStoreRatioThreshold == 0 {
return errors.Errorf("slow-store-evicting-affected-store-ratio-threshold is not set")
}
if c.PatrolRegionWorkerCount > maxPatrolRegionWorkerCount || c.PatrolRegionWorkerCount < 1 {
return errors.Errorf("patrol-region-worker-count should be between 1 and %d", maxPatrolRegionWorkerCount)
}
return nil
}
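
The new option defaults to 1 worker (so the patrol stays effectively single-threaded unless the option is raised) and is capped at 8: Adjust fills in the default when patrol-region-worker-count is absent from the configuration, and Validate rejects values outside [1, 8]. The snippet below is a small standalone sketch of that adjust-then-validate flow; it is illustrative only, and adjustWorkerCount and validateWorkerCount are hypothetical helpers, not PD functions.

package main

import (
	"errors"
	"fmt"
)

const (
	defaultPatrolRegionWorkerCount = 1
	maxPatrolRegionWorkerCount     = 8
)

// adjustWorkerCount mimics Adjust above: apply the default when the option
// was not set in the configuration file.
func adjustWorkerCount(configured int, defined bool) int {
	if !defined {
		return defaultPatrolRegionWorkerCount
	}
	return configured
}

// validateWorkerCount mimics the bounds check added to Validate above.
func validateWorkerCount(n int) error {
	if n < 1 || n > maxPatrolRegionWorkerCount {
		return errors.New("patrol-region-worker-count should be between 1 and 8")
	}
	return nil
}

func main() {
	n := adjustWorkerCount(0, false) // option omitted: default to 1
	fmt.Println(n, validateWorkerCount(n))
	fmt.Println(validateWorkerCount(16)) // out of range: rejected
}

Because updatePatrolWorkersIfNeeded re-reads GetPatrolRegionWorkerCount on every tick, the worker count can be changed at runtime without restarting the checker loop.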

1 change: 1 addition & 0 deletions pkg/schedule/config/config_provider.go
@@ -89,6 +89,7 @@ type CheckerConfigProvider interface {
GetIsolationLevel() string
GetSplitMergeInterval() time.Duration
GetPatrolRegionInterval() time.Duration
GetPatrolRegionWorkerCount() int
GetMaxMergeRegionSize() uint64
GetMaxMergeRegionKeys() uint64
GetReplicaScheduleLimit() uint64