From d1f4b8a50e9be98efda65abce9f9012b9bc41af4 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 22 Apr 2024 17:52:13 +0800 Subject: [PATCH 01/33] checker: add patrol region concurrency Signed-off-by: lhy1024 --- pkg/cache/cache_test.go | 10 +- pkg/cache/priority_queue.go | 21 +++- pkg/mcs/scheduling/server/config/config.go | 5 + pkg/schedule/config/config.go | 7 ++ pkg/schedule/config/config_provider.go | 2 + pkg/schedule/coordinator.go | 139 ++++++++++++++------- pkg/schedule/metrics.go | 9 ++ server/config/persist_options.go | 5 + 8 files changed, 145 insertions(+), 53 deletions(-) diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 43e97dfa2b0..75f26cfed33 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -371,23 +371,23 @@ func TestPriorityQueue(t *testing.T) { pq.Remove(uint64(1)) re.Nil(pq.Get(1)) re.Equal(2, pq.Len()) - entry := pq.Peek() + entry := pq.peek() re.Equal(2, entry.Priority) re.Equal(testData[2], entry.Value) // case3 update 3's priority to highest pq.Put(-1, testData[3]) - entry = pq.Peek() + entry = pq.peek() re.Equal(-1, entry.Priority) re.Equal(testData[3], entry.Value) pq.Remove(entry.Value.ID()) - re.Equal(testData[2], pq.Peek().Value) + re.Equal(testData[2], pq.peek().Value) re.Equal(1, pq.Len()) // case4 remove all element pq.Remove(uint64(2)) re.Equal(0, pq.Len()) re.Empty(pq.items) - re.Nil(pq.Peek()) - re.Nil(pq.Tail()) + re.Nil(pq.peek()) + re.Nil(pq.tail()) } diff --git a/pkg/cache/priority_queue.go b/pkg/cache/priority_queue.go index a7ac79090b0..31a0ee77f62 100644 --- a/pkg/cache/priority_queue.go +++ b/pkg/cache/priority_queue.go @@ -16,6 +16,7 @@ package cache import ( "github.com/tikv/pd/pkg/btree" + "github.com/tikv/pd/pkg/utils/syncutil" ) // defaultDegree default btree degree, the depth is h 0 { + <-regionChan + } + continue + } + + // Check priority regions first. + c.waitDrainRegionChan(regionChan) + c.checkPriorityRegions() + // Check suspect regions first. + c.waitDrainRegionChan(regionChan) + c.checkSuspectRegions(regionChan) + // Check regions in the waiting list + c.waitDrainRegionChan(regionChan) + c.checkWaitingRegions(regionChan) + + c.waitDrainRegionChan(regionChan) + regions = c.cluster.ScanRegions(key, nil, patrolScanRegionLimit) + if len(regions) == 0 { + continue + } + // Updates the label level isolation statistics. + c.cluster.UpdateRegionsLabelLevelStats(regions) + if len(key) == 0 { + // Resets the scan key. + key = nil + dur := time.Since(start) + patrolCheckRegionsGauge.Set(dur.Seconds()) + c.setPatrolRegionsDuration(dur) + start = time.Now() + } + for _, region := range regions { + regionChan <- region + key = region.GetEndKey() + } + + failpoint.Inject("break-patrol", func() { + failpoint.Break() + }) case <-c.ctx.Done(): patrolCheckRegionsGauge.Set(0) c.setPatrolRegionsDuration(0) log.Info("patrol regions has been stopped") + close(regionChan) + close(quit) + wg.Wait() return } - if c.isSchedulingHalted() { - continue - } + } +} - // Check priority regions first. - c.checkPriorityRegions() - // Check suspect regions first. - c.checkSuspectRegions() - // Check regions in the waiting list - c.checkWaitingRegions() +func (c *Coordinator) startPatrolRegionWorkers(workers int, regionChan <-chan *core.RegionInfo, quit <-chan bool, wg *sync.WaitGroup) { + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case region, ok := <-regionChan: + if ok { + c.tryAddOperators(region) + } + case <-quit: + return + } + } + }() + } +} - key, regions = c.checkRegions(key) - if len(regions) == 0 { - continue - } - // Updates the label level isolation statistics. - c.cluster.UpdateRegionsLabelLevelStats(regions) - if len(key) == 0 { - dur := time.Since(start) - patrolCheckRegionsGauge.Set(dur.Seconds()) - c.setPatrolRegionsDuration(dur) - start = time.Now() +// waitDrainRegionChan is used to drain the regionChan. +// It is used to avoid duplicated regions in the regionChan from different sources. +func (c *Coordinator) waitDrainRegionChan(regionChan chan *core.RegionInfo) { + patrolCheckRegionsChanLenGauge.Set(float64(len(regionChan))) + ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + if len(regionChan) == 0 { + return + } } - failpoint.Inject("break-patrol", func() { - failpoint.Break() - }) } } @@ -211,34 +277,19 @@ func (c *Coordinator) isSchedulingHalted() bool { return c.cluster.GetSchedulerConfig().IsSchedulingHalted() } -func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) { - regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit) - if len(regions) == 0 { - // Resets the scan key. - key = nil - return - } - - for _, region := range regions { - c.tryAddOperators(region) - key = region.GetEndKey() - } - return -} - -func (c *Coordinator) checkSuspectRegions() { +func (c *Coordinator) checkSuspectRegions(regionChan chan *core.RegionInfo) { for _, id := range c.checkers.GetSuspectRegions() { region := c.cluster.GetRegion(id) - c.tryAddOperators(region) + regionChan <- region } } -func (c *Coordinator) checkWaitingRegions() { +func (c *Coordinator) checkWaitingRegions(regionChan chan *core.RegionInfo) { items := c.checkers.GetWaitingRegions() waitingListGauge.Set(float64(len(items))) for _, item := range items { region := c.cluster.GetRegion(item.Key) - c.tryAddOperators(region) + regionChan <- region } } diff --git a/pkg/schedule/metrics.go b/pkg/schedule/metrics.go index 6927fb1f178..0a09f157a6c 100644 --- a/pkg/schedule/metrics.go +++ b/pkg/schedule/metrics.go @@ -40,10 +40,19 @@ var ( Name: "patrol_regions_time", Help: "Time spent of patrol checks region.", }) + + patrolCheckRegionsChanLenGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "checker", + Name: "patrol_regions_chan_len", + Help: "Time channel length of patrol checks region.", + }) ) func init() { prometheus.MustRegister(hotSpotStatusGauge) prometheus.MustRegister(regionListGauge) prometheus.MustRegister(patrolCheckRegionsGauge) + prometheus.MustRegister(patrolCheckRegionsChanLenGauge) } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 62118dde593..7335137e541 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -658,6 +658,11 @@ func (o *PersistOptions) GetHotRegionCacheHitsThreshold() int { return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold) } +// GetPatrolRegionConcurrency returns the worker count of the patrol. +func (o *PersistOptions) GetPatrolRegionConcurrency() int { + return int(o.GetScheduleConfig().PatrolRegionConcurrency) +} + // GetStoresLimit gets the stores' limit. func (o *PersistOptions) GetStoresLimit() map[uint64]sc.StoreLimitConfig { return o.GetScheduleConfig().StoreLimit From 6d0fbd4ce33539590e1f52b6e3c7efeba99e9ae7 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 22 Apr 2024 18:21:47 +0800 Subject: [PATCH 02/33] speedup drain Signed-off-by: lhy1024 --- pkg/schedule/coordinator.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 52b647b6115..0e615c735b5 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -259,6 +259,9 @@ func (c *Coordinator) startPatrolRegionWorkers(workers int, regionChan <-chan *c // It is used to avoid duplicated regions in the regionChan from different sources. func (c *Coordinator) waitDrainRegionChan(regionChan chan *core.RegionInfo) { patrolCheckRegionsChanLenGauge.Set(float64(len(regionChan))) + if len(regionChan) == 0 { + return + } ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) defer ticker.Stop() for { From 9b43d61367e4cfe2fe2f1ac36dd44d3060d13f9c Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 22 Apr 2024 20:07:40 +0800 Subject: [PATCH 03/33] fix config Signed-off-by: lhy1024 --- pkg/schedule/coordinator.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 0e615c735b5..84129d4b749 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -179,6 +179,10 @@ func (c *Coordinator) PatrolRegions() { ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency() if newWorkersCount != workersCount { + log.Info("coordinator starts patrol regions with new workers count", + zap.Int("old-workers-count", workersCount), + zap.Int("new-workers-count", newWorkersCount)) + workersCount = newWorkersCount close(quit) wg.Wait() quit = make(chan bool) From b2b4f398254dcdf425200a19741279395db13e9f Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 22 Apr 2024 21:10:32 +0800 Subject: [PATCH 04/33] make config Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/config/config.go | 5 +++++ pkg/schedule/config/config.go | 14 +++++++++++--- pkg/schedule/config/config_provider.go | 2 ++ pkg/schedule/coordinator.go | 5 ++--- server/config/persist_options.go | 7 ++++++- 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 9803bc8a4f3..07824a67f35 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -403,6 +403,11 @@ func (o *PersistConfig) GetPatrolRegionConcurrency() int { return int(o.GetScheduleConfig().PatrolRegionConcurrency) } +// GetPatrolRegionBatchLimit returns the region count of the patrol. +func (o *PersistConfig) GetPatrolRegionBatchLimit() int { + return int(o.GetScheduleConfig().PatrolRegionBatchLimit) +} + // GetMaxMovableHotPeerSize returns the max movable hot peer size. func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 { return o.GetScheduleConfig().MaxMovableHotPeerSize diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index a0cf6dee627..159e6eca77e 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -63,7 +63,8 @@ const ( defaultRegionScoreFormulaVersion = "v2" defaultLeaderSchedulePolicy = "count" defaultStoreLimitVersion = "v1" - defaultPatrolRegionConcurrency = 8 + defaultPatrolRegionConcurrency = 1 + defaultPatrolRegionBatchLimit = 128 // DefaultSplitMergeInterval is the default value of config split merge interval. DefaultSplitMergeInterval = time.Hour defaultSwitchWitnessInterval = time.Hour @@ -307,8 +308,11 @@ type ScheduleConfig struct { // and any other scheduling configs will be ignored. HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"` - // PatrolRegionConcurrency is the number of workers to patrol region. + // PatrolRegionConcurrency is the number of workers to patrol region. PatrolRegionConcurrency uint64 `toml:"patrol-worker-count" json:"patrol-worker-count"` + + // PatrolRegionBatchLimit is the number of regions to patrol in one batch. + PatrolRegionBatchLimit uint64 `toml:"patrol-region-batch-limit" json:"patrol-region-batch-limit"` } // Clone returns a cloned scheduling configuration. @@ -378,10 +382,14 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool) if !meta.IsDefined("store-limit-version") { configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion) } - if !meta.IsDefined("patrol-worker-count") { + if !meta.IsDefined("patrol-region-concurrency") { configutil.AdjustUint64(&c.PatrolRegionConcurrency, defaultPatrolRegionConcurrency) } + if !meta.IsDefined("patrol-region-batch-limit") { + configutil.AdjustUint64(&c.PatrolRegionBatchLimit, defaultPatrolRegionBatchLimit) + } + if !meta.IsDefined("enable-joint-consensus") { c.EnableJointConsensus = defaultEnableJointConsensus } diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index ca9b352722a..ec0333a1032 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -63,6 +63,7 @@ type SchedulerConfigProvider interface { GetMaxMovableHotPeerSize() int64 IsTraceRegionFlow() bool GetPatrolRegionConcurrency() int + GetPatrolRegionBatchLimit() int GetTolerantSizeRatio() float64 GetLeaderSchedulePolicy() constant.SchedulePolicy @@ -119,6 +120,7 @@ type SharedConfigProvider interface { SetHaltScheduling(bool, string) GetHotRegionCacheHitsThreshold() int GetPatrolRegionConcurrency() int + GetPatrolRegionBatchLimit() int // for test purpose SetPlacementRuleEnabled(bool) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 84129d4b749..528063f7b89 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -52,7 +52,6 @@ const ( // pushOperatorTickInterval is the interval try to push the operator. pushOperatorTickInterval = 500 * time.Millisecond - patrolScanRegionLimit = 1024 // It takes about 14 minutes to iterate 1 million regions. // PluginLoad means action for load plugin PluginLoad = "PluginLoad" // PluginUnload means action for unload plugin @@ -161,7 +160,7 @@ func (c *Coordinator) PatrolRegions() { defer ticker.Stop() workersCount := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency() - regionChan := make(chan *core.RegionInfo, patrolScanRegionLimit) + regionChan := make(chan *core.RegionInfo, c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()) quit := make(chan bool) var wg sync.WaitGroup c.startPatrolRegionWorkers(workersCount, regionChan, quit, &wg) @@ -206,7 +205,7 @@ func (c *Coordinator) PatrolRegions() { c.checkWaitingRegions(regionChan) c.waitDrainRegionChan(regionChan) - regions = c.cluster.ScanRegions(key, nil, patrolScanRegionLimit) + regions = c.cluster.ScanRegions(key, nil, c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()) if len(regions) == 0 { continue } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 7335137e541..b0b15bf6f64 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -658,11 +658,16 @@ func (o *PersistOptions) GetHotRegionCacheHitsThreshold() int { return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold) } -// GetPatrolRegionConcurrency returns the worker count of the patrol. +// GetPatrolRegionConcurrency returns the worker count of the patrol. func (o *PersistOptions) GetPatrolRegionConcurrency() int { return int(o.GetScheduleConfig().PatrolRegionConcurrency) } +// GetPatrolRegionBatchLimit returns the region count of the patrol. +func (o *PersistOptions) GetPatrolRegionBatchLimit() int { + return int(o.GetScheduleConfig().PatrolRegionBatchLimit) +} + // GetStoresLimit gets the stores' limit. func (o *PersistOptions) GetStoresLimit() map[uint64]sc.StoreLimitConfig { return o.GetScheduleConfig().StoreLimit From cb285f69a007a7fb9e1f170dac1f567d575ff606 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 16 May 2024 19:54:34 +0800 Subject: [PATCH 05/33] update Signed-off-by: lhy1024 --- pkg/schedule/config/config.go | 2 +- pkg/schedule/coordinator.go | 30 ++++++++++++++++++++---------- tools/pd-simulator/main.go | 4 ++-- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 159e6eca77e..f4735afef4f 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -382,7 +382,7 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool) if !meta.IsDefined("store-limit-version") { configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion) } - if !meta.IsDefined("patrol-region-concurrency") { + if !meta.IsDefined("patrol-worker-count") { configutil.AdjustUint64(&c.PatrolRegionConcurrency, defaultPatrolRegionConcurrency) } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 528063f7b89..1405a6cb481 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -51,6 +51,7 @@ const ( maxLoadConfigRetries = 10 // pushOperatorTickInterval is the interval try to push the operator. pushOperatorTickInterval = 500 * time.Millisecond + patrolRegionChanLen = 1024 // PluginLoad means action for load plugin PluginLoad = "PluginLoad" @@ -160,7 +161,7 @@ func (c *Coordinator) PatrolRegions() { defer ticker.Stop() workersCount := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency() - regionChan := make(chan *core.RegionInfo, c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()) + regionChan := make(chan *core.RegionInfo, patrolRegionChanLen) quit := make(chan bool) var wg sync.WaitGroup c.startPatrolRegionWorkers(workersCount, regionChan, quit, &wg) @@ -205,25 +206,18 @@ func (c *Coordinator) PatrolRegions() { c.checkWaitingRegions(regionChan) c.waitDrainRegionChan(regionChan) - regions = c.cluster.ScanRegions(key, nil, c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()) + key, regions = c.checkRegions(key, c.cluster.GetCheckerConfig().GetPatrolRegionBatchLimit(), regionChan) if len(regions) == 0 { continue } // Updates the label level isolation statistics. c.cluster.UpdateRegionsLabelLevelStats(regions) if len(key) == 0 { - // Resets the scan key. - key = nil dur := time.Since(start) patrolCheckRegionsGauge.Set(dur.Seconds()) c.setPatrolRegionsDuration(dur) start = time.Now() } - for _, region := range regions { - regionChan <- region - key = region.GetEndKey() - } - failpoint.Inject("break-patrol", func() { failpoint.Break() }) @@ -243,8 +237,10 @@ func (c *Coordinator) startPatrolRegionWorkers(workers int, regionChan <-chan *c for i := 0; i < workers; i++ { wg.Add(1) go func() { + defer logutil.LogPanic() defer wg.Done() for { + patrolCheckRegionsChanLenGauge.Set(float64(len(regionChan))) select { case region, ok := <-regionChan: if ok { @@ -261,7 +257,6 @@ func (c *Coordinator) startPatrolRegionWorkers(workers int, regionChan <-chan *c // waitDrainRegionChan is used to drain the regionChan. // It is used to avoid duplicated regions in the regionChan from different sources. func (c *Coordinator) waitDrainRegionChan(regionChan chan *core.RegionInfo) { - patrolCheckRegionsChanLenGauge.Set(float64(len(regionChan))) if len(regionChan) == 0 { return } @@ -279,6 +274,21 @@ func (c *Coordinator) waitDrainRegionChan(regionChan chan *core.RegionInfo) { } } +func (c *Coordinator) checkRegions(startKey []byte, patrolScanRegionLimit int, regionChan chan *core.RegionInfo) (key []byte, regions []*core.RegionInfo) { + regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit) + if len(regions) == 0 { + // Resets the scan key. + key = nil + return + } + + for _, region := range regions { + regionChan <- region + key = region.GetEndKey() + } + return +} + func (c *Coordinator) isSchedulingHalted() bool { return c.cluster.GetSchedulerConfig().IsSchedulingHalted() } diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go index 73f4a0bba12..955c0d8361e 100644 --- a/tools/pd-simulator/main.go +++ b/tools/pd-simulator/main.go @@ -49,8 +49,8 @@ var ( serverLogLevel = flag.String("serverLog", "info", "pd server log level") simLogLevel = flag.String("simLog", "info", "simulator log level") simLogFile = flag.String("log-file", "", "simulator log file") - regionNum = flag.Int("regionNum", 0, "regionNum of one store") - storeNum = flag.Int("storeNum", 0, "storeNum") + regionNum = flag.Int("regionNum", 50000, "regionNum of one store") + storeNum = flag.Int("storeNum", 20, "storeNum") enableTransferRegionCounter = flag.Bool("enableTransferRegionCounter", false, "enableTransferRegionCounter") statusAddress = flag.String("status-addr", "0.0.0.0:20180", "status address") ) From 351ef5c55b344683a2de9a703bd8a55043d87a9f Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 24 May 2024 01:52:27 +0800 Subject: [PATCH 06/33] fix race Signed-off-by: lhy1024 --- pkg/schedule/operator/operator_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index d63e843f52a..fe93bd98756 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -461,7 +461,7 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool return false, NotInCreateStatus } if !isPromoting && oc.wopStatus.getCount(op.Desc()) >= oc.config.GetSchedulerMaxWaitingOperator() { - log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) + log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.getCount(op.Desc())), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) operatorCounter.WithLabelValues(op.Desc(), "exceed-max-waiting").Inc() return false, ExceedWaitLimit } From a0ec33d618b484e2426f1b33e7c7820a1e7eae26 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 24 May 2024 11:55:59 +0800 Subject: [PATCH 07/33] fix test Signed-off-by: lhy1024 --- pkg/cache/priority_queue.go | 13 ------------- pkg/schedule/checker/priority_inspector.go | 9 +++++++++ pkg/schedule/coordinator.go | 12 +++++++----- server/cluster/cluster_test.go | 3 ++- tools/pd-simulator/main.go | 4 ++-- 5 files changed, 20 insertions(+), 21 deletions(-) diff --git a/pkg/cache/priority_queue.go b/pkg/cache/priority_queue.go index 31a0ee77f62..756807e8b6f 100644 --- a/pkg/cache/priority_queue.go +++ b/pkg/cache/priority_queue.go @@ -16,7 +16,6 @@ package cache import ( "github.com/tikv/pd/pkg/btree" - "github.com/tikv/pd/pkg/utils/syncutil" ) // defaultDegree default btree degree, the depth is h Date: Fri, 24 May 2024 12:19:39 +0800 Subject: [PATCH 08/33] remove batch limit config Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/config/config.go | 5 ----- pkg/schedule/config/config.go | 8 -------- pkg/schedule/config/config_provider.go | 2 -- pkg/schedule/coordinator.go | 13 ++++++++++++- server/config/persist_options.go | 5 ----- 5 files changed, 12 insertions(+), 21 deletions(-) diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 4f69b01e8a7..d3a963fd071 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -401,11 +401,6 @@ func (o *PersistConfig) GetPatrolRegionConcurrency() int { return int(o.GetScheduleConfig().PatrolRegionConcurrency) } -// GetPatrolRegionBatchLimit returns the region count of the patrol. -func (o *PersistConfig) GetPatrolRegionBatchLimit() int { - return int(o.GetScheduleConfig().PatrolRegionBatchLimit) -} - // GetMaxMovableHotPeerSize returns the max movable hot peer size. func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 { return o.GetScheduleConfig().MaxMovableHotPeerSize diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 26916ac7dec..62531d7a887 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -64,7 +64,6 @@ const ( defaultLeaderSchedulePolicy = "count" defaultStoreLimitVersion = "v1" defaultPatrolRegionConcurrency = 1 - defaultPatrolRegionBatchLimit = 128 // DefaultSplitMergeInterval is the default value of config split merge interval. DefaultSplitMergeInterval = time.Hour defaultSwitchWitnessInterval = time.Hour @@ -310,9 +309,6 @@ type ScheduleConfig struct { // PatrolRegionConcurrency is the number of workers to patrol region. PatrolRegionConcurrency uint64 `toml:"patrol-worker-count" json:"patrol-worker-count"` - - // PatrolRegionBatchLimit is the number of regions to patrol in one batch. - PatrolRegionBatchLimit uint64 `toml:"patrol-region-batch-limit" json:"patrol-region-batch-limit"` } // Clone returns a cloned scheduling configuration. @@ -386,10 +382,6 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool) configutil.AdjustUint64(&c.PatrolRegionConcurrency, defaultPatrolRegionConcurrency) } - if !meta.IsDefined("patrol-region-batch-limit") { - configutil.AdjustUint64(&c.PatrolRegionBatchLimit, defaultPatrolRegionBatchLimit) - } - if !meta.IsDefined("enable-joint-consensus") { c.EnableJointConsensus = defaultEnableJointConsensus } diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index ccf86a40f86..19c68db1053 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -63,7 +63,6 @@ type SchedulerConfigProvider interface { GetMaxMovableHotPeerSize() int64 IsTraceRegionFlow() bool GetPatrolRegionConcurrency() int - GetPatrolRegionBatchLimit() int GetTolerantSizeRatio() float64 GetLeaderSchedulePolicy() constant.SchedulePolicy @@ -120,7 +119,6 @@ type SharedConfigProvider interface { SetHaltScheduling(bool, string) GetHotRegionCacheHitsThreshold() int GetPatrolRegionConcurrency() int - GetPatrolRegionBatchLimit() int // for test purpose SetPlacementRuleEnabled(bool) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 62cac40b171..e21110cba83 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -51,7 +51,12 @@ const ( maxLoadConfigRetries = 10 // pushOperatorTickInterval is the interval try to push the operator. pushOperatorTickInterval = 500 * time.Millisecond + + // For 1,024,000 regions, patrolScanRegionLimit is 1000, which is max(patrolScanRegionMinLimit, 1000000/patrolRegionPatition) + // It takes about 10s to iterate 1 million regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered. + patrolScanRegionMinLimit = 128 patrolRegionChanLen = 1024 + patrolRegionPatition = 1024 // PluginLoad means action for load plugin PluginLoad = "PluginLoad" @@ -173,6 +178,7 @@ func (c *Coordinator) PatrolRegions() { log.Info("coordinator starts patrol regions") start := time.Now() + patrolScanRegionLimit := c.getPatrolScanRegionLimit() var ( key []byte regions []*core.RegionInfo @@ -211,7 +217,7 @@ func (c *Coordinator) PatrolRegions() { c.checkWaitingRegions(regionChan) c.waitDrainRegionChan(regionChan) - key, regions = c.checkRegions(key, c.cluster.GetCheckerConfig().GetPatrolRegionBatchLimit(), regionChan) + key, regions = c.checkRegions(key, patrolScanRegionLimit, regionChan) if len(regions) == 0 { continue } @@ -222,6 +228,7 @@ func (c *Coordinator) PatrolRegions() { patrolCheckRegionsGauge.Set(dur.Seconds()) c.setPatrolRegionsDuration(dur) start = time.Now() + patrolScanRegionLimit = c.getPatrolScanRegionLimit() } failpoint.Inject("break-patrol", func() { failpoint.Return() @@ -235,6 +242,10 @@ func (c *Coordinator) PatrolRegions() { } } +func (c *Coordinator) getPatrolScanRegionLimit() int { + return max(patrolScanRegionMinLimit, c.cluster.GetTotalRegionCount()/patrolRegionPatition) +} + func (c *Coordinator) startPatrolRegionWorkers(workers int, regionChan <-chan *core.RegionInfo, quit <-chan bool, wg *sync.WaitGroup) { for i := 0; i < workers; i++ { wg.Add(1) diff --git a/server/config/persist_options.go b/server/config/persist_options.go index e3722363d15..641788e4036 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -663,11 +663,6 @@ func (o *PersistOptions) GetPatrolRegionConcurrency() int { return int(o.GetScheduleConfig().PatrolRegionConcurrency) } -// GetPatrolRegionBatchLimit returns the region count of the patrol. -func (o *PersistOptions) GetPatrolRegionBatchLimit() int { - return int(o.GetScheduleConfig().PatrolRegionBatchLimit) -} - // GetStoresLimit gets the stores' limit. func (o *PersistOptions) GetStoresLimit() map[uint64]sc.StoreLimitConfig { return o.GetScheduleConfig().StoreLimit From 97a40a9edb2cadd315674add39b9c67686e31062 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 27 May 2024 13:50:17 +0800 Subject: [PATCH 09/33] address comments Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index d3a963fd071..d1c1db9ff6a 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -396,7 +396,7 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int { return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold) } -// GetPatrolRegionConcurrency returns the worker count of the patrol. +// GetPatrolRegionConcurrency returns the worker count of the patrol. func (o *PersistConfig) GetPatrolRegionConcurrency() int { return int(o.GetScheduleConfig().PatrolRegionConcurrency) } From 438efce2f6f9e4afe0b551a8922ee2c6563584b4 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 27 May 2024 15:04:12 +0800 Subject: [PATCH 10/33] address comments Signed-off-by: lhy1024 --- pkg/schedule/config/config_provider.go | 3 +-- pkg/schedule/coordinator.go | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index 19c68db1053..c3bd8ee3172 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -62,7 +62,6 @@ type SchedulerConfigProvider interface { GetHotRegionCacheHitsThreshold() int GetMaxMovableHotPeerSize() int64 IsTraceRegionFlow() bool - GetPatrolRegionConcurrency() int GetTolerantSizeRatio() float64 GetLeaderSchedulePolicy() constant.SchedulePolicy @@ -89,6 +88,7 @@ type CheckerConfigProvider interface { GetIsolationLevel() string GetSplitMergeInterval() time.Duration GetPatrolRegionInterval() time.Duration + GetPatrolRegionConcurrency() int GetMaxMergeRegionSize() uint64 GetMaxMergeRegionKeys() uint64 GetReplicaScheduleLimit() uint64 @@ -118,7 +118,6 @@ type SharedConfigProvider interface { IsPlacementRulesCacheEnabled() bool SetHaltScheduling(bool, string) GetHotRegionCacheHitsThreshold() int - GetPatrolRegionConcurrency() int // for test purpose SetPlacementRuleEnabled(bool) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index e21110cba83..bdf2af4ce0b 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -52,11 +52,11 @@ const ( // pushOperatorTickInterval is the interval try to push the operator. pushOperatorTickInterval = 500 * time.Millisecond - // For 1,024,000 regions, patrolScanRegionLimit is 1000, which is max(patrolScanRegionMinLimit, 1000000/patrolRegionPatition) + // For 1,024,000 regions, patrolScanRegionLimit is 1000, which is max(patrolScanRegionMinLimit, 1000000/patrolRegionPartition) // It takes about 10s to iterate 1 million regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered. patrolScanRegionMinLimit = 128 patrolRegionChanLen = 1024 - patrolRegionPatition = 1024 + patrolRegionPartition = 1024 // PluginLoad means action for load plugin PluginLoad = "PluginLoad" @@ -243,7 +243,7 @@ func (c *Coordinator) PatrolRegions() { } func (c *Coordinator) getPatrolScanRegionLimit() int { - return max(patrolScanRegionMinLimit, c.cluster.GetTotalRegionCount()/patrolRegionPatition) + return max(patrolScanRegionMinLimit, c.cluster.GetTotalRegionCount()/patrolRegionPartition) } func (c *Coordinator) startPatrolRegionWorkers(workers int, regionChan <-chan *core.RegionInfo, quit <-chan bool, wg *sync.WaitGroup) { From c59b47c2426d192fb976282f79a3f12781226bda Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 28 May 2024 12:03:17 +0800 Subject: [PATCH 11/33] address comments Signed-off-by: lhy1024 --- pkg/cache/priority_queue.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/cache/priority_queue.go b/pkg/cache/priority_queue.go index 756807e8b6f..ca2d405bcca 100644 --- a/pkg/cache/priority_queue.go +++ b/pkg/cache/priority_queue.go @@ -18,17 +18,17 @@ import ( "github.com/tikv/pd/pkg/btree" ) -// defaultDegree default btree degree, the depth is h Date: Mon, 3 Jun 2024 13:54:41 +0800 Subject: [PATCH 12/33] refactor and add patrol region context Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/config/config.go | 6 +- pkg/schedule/config/config.go | 8 +- pkg/schedule/config/config_provider.go | 2 +- pkg/schedule/coordinator.go | 219 +++++++++++++-------- pkg/schedule/metrics.go | 9 - server/config/persist_options.go | 6 +- 6 files changed, 146 insertions(+), 104 deletions(-) diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index d1c1db9ff6a..3ba3317d08c 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -396,9 +396,9 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int { return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold) } -// GetPatrolRegionConcurrency returns the worker count of the patrol. -func (o *PersistConfig) GetPatrolRegionConcurrency() int { - return int(o.GetScheduleConfig().PatrolRegionConcurrency) +// GetPatrolRegionWorkerCount returns the worker count of the patrol. +func (o *PersistConfig) GetPatrolRegionWorkerCount() int { + return o.GetScheduleConfig().PatrolRegionWorkerCount } // GetMaxMovableHotPeerSize returns the max movable hot peer size. diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 62531d7a887..737d6b8fdd7 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -63,7 +63,7 @@ const ( defaultRegionScoreFormulaVersion = "v2" defaultLeaderSchedulePolicy = "count" defaultStoreLimitVersion = "v1" - defaultPatrolRegionConcurrency = 1 + defaultPatrolRegionWorkerCount = 1 // DefaultSplitMergeInterval is the default value of config split merge interval. DefaultSplitMergeInterval = time.Hour defaultSwitchWitnessInterval = time.Hour @@ -307,8 +307,8 @@ type ScheduleConfig struct { // and any other scheduling configs will be ignored. HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"` - // PatrolRegionConcurrency is the number of workers to patrol region. - PatrolRegionConcurrency uint64 `toml:"patrol-worker-count" json:"patrol-worker-count"` + // PatrolRegionWorkerCount is the number of workers to patrol region. + PatrolRegionWorkerCount int `toml:"patrol-region-worker-count" json:"patrol-region-worker-count"` } // Clone returns a cloned scheduling configuration. @@ -379,7 +379,7 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool) configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion) } if !meta.IsDefined("patrol-worker-count") { - configutil.AdjustUint64(&c.PatrolRegionConcurrency, defaultPatrolRegionConcurrency) + configutil.AdjustInt(&c.PatrolRegionWorkerCount, defaultPatrolRegionWorkerCount) } if !meta.IsDefined("enable-joint-consensus") { diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index c3bd8ee3172..ba30a435962 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -88,7 +88,7 @@ type CheckerConfigProvider interface { GetIsolationLevel() string GetSplitMergeInterval() time.Duration GetPatrolRegionInterval() time.Duration - GetPatrolRegionConcurrency() int + GetPatrolRegionWorkerCount() int GetMaxMergeRegionSize() uint64 GetMaxMergeRegionKeys() uint64 GetReplicaScheduleLimit() uint64 diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index bdf2af4ce0b..495b3414aec 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -79,18 +79,18 @@ type Coordinator struct { cancel context.CancelFunc schedulersInitialized bool - patrolRegionsDuration time.Duration - cluster sche.ClusterInformer - prepareChecker *prepareChecker - checkers *checker.Controller - regionScatterer *scatter.RegionScatterer - regionSplitter *splitter.RegionSplitter - schedulers *schedulers.Controller - opController *operator.Controller - hbStreams *hbstream.HeartbeatStreams - pluginInterface *PluginInterface - diagnosticManager *diagnostic.Manager + cluster sche.ClusterInformer + prepareChecker *prepareChecker + checkers *checker.Controller + regionScatterer *scatter.RegionScatterer + regionSplitter *splitter.RegionSplitter + schedulers *schedulers.Controller + opController *operator.Controller + hbStreams *hbstream.HeartbeatStreams + pluginInterface *PluginInterface + diagnosticManager *diagnostic.Manager + patrolRegionContext *PatrolRegionContext } // NewCoordinator creates a new Coordinator. @@ -113,6 +113,7 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS hbStreams: hbStreams, pluginInterface: NewPluginInterface(), diagnosticManager: diagnostic.NewManager(schedulers, cluster.GetSchedulerConfig()), + patrolRegionContext: &PatrolRegionContext{}, } } @@ -121,15 +122,11 @@ func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { if c == nil { return 0 } - c.RLock() - defer c.RUnlock() - return c.patrolRegionsDuration + return c.patrolRegionContext.getPatrolRegionsDuration() } func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) { - c.Lock() - defer c.Unlock() - c.patrolRegionsDuration = dur + c.patrolRegionContext.setPatrolRegionsDuration(dur) } // markSchedulersInitialized marks the scheduler initialization is finished. @@ -156,29 +153,97 @@ func (c *Coordinator) IsPendingRegion(region uint64) bool { return c.checkers.IsPendingRegion(region) } +// PatrolRegionContext is used to store the context of patrol regions. +type PatrolRegionContext struct { + // config + interval time.Duration + workerCount int + scanLimit int + // status + durationLock syncutil.RWMutex + duration time.Duration + // workers + workersCtx context.Context + workersCancel context.CancelFunc + regionChan chan *core.RegionInfo + wg sync.WaitGroup +} + +func (p *PatrolRegionContext) init(ctx context.Context, cluster sche.ClusterInformer) { + p.interval = cluster.GetCheckerConfig().GetPatrolRegionInterval() + p.workerCount = cluster.GetCheckerConfig().GetPatrolRegionWorkerCount() + p.scanLimit = calculateScanLimit(cluster) + + p.regionChan = make(chan *core.RegionInfo, patrolRegionChanLen) + p.workersCtx, p.workersCancel = context.WithCancel(ctx) +} + +func (p *PatrolRegionContext) stop() { + close(p.regionChan) + p.workersCancel() + p.wg.Wait() +} + +func (p *PatrolRegionContext) updateScanLimit(cluster sche.ClusterInformer) { + p.scanLimit = calculateScanLimit(cluster) +} + +func calculateScanLimit(cluster sche.ClusterInformer) int { + return max(patrolScanRegionMinLimit, cluster.GetTotalRegionCount()/patrolRegionPartition) +} + +func (p *PatrolRegionContext) getPatrolRegionsDuration() time.Duration { + p.durationLock.RLock() + defer p.durationLock.RUnlock() + return p.duration +} + +func (p *PatrolRegionContext) setPatrolRegionsDuration(dur time.Duration) { + p.durationLock.Lock() + defer p.durationLock.Unlock() + p.duration = dur +} + +func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Coordinator) { + for i := 0; i < p.workerCount; i++ { + p.wg.Add(1) + go func() { + defer logutil.LogPanic() + defer p.wg.Done() + for { + select { + case region, ok := <-p.regionChan: + if ok { + c.tryAddOperators(region) + } else { + log.Debug("region channel is closed") + } + case <-p.workersCtx.Done(): + log.Debug("region worker is closed") + return + } + } + }() + } +} + // PatrolRegions is used to scan regions. // The checkers will check these regions to decide if they need to do some operations. // The function is exposed for test purpose. func (c *Coordinator) PatrolRegions() { defer logutil.LogPanic() defer c.wg.Done() - ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) + + c.patrolRegionContext.init(c.ctx, c.cluster) + defer c.patrolRegionContext.stop() + + ticker := time.NewTicker(c.patrolRegionContext.interval) defer ticker.Stop() - regionChan := make(chan *core.RegionInfo, patrolRegionChanLen) - quit := make(chan bool) - var wg sync.WaitGroup - defer func() { - close(regionChan) - close(quit) - wg.Wait() - }() - workersCount := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency() - c.startPatrolRegionWorkers(workersCount, regionChan, quit, &wg) + c.patrolRegionContext.startPatrolRegionWorkers(c) log.Info("coordinator starts patrol regions") start := time.Now() - patrolScanRegionLimit := c.getPatrolScanRegionLimit() var ( key []byte regions []*core.RegionInfo @@ -186,38 +251,28 @@ func (c *Coordinator) PatrolRegions() { for { select { case <-ticker.C: - // Note: we reset the ticker here to support updating configuration dynamically. - ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) - newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency() - if newWorkersCount != workersCount { - log.Info("coordinator starts patrol regions with new workers count", - zap.Int("old-workers-count", workersCount), - zap.Int("new-workers-count", newWorkersCount)) - workersCount = newWorkersCount - close(quit) - wg.Wait() - quit = make(chan bool) - c.startPatrolRegionWorkers(workersCount, regionChan, quit, &wg) - } + c.updateTickerIfNeeded(ticker) + c.updatePatrolWorkersIfNeeded() + if c.cluster.IsSchedulingHalted() { - for len(regionChan) > 0 { - <-regionChan + for len(c.patrolRegionContext.regionChan) > 0 { + <-c.patrolRegionContext.regionChan } continue } // Check priority regions first. - c.waitDrainRegionChan(regionChan) + c.waitDrainRegionChan() c.checkPriorityRegions() // Check suspect regions first. - c.waitDrainRegionChan(regionChan) - c.checkSuspectRegions(regionChan) + c.waitDrainRegionChan() + c.checkSuspectRegions() // Check regions in the waiting list - c.waitDrainRegionChan(regionChan) - c.checkWaitingRegions(regionChan) + c.waitDrainRegionChan() + c.checkWaitingRegions() - c.waitDrainRegionChan(regionChan) - key, regions = c.checkRegions(key, patrolScanRegionLimit, regionChan) + c.waitDrainRegionChan() + key, regions = c.checkRegions(key) if len(regions) == 0 { continue } @@ -228,7 +283,7 @@ func (c *Coordinator) PatrolRegions() { patrolCheckRegionsGauge.Set(dur.Seconds()) c.setPatrolRegionsDuration(dur) start = time.Now() - patrolScanRegionLimit = c.getPatrolScanRegionLimit() + c.patrolRegionContext.updateScanLimit(c.cluster) } failpoint.Inject("break-patrol", func() { failpoint.Return() @@ -242,37 +297,33 @@ func (c *Coordinator) PatrolRegions() { } } -func (c *Coordinator) getPatrolScanRegionLimit() int { - return max(patrolScanRegionMinLimit, c.cluster.GetTotalRegionCount()/patrolRegionPartition) +func (c *Coordinator) updateTickerIfNeeded(ticker *time.Ticker) { + // Note: we reset the ticker here to support updating configuration dynamically. + newInterval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval() + if c.patrolRegionContext.interval != newInterval { + c.patrolRegionContext.interval = newInterval + ticker.Reset(newInterval) + } } -func (c *Coordinator) startPatrolRegionWorkers(workers int, regionChan <-chan *core.RegionInfo, quit <-chan bool, wg *sync.WaitGroup) { - for i := 0; i < workers; i++ { - wg.Add(1) - go func() { - defer logutil.LogPanic() - defer wg.Done() - for { - patrolCheckRegionsChanLenGauge.Set(float64(len(regionChan))) - select { - case region, ok := <-regionChan: - if ok { - c.tryAddOperators(region) - } - case <-quit: - return - } - } - }() +func (c *Coordinator) updatePatrolWorkersIfNeeded() { + newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionWorkerCount() + if c.patrolRegionContext.workerCount != newWorkersCount { + log.Info("coordinator starts patrol regions with new workers count", + zap.Int("old-workers-count", c.patrolRegionContext.workerCount), + zap.Int("new-workers-count", newWorkersCount)) + c.patrolRegionContext.workerCount = newWorkersCount + // Stop the old workers and start the new workers. + c.patrolRegionContext.workersCancel() + c.wg.Wait() + c.patrolRegionContext.workersCtx, c.patrolRegionContext.workersCancel = context.WithCancel(c.ctx) + c.patrolRegionContext.startPatrolRegionWorkers(c) } } // waitDrainRegionChan is used to drain the regionChan. // It is used to avoid duplicated regions in the regionChan from different sources. -func (c *Coordinator) waitDrainRegionChan(regionChan chan *core.RegionInfo) { - if len(regionChan) == 0 { - return - } +func (c *Coordinator) waitDrainRegionChan() { ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) defer ticker.Stop() for { @@ -280,15 +331,15 @@ func (c *Coordinator) waitDrainRegionChan(regionChan chan *core.RegionInfo) { case <-c.ctx.Done(): return case <-ticker.C: - if len(regionChan) == 0 { + if len(c.patrolRegionContext.regionChan) == 0 { return } } } } -func (c *Coordinator) checkRegions(startKey []byte, patrolScanRegionLimit int, regionChan chan *core.RegionInfo) (key []byte, regions []*core.RegionInfo) { - regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit) +func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) { + regions = c.cluster.ScanRegions(startKey, nil, c.patrolRegionContext.scanLimit) if len(regions) == 0 { // Resets the scan key. key = nil @@ -296,25 +347,25 @@ func (c *Coordinator) checkRegions(startKey []byte, patrolScanRegionLimit int, r } for _, region := range regions { - regionChan <- region + c.patrolRegionContext.regionChan <- region key = region.GetEndKey() } return } -func (c *Coordinator) checkSuspectRegions(regionChan chan *core.RegionInfo) { +func (c *Coordinator) checkSuspectRegions() { for _, id := range c.checkers.GetSuspectRegions() { region := c.cluster.GetRegion(id) - regionChan <- region + c.patrolRegionContext.regionChan <- region } } -func (c *Coordinator) checkWaitingRegions(regionChan chan *core.RegionInfo) { +func (c *Coordinator) checkWaitingRegions() { items := c.checkers.GetWaitingRegions() waitingListGauge.Set(float64(len(items))) for _, item := range items { region := c.cluster.GetRegion(item.Key) - regionChan <- region + c.patrolRegionContext.regionChan <- region } } diff --git a/pkg/schedule/metrics.go b/pkg/schedule/metrics.go index 0a09f157a6c..6927fb1f178 100644 --- a/pkg/schedule/metrics.go +++ b/pkg/schedule/metrics.go @@ -40,19 +40,10 @@ var ( Name: "patrol_regions_time", Help: "Time spent of patrol checks region.", }) - - patrolCheckRegionsChanLenGauge = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: "pd", - Subsystem: "checker", - Name: "patrol_regions_chan_len", - Help: "Time channel length of patrol checks region.", - }) ) func init() { prometheus.MustRegister(hotSpotStatusGauge) prometheus.MustRegister(regionListGauge) prometheus.MustRegister(patrolCheckRegionsGauge) - prometheus.MustRegister(patrolCheckRegionsChanLenGauge) } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 641788e4036..6410594d69c 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -658,9 +658,9 @@ func (o *PersistOptions) GetHotRegionCacheHitsThreshold() int { return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold) } -// GetPatrolRegionConcurrency returns the worker count of the patrol. -func (o *PersistOptions) GetPatrolRegionConcurrency() int { - return int(o.GetScheduleConfig().PatrolRegionConcurrency) +// GetPatrolRegionWorkerCount returns the worker count of the patrol. +func (o *PersistOptions) GetPatrolRegionWorkerCount() int { + return o.GetScheduleConfig().PatrolRegionWorkerCount } // GetStoresLimit gets the stores' limit. From b0eab80b7df35617e10636757596074ce6c011f5 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 3 Jun 2024 17:10:36 +0800 Subject: [PATCH 13/33] address comments Signed-off-by: lhy1024 --- pkg/schedule/config/config.go | 7 ++- pkg/schedule/coordinator.go | 53 ++++++++------------ pkg/schedule/operator/operator_controller.go | 2 +- 3 files changed, 27 insertions(+), 35 deletions(-) diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 737d6b8fdd7..5e2e70ac561 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -64,6 +64,8 @@ const ( defaultLeaderSchedulePolicy = "count" defaultStoreLimitVersion = "v1" defaultPatrolRegionWorkerCount = 1 + maxPatrolRegionWorkerCount = 8 + // DefaultSplitMergeInterval is the default value of config split merge interval. DefaultSplitMergeInterval = time.Hour defaultSwitchWitnessInterval = time.Hour @@ -378,7 +380,7 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool) if !meta.IsDefined("store-limit-version") { configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion) } - if !meta.IsDefined("patrol-worker-count") { + if !meta.IsDefined("patrol-region-worker-count") { configutil.AdjustInt(&c.PatrolRegionWorkerCount, defaultPatrolRegionWorkerCount) } @@ -525,6 +527,9 @@ func (c *ScheduleConfig) Validate() error { if c.SlowStoreEvictingAffectedStoreRatioThreshold == 0 { return errors.Errorf("slow-store-evicting-affected-store-ratio-threshold is not set") } + if c.PatrolRegionWorkerCount > maxPatrolRegionWorkerCount { + return errors.Errorf("patrol-region-worker-count should be less than or equal to %d", maxPatrolRegionWorkerCount) + } return nil } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 495b3414aec..0aac7cf7f1d 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -125,10 +125,6 @@ func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { return c.patrolRegionContext.getPatrolRegionsDuration() } -func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) { - c.patrolRegionContext.setPatrolRegionsDuration(dur) -} - // markSchedulersInitialized marks the scheduler initialization is finished. func (c *Coordinator) markSchedulersInitialized() { c.Lock() @@ -160,8 +156,9 @@ type PatrolRegionContext struct { workerCount int scanLimit int // status - durationLock syncutil.RWMutex - duration time.Duration + patrolRoundStartTime time.Time + durationLock syncutil.RWMutex + duration time.Duration // workers workersCtx context.Context workersCancel context.CancelFunc @@ -172,7 +169,9 @@ type PatrolRegionContext struct { func (p *PatrolRegionContext) init(ctx context.Context, cluster sche.ClusterInformer) { p.interval = cluster.GetCheckerConfig().GetPatrolRegionInterval() p.workerCount = cluster.GetCheckerConfig().GetPatrolRegionWorkerCount() + p.scanLimit = calculateScanLimit(cluster) + p.patrolRoundStartTime = time.Now() p.regionChan = make(chan *core.RegionInfo, patrolRegionChanLen) p.workersCtx, p.workersCancel = context.WithCancel(ctx) @@ -227,6 +226,13 @@ func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Coordinator) { } } +func (p *PatrolRegionContext) roundUpdate() { + dur := time.Since(p.patrolRoundStartTime) + patrolCheckRegionsGauge.Set(dur.Seconds()) + p.setPatrolRegionsDuration(dur) + p.patrolRoundStartTime = time.Now() +} + // PatrolRegions is used to scan regions. // The checkers will check these regions to decide if they need to do some operations. // The function is exposed for test purpose. @@ -243,7 +249,6 @@ func (c *Coordinator) PatrolRegions() { c.patrolRegionContext.startPatrolRegionWorkers(c) log.Info("coordinator starts patrol regions") - start := time.Now() var ( key []byte regions []*core.RegionInfo @@ -261,28 +266,27 @@ func (c *Coordinator) PatrolRegions() { continue } + // wait for the regionChan to be drained + if len(c.patrolRegionContext.regionChan) > 0 { + continue + } + // Check priority regions first. - c.waitDrainRegionChan() c.checkPriorityRegions() // Check suspect regions first. - c.waitDrainRegionChan() c.checkSuspectRegions() // Check regions in the waiting list - c.waitDrainRegionChan() c.checkWaitingRegions() - c.waitDrainRegionChan() key, regions = c.checkRegions(key) if len(regions) == 0 { continue } // Updates the label level isolation statistics. c.cluster.UpdateRegionsLabelLevelStats(regions) + // Update metrics if a full scan is done. if len(key) == 0 { - dur := time.Since(start) - patrolCheckRegionsGauge.Set(dur.Seconds()) - c.setPatrolRegionsDuration(dur) - start = time.Now() + c.patrolRegionContext.roundUpdate() c.patrolRegionContext.updateScanLimit(c.cluster) } failpoint.Inject("break-patrol", func() { @@ -290,7 +294,7 @@ func (c *Coordinator) PatrolRegions() { }) case <-c.ctx.Done(): patrolCheckRegionsGauge.Set(0) - c.setPatrolRegionsDuration(0) + c.patrolRegionContext.setPatrolRegionsDuration(0) log.Info("patrol regions has been stopped") return } @@ -321,23 +325,6 @@ func (c *Coordinator) updatePatrolWorkersIfNeeded() { } } -// waitDrainRegionChan is used to drain the regionChan. -// It is used to avoid duplicated regions in the regionChan from different sources. -func (c *Coordinator) waitDrainRegionChan() { - ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) - defer ticker.Stop() - for { - select { - case <-c.ctx.Done(): - return - case <-ticker.C: - if len(c.patrolRegionContext.regionChan) == 0 { - return - } - } - } -} - func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) { regions = c.cluster.ScanRegions(startKey, nil, c.patrolRegionContext.scanLimit) if len(regions) == 0 { diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index fe93bd98756..1d75b1fa1e9 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -963,7 +963,7 @@ func (oc *Controller) ExceedStoreLimit(ops ...*Operator) bool { if stepCost == 0 { continue } - limiter := oc.getOrCreateStoreLimit(storeID, v) + limiter := oc.getOrCreateStoreLimit(storeID, v) // is thread safe if limiter == nil { return false } From 5c442a36c14c67f0c01a2770d0f5c15825a35183 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 4 Jun 2024 19:49:56 +0800 Subject: [PATCH 14/33] add config test Signed-off-by: lhy1024 --- pkg/schedule/config/config.go | 4 ++-- tools/pd-ctl/tests/config/config_test.go | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 5e2e70ac561..84279a544d5 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -527,8 +527,8 @@ func (c *ScheduleConfig) Validate() error { if c.SlowStoreEvictingAffectedStoreRatioThreshold == 0 { return errors.Errorf("slow-store-evicting-affected-store-ratio-threshold is not set") } - if c.PatrolRegionWorkerCount > maxPatrolRegionWorkerCount { - return errors.Errorf("patrol-region-worker-count should be less than or equal to %d", maxPatrolRegionWorkerCount) + if c.PatrolRegionWorkerCount > maxPatrolRegionWorkerCount || c.PatrolRegionWorkerCount < 1 { + return errors.Errorf("patrol-region-worker-count should be between 1 and %d", maxPatrolRegionWorkerCount) } return nil } diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go index c6430789cfc..d992f470857 100644 --- a/tools/pd-ctl/tests/config/config_test.go +++ b/tools/pd-ctl/tests/config/config_test.go @@ -343,6 +343,21 @@ func (suite *configTestSuite) checkConfig(cluster *pdTests.TestCluster) { output, err = tests.ExecuteCommand(cmd, argsInvalid...) re.NoError(err) re.Contains(string(output), "is invalid") + + // config set patrol-region-worker-count + args = []string{"-u", pdAddr, "config", "set", "patrol-region-worker-count", "8"} + _, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Equal(8, svr.GetScheduleConfig().PatrolRegionWorkerCount) + // the max value of patrol-region-worker-count is 8 and the min value is 1 + args = []string{"-u", pdAddr, "config", "set", "patrol-region-worker-count", "9"} + _, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Equal(8, svr.GetScheduleConfig().PatrolRegionWorkerCount) + args = []string{"-u", pdAddr, "config", "set", "patrol-region-worker-count", "0"} + _, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Equal(8, svr.GetScheduleConfig().PatrolRegionWorkerCount) } func (suite *configTestSuite) TestConfigForwardControl() { From 0d02d8b63b81d29a49897730eea1d6a076f96f1e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 4 Jun 2024 23:17:48 +0800 Subject: [PATCH 15/33] add more tests Signed-off-by: lhy1024 --- pkg/core/storelimit/store_limit.go | 16 +++++- pkg/schedule/coordinator.go | 5 +- pkg/schedule/operator/operator_controller.go | 2 +- server/cluster/cluster_test.go | 59 ++++++++++++++++++++ 4 files changed, 79 insertions(+), 3 deletions(-) mode change 100644 => 100755 pkg/schedule/coordinator.go diff --git a/pkg/core/storelimit/store_limit.go b/pkg/core/storelimit/store_limit.go index 8d70b2918a1..d2c79debb9e 100644 --- a/pkg/core/storelimit/store_limit.go +++ b/pkg/core/storelimit/store_limit.go @@ -17,6 +17,7 @@ package storelimit import ( "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/ratelimit" + "github.com/tikv/pd/pkg/utils/syncutil" ) const ( @@ -106,7 +107,7 @@ func (l *StoreRateLimit) Rate(typ Type) float64 { if l.limits[typ] == nil { return 0.0 } - return l.limits[typ].ratePerSec + return l.limits[typ].GetRatePerSec() } // Take takes count tokens from the bucket without blocking. @@ -128,12 +129,15 @@ func (l *StoreRateLimit) Reset(rate float64, typ Type) { // limit the operators of a store type limit struct { + syncutil.RWMutex limiter *ratelimit.RateLimiter ratePerSec float64 } // Reset resets the rate limit. func (l *limit) Reset(ratePerSec float64) { + l.Lock() + defer l.Unlock() if l.ratePerSec == ratePerSec { return } @@ -155,6 +159,8 @@ func (l *limit) Reset(ratePerSec float64) { // Available returns the number of available tokens // It returns true if the rate per second is zero. func (l *limit) Available(n int64) bool { + l.RLock() + defer l.RUnlock() if l.ratePerSec == 0 { return true } @@ -164,8 +170,16 @@ func (l *limit) Available(n int64) bool { // Take takes count tokens from the bucket without blocking. func (l *limit) Take(count int64) bool { + l.RLock() + defer l.RUnlock() if l.ratePerSec == 0 { return true } return l.limiter.AllowN(int(count)) } + +func (l *limit) GetRatePerSec() float64 { + l.RLock() + defer l.RUnlock() + return l.ratePerSec +} diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go old mode 100644 new mode 100755 index 0aac7cf7f1d..793a8b04aa9 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -240,7 +240,9 @@ func (c *Coordinator) PatrolRegions() { defer logutil.LogPanic() defer c.wg.Done() - c.patrolRegionContext.init(c.ctx, c.cluster) + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + c.patrolRegionContext.init(ctx, c.cluster) defer c.patrolRegionContext.stop() ticker := time.NewTicker(c.patrolRegionContext.interval) @@ -290,6 +292,7 @@ func (c *Coordinator) PatrolRegions() { c.patrolRegionContext.updateScanLimit(c.cluster) } failpoint.Inject("break-patrol", func() { + time.Sleep(3 * time.Second) // ensure the regions are handled by the workers failpoint.Return() }) case <-c.ctx.Done(): diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index 1d75b1fa1e9..fe93bd98756 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -963,7 +963,7 @@ func (oc *Controller) ExceedStoreLimit(ops ...*Operator) bool { if stepCost == 0 { continue } - limiter := oc.getOrCreateStoreLimit(storeID, v) // is thread safe + limiter := oc.getOrCreateStoreLimit(storeID, v) if limiter == nil { return false } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 6f5db763666..f071db318e7 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2879,6 +2879,65 @@ func TestCheckCache(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol")) } +func TestPatrolRegionConcurrency(t *testing.T) { + re := require.New(t) + + regionNum := 10000 + mergeScheduleLimit := 15 + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + cfg.PatrolRegionWorkerCount = 8 + cfg.MergeScheduleLimit = uint64(mergeScheduleLimit) + }, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + + tc.opt.SetSplitMergeInterval(time.Duration(0)) + for i := 1; i < 4; i++ { + if err := tc.addRegionStore(uint64(i), regionNum); err != nil { + return + } + } + for i := 0; i < regionNum; i++ { + if err := tc.addLeaderRegion(uint64(i), 1, 2, 3); err != nil { + return + } + } + + // test patrol region concurrency + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/break-patrol", `return`)) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + testutil.Eventually(re, func() bool { + return len(oc.GetOperators()) >= mergeScheduleLimit + }) + checkOperatorDuplicate(re, oc.GetOperators()) + + // test patrol region concurrency with suspect regions + suspectRegions := make([]uint64, 0) + for i := 0; i < 10; i++ { + suspectRegions = append(suspectRegions, uint64(i)) + } + co.GetCheckerController().AddSuspectRegions(suspectRegions...) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + testutil.Eventually(re, func() bool { + return len(oc.GetOperators()) >= mergeScheduleLimit + }) + checkOperatorDuplicate(re, oc.GetOperators()) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol")) +} + +func checkOperatorDuplicate(re *require.Assertions, ops []*operator.Operator) { + regionMap := make(map[uint64]struct{}) + for _, op := range ops { + if _, ok := regionMap[op.RegionID()]; ok { + re.Fail("duplicate operator") + } + regionMap[op.RegionID()] = struct{}{} + } +} + func TestPeerState(t *testing.T) { re := require.New(t) From a638cce2aa6680320b8154b14f25427b5511f75c Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 13 Jun 2024 21:02:10 +0800 Subject: [PATCH 16/33] address comments Signed-off-by: lhy1024 --- pkg/schedule/coordinator.go | 72 ++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 20 deletions(-) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 249e8e8e569..36dbfac09ba 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -151,13 +151,13 @@ func (c *Coordinator) IsPendingRegion(region uint64) bool { // PatrolRegionContext is used to store the context of patrol regions. type PatrolRegionContext struct { + syncutil.RWMutex // config interval time.Duration workerCount int scanLimit int // status patrolRoundStartTime time.Time - durationLock syncutil.RWMutex duration time.Duration // workers workersCtx context.Context @@ -183,8 +183,40 @@ func (p *PatrolRegionContext) stop() { p.wg.Wait() } -func (p *PatrolRegionContext) updateScanLimit(cluster sche.ClusterInformer) { - p.scanLimit = calculateScanLimit(cluster) +func (p *PatrolRegionContext) getWorkerCount() int { + p.RLock() + defer p.RUnlock() + return p.workerCount +} + +func (p *PatrolRegionContext) setWorkerCount(count int) { + p.Lock() + defer p.Unlock() + p.workerCount = count +} + +func (p *PatrolRegionContext) getInterval() time.Duration { + p.RLock() + defer p.RUnlock() + return p.interval +} + +func (p *PatrolRegionContext) setInterval(interval time.Duration) { + p.Lock() + defer p.Unlock() + p.interval = interval +} + +func (p *PatrolRegionContext) getScanLimit() int { + p.RLock() + defer p.RUnlock() + return p.scanLimit +} + +func (p *PatrolRegionContext) setScanLimit(limit int) { + p.Lock() + defer p.Unlock() + p.scanLimit = limit } func calculateScanLimit(cluster sche.ClusterInformer) int { @@ -192,21 +224,21 @@ func calculateScanLimit(cluster sche.ClusterInformer) int { } func (p *PatrolRegionContext) getPatrolRegionsDuration() time.Duration { - p.durationLock.RLock() - defer p.durationLock.RUnlock() + p.RLock() + defer p.RUnlock() return p.duration } func (p *PatrolRegionContext) setPatrolRegionsDuration(dur time.Duration) { - p.durationLock.Lock() - defer p.durationLock.Unlock() + p.Lock() + defer p.Unlock() p.duration = dur } func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Coordinator) { - for i := 0; i < p.workerCount; i++ { + for i := 0; i < p.getWorkerCount(); i++ { p.wg.Add(1) - go func() { + go func(i int) { defer logutil.LogPanic() defer p.wg.Done() for { @@ -215,14 +247,14 @@ func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Coordinator) { if ok { c.tryAddOperators(region) } else { - log.Debug("region channel is closed") + log.Debug("region channel is closed", zap.Int("worker-id", i)) } case <-p.workersCtx.Done(): - log.Debug("region worker is closed") + log.Debug("region worker is closed", zap.Int("worker-id", i)) return } } - }() + }(i) } } @@ -245,7 +277,7 @@ func (c *Coordinator) PatrolRegions() { c.patrolRegionContext.init(ctx, c.cluster) defer c.patrolRegionContext.stop() - ticker := time.NewTicker(c.patrolRegionContext.interval) + ticker := time.NewTicker(c.patrolRegionContext.getInterval()) defer ticker.Stop() c.patrolRegionContext.startPatrolRegionWorkers(c) @@ -289,7 +321,7 @@ func (c *Coordinator) PatrolRegions() { // Update metrics if a full scan is done. if len(key) == 0 { c.patrolRegionContext.roundUpdate() - c.patrolRegionContext.updateScanLimit(c.cluster) + c.patrolRegionContext.setScanLimit(calculateScanLimit(c.cluster)) } failpoint.Inject("break-patrol", func() { time.Sleep(100 * time.Millisecond) // ensure the regions are handled by the workers @@ -307,19 +339,19 @@ func (c *Coordinator) PatrolRegions() { func (c *Coordinator) updateTickerIfNeeded(ticker *time.Ticker) { // Note: we reset the ticker here to support updating configuration dynamically. newInterval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval() - if c.patrolRegionContext.interval != newInterval { - c.patrolRegionContext.interval = newInterval + if c.patrolRegionContext.getInterval() != newInterval { + c.patrolRegionContext.setInterval(newInterval) ticker.Reset(newInterval) } } func (c *Coordinator) updatePatrolWorkersIfNeeded() { newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionWorkerCount() - if c.patrolRegionContext.workerCount != newWorkersCount { + if c.patrolRegionContext.getWorkerCount() != newWorkersCount { log.Info("coordinator starts patrol regions with new workers count", - zap.Int("old-workers-count", c.patrolRegionContext.workerCount), + zap.Int("old-workers-count", c.patrolRegionContext.getWorkerCount()), zap.Int("new-workers-count", newWorkersCount)) - c.patrolRegionContext.workerCount = newWorkersCount + c.patrolRegionContext.setWorkerCount(newWorkersCount) // Stop the old workers and start the new workers. c.patrolRegionContext.workersCancel() c.wg.Wait() @@ -329,7 +361,7 @@ func (c *Coordinator) updatePatrolWorkersIfNeeded() { } func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) { - regions = c.cluster.ScanRegions(startKey, nil, c.patrolRegionContext.scanLimit) + regions = c.cluster.ScanRegions(startKey, nil, c.patrolRegionContext.getScanLimit()) if len(regions) == 0 { // Resets the scan key. key = nil From 614737361f3f27b562b34ea5dcc5ce4c38c3633a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 18 Jun 2024 18:22:32 +0800 Subject: [PATCH 17/33] add some test to cover branches Signed-off-by: lhy1024 --- pkg/schedule/coordinator.go | 29 ++++++------ tests/server/api/api_test.go | 2 +- tests/server/cluster/cluster_test.go | 67 ++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 16 deletions(-) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 36dbfac09ba..55be64d5379 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -52,8 +52,8 @@ const ( // pushOperatorTickInterval is the interval try to push the operator. pushOperatorTickInterval = 500 * time.Millisecond - // For 1,024,000 regions, patrolScanRegionLimit is 1000, which is max(patrolScanRegionMinLimit, 1000000/patrolRegionPartition) - // It takes about 10s to iterate 1 million regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered. + // For 1,024,000 regions, patrolScanRegionLimit is 1000, which is max(patrolScanRegionMinLimit, 1,024,000/patrolRegionPartition) + // It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered. patrolScanRegionMinLimit = 128 patrolRegionChanLen = 1024 patrolRegionPartition = 1024 @@ -258,7 +258,7 @@ func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Coordinator) { } } -func (p *PatrolRegionContext) roundUpdate() { +func (p *PatrolRegionContext) roundUpdateMetrics() { dur := time.Since(p.patrolRoundStartTime) patrolCheckRegionsGauge.Set(dur.Seconds()) p.setPatrolRegionsDuration(dur) @@ -272,16 +272,13 @@ func (c *Coordinator) PatrolRegions() { defer logutil.LogPanic() defer c.wg.Done() - ctx, cancel := context.WithCancel(c.ctx) - defer cancel() - c.patrolRegionContext.init(ctx, c.cluster) + c.patrolRegionContext.init(c.ctx, c.cluster) + c.patrolRegionContext.startPatrolRegionWorkers(c) defer c.patrolRegionContext.stop() ticker := time.NewTicker(c.patrolRegionContext.getInterval()) defer ticker.Stop() - c.patrolRegionContext.startPatrolRegionWorkers(c) - log.Info("coordinator starts patrol regions") var ( key []byte @@ -292,11 +289,11 @@ func (c *Coordinator) PatrolRegions() { case <-ticker.C: c.updateTickerIfNeeded(ticker) c.updatePatrolWorkersIfNeeded() - if c.cluster.IsSchedulingHalted() { for len(c.patrolRegionContext.regionChan) > 0 { <-c.patrolRegionContext.regionChan } + log.Debug("skip patrol regions due to scheduling is halted") continue } @@ -318,9 +315,9 @@ func (c *Coordinator) PatrolRegions() { } // Updates the label level isolation statistics. c.cluster.UpdateRegionsLabelLevelStats(regions) - // Update metrics if a full scan is done. + // Update metrics and scan limit if a full scan is done. if len(key) == 0 { - c.patrolRegionContext.roundUpdate() + c.patrolRegionContext.roundUpdateMetrics() c.patrolRegionContext.setScanLimit(calculateScanLimit(c.cluster)) } failpoint.Inject("break-patrol", func() { @@ -342,21 +339,23 @@ func (c *Coordinator) updateTickerIfNeeded(ticker *time.Ticker) { if c.patrolRegionContext.getInterval() != newInterval { c.patrolRegionContext.setInterval(newInterval) ticker.Reset(newInterval) + log.Debug("coordinator starts patrol regions with new interval", zap.Duration("interval", newInterval)) } } func (c *Coordinator) updatePatrolWorkersIfNeeded() { newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionWorkerCount() if c.patrolRegionContext.getWorkerCount() != newWorkersCount { - log.Info("coordinator starts patrol regions with new workers count", - zap.Int("old-workers-count", c.patrolRegionContext.getWorkerCount()), - zap.Int("new-workers-count", newWorkersCount)) + oldWorkersCount := c.patrolRegionContext.getWorkerCount() c.patrolRegionContext.setWorkerCount(newWorkersCount) // Stop the old workers and start the new workers. c.patrolRegionContext.workersCancel() - c.wg.Wait() + c.patrolRegionContext.wg.Wait() c.patrolRegionContext.workersCtx, c.patrolRegionContext.workersCancel = context.WithCancel(c.ctx) c.patrolRegionContext.startPatrolRegionWorkers(c) + log.Info("coordinator starts patrol regions with new workers count", + zap.Int("old-workers-count", oldWorkersCount), + zap.Int("new-workers-count", newWorkersCount)) } } diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index f59e85651f5..8f36dce224e 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -595,7 +595,7 @@ func (suite *redirectorTestSuite) SetupSuite() { }) re.NoError(err) re.NoError(cluster.RunInitialServers()) - re.NotEmpty(cluster.WaitLeader(), 0) + re.NotEmpty(cluster.WaitLeader()) suite.cluster = cluster } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 9e70a52d11d..1b3e85c070c 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -18,7 +18,9 @@ import ( "context" "fmt" "math" + "os" "strconv" + "strings" "sync" "testing" "time" @@ -1812,3 +1814,68 @@ func TestExternalTimestamp(t *testing.T) { re.Equal(ts, resp4.GetTimestamp()) } } + +func TestPatrolRegionConfigChange(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tc, err := tests.NewTestCluster(ctx, 1) + defer tc.Destroy() + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + tc.WaitLeader() + leaderServer := tc.GetLeaderServer() + re.NoError(leaderServer.BootstrapCluster()) + for i := 1; i <= 3; i++ { + store := &metapb.Store{ + Id: uint64(i), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + } + tests.MustPutStore(re, tc, store) + } + for i := 1; i <= 200; i++ { + startKey := []byte(fmt.Sprintf("%d", i*2-1)) + endKey := []byte(fmt.Sprintf("%d", i*2)) + tests.MustPutRegion(re, tc, uint64(i), uint64(i%3+1), startKey, endKey) + } + fname := testutil.InitTempFileLogger("debug") + defer os.RemoveAll(fname) + testutil.Eventually(re, func() bool { + b, _ := os.ReadFile(fname) + l := string(b) + return strings.Contains(l, "coordinator starts patrol regions") + }) + + // test change patrol region interval + schedule := leaderServer.GetConfig().Schedule + schedule.PatrolRegionInterval = typeutil.NewDuration(99 * time.Millisecond) + leaderServer.GetServer().SetScheduleConfig(schedule) + testutil.Eventually(re, func() bool { + b, _ := os.ReadFile(fname) + l := string(b) + return strings.Contains(l, "coordinator starts patrol regions with new interval") + }) + + // test change patrol region worker count + schedule = leaderServer.GetConfig().Schedule + schedule.PatrolRegionWorkerCount = 8 + leaderServer.GetServer().SetScheduleConfig(schedule) + testutil.Eventually(re, func() bool { + b, _ := os.ReadFile(fname) + l := string(b) + return strings.Contains(l, "coordinator starts patrol regions with new workers count") + }) + + // test change schedule halt + schedule = leaderServer.GetConfig().Schedule + schedule.HaltScheduling = true + leaderServer.GetServer().SetScheduleConfig(schedule) + testutil.Eventually(re, func() bool { + b, _ := os.ReadFile(fname) + l := string(b) + return strings.Contains(l, "skip patrol regions due to scheduling is halted") + }) +} From 82785c22d89eb67ba7ab23831c33bd44cd7da62a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 1 Jul 2024 12:13:01 +0800 Subject: [PATCH 18/33] address comments Signed-off-by: lhy1024 --- tests/server/cluster/cluster_test.go | 25 +++++++++--------------- tools/pd-ctl/tests/config/config_test.go | 4 +++- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index a858bc9c280..6fff272bb50 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -1842,39 +1842,32 @@ func TestPatrolRegionConfigChange(t *testing.T) { } fname := testutil.InitTempFileLogger("debug") defer os.RemoveAll(fname) - testutil.Eventually(re, func() bool { - b, _ := os.ReadFile(fname) - l := string(b) - return strings.Contains(l, "coordinator starts patrol regions") - }) + checkLog(re, fname, "coordinator starts patrol regions") // test change patrol region interval schedule := leaderServer.GetConfig().Schedule schedule.PatrolRegionInterval = typeutil.NewDuration(99 * time.Millisecond) leaderServer.GetServer().SetScheduleConfig(schedule) - testutil.Eventually(re, func() bool { - b, _ := os.ReadFile(fname) - l := string(b) - return strings.Contains(l, "coordinator starts patrol regions with new interval") - }) + checkLog(re, fname, "coordinator starts patrol regions with new interval") // test change patrol region worker count schedule = leaderServer.GetConfig().Schedule schedule.PatrolRegionWorkerCount = 8 leaderServer.GetServer().SetScheduleConfig(schedule) - testutil.Eventually(re, func() bool { - b, _ := os.ReadFile(fname) - l := string(b) - return strings.Contains(l, "coordinator starts patrol regions with new workers count") - }) + checkLog(re, fname, "coordinator starts patrol regions with new workers count") // test change schedule halt schedule = leaderServer.GetConfig().Schedule schedule.HaltScheduling = true leaderServer.GetServer().SetScheduleConfig(schedule) + checkLog(re, fname, "skip patrol regions due to scheduling is halted") +} + +func checkLog(re *require.Assertions, fname, expect string) { testutil.Eventually(re, func() bool { b, _ := os.ReadFile(fname) l := string(b) - return strings.Contains(l, "skip patrol regions due to scheduling is halted") + return strings.Contains(l, expect) }) + os.Truncate(fname, 0) } diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go index d992f470857..881f8632ee9 100644 --- a/tools/pd-ctl/tests/config/config_test.go +++ b/tools/pd-ctl/tests/config/config_test.go @@ -351,12 +351,14 @@ func (suite *configTestSuite) checkConfig(cluster *pdTests.TestCluster) { re.Equal(8, svr.GetScheduleConfig().PatrolRegionWorkerCount) // the max value of patrol-region-worker-count is 8 and the min value is 1 args = []string{"-u", pdAddr, "config", "set", "patrol-region-worker-count", "9"} - _, err = tests.ExecuteCommand(cmd, args...) + output, err = tests.ExecuteCommand(cmd, args...) re.NoError(err) + re.Contains(string(output), "patrol-region-worker-count should be between 1 and 8") re.Equal(8, svr.GetScheduleConfig().PatrolRegionWorkerCount) args = []string{"-u", pdAddr, "config", "set", "patrol-region-worker-count", "0"} _, err = tests.ExecuteCommand(cmd, args...) re.NoError(err) + re.Contains(string(output), "patrol-region-worker-count should be between 1 and 8") re.Equal(8, svr.GetScheduleConfig().PatrolRegionWorkerCount) } From a21ef8328e27bf37ae2334b4163ce85d767491da Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 1 Jul 2024 14:02:25 +0800 Subject: [PATCH 19/33] address comments Signed-off-by: lhy1024 --- pkg/schedule/coordinator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 55be64d5379..b1f0482a756 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -244,11 +244,11 @@ func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Coordinator) { for { select { case region, ok := <-p.regionChan: - if ok { - c.tryAddOperators(region) - } else { + if !ok { log.Debug("region channel is closed", zap.Int("worker-id", i)) + return } + c.tryAddOperators(region) case <-p.workersCtx.Done(): log.Debug("region worker is closed", zap.Int("worker-id", i)) return From cd1cd8b3177bc38ea202cfd7b17e89c25d002df8 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 4 Jul 2024 14:24:30 +0800 Subject: [PATCH 20/33] address comments Signed-off-by: lhy1024 --- pkg/schedule/checker/priority_inspector.go | 67 +++++++++++-------- .../checker/priority_inspector_test.go | 8 +-- 2 files changed, 42 insertions(+), 33 deletions(-) diff --git a/pkg/schedule/checker/priority_inspector.go b/pkg/schedule/checker/priority_inspector.go index 2993f92f36e..4413e2461ed 100644 --- a/pkg/schedule/checker/priority_inspector.go +++ b/pkg/schedule/checker/priority_inspector.go @@ -25,39 +25,42 @@ import ( "github.com/tikv/pd/pkg/utils/syncutil" ) -// the default value of priority queue size +// defaultPriorityQueueSize is the default value of priority queue size. const defaultPriorityQueueSize = 1280 -// PriorityInspector ensures high priority region should run first +// PriorityInspector ensures high priority region should run first. type PriorityInspector struct { - syncutil.Mutex cluster sche.CheckerCluster conf config.CheckerConfigProvider - queue *cache.PriorityQueue + mu struct { + syncutil.RWMutex + queue *cache.PriorityQueue + } } // NewPriorityInspector creates a priority inspector. func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *PriorityInspector { - return &PriorityInspector{ + res := &PriorityInspector{ cluster: cluster, conf: conf, - queue: cache.NewPriorityQueue(defaultPriorityQueueSize), } + res.mu.queue = cache.NewPriorityQueue(defaultPriorityQueueSize) + return res } -// RegionPriorityEntry records region priority info +// RegionPriorityEntry records region priority info. type RegionPriorityEntry struct { Attempt int Last time.Time regionID uint64 } -// ID implement PriorityQueueItem interface +// ID implements PriorityQueueItem interface. func (r RegionPriorityEntry) ID() uint64 { return r.regionID } -// NewRegionEntry construct of region priority entry +// NewRegionEntry constructs a region priority entry. func NewRegionEntry(regionID uint64) *RegionPriorityEntry { return &RegionPriorityEntry{regionID: regionID, Last: time.Now(), Attempt: 1} } @@ -75,7 +78,7 @@ func (p *PriorityInspector) Inspect(region *core.RegionInfo) (fit *placement.Reg return } -// inspectRegionInPlacementRule inspects region in placement rule mode +// inspectRegionInPlacementRule inspects region in placement rule mode. func (p *PriorityInspector) inspectRegionInPlacementRule(region *core.RegionInfo) (makeupCount int, fit *placement.RegionFit) { fit = p.cluster.GetRuleManager().FitRegion(p.cluster, region) if len(fit.RuleFits) == 0 { @@ -92,38 +95,37 @@ func (p *PriorityInspector) inspectRegionInPlacementRule(region *core.RegionInfo return } -// inspectReplicas inspects region in replica mode +// inspectReplicas inspects region in replica mode. func (p *PriorityInspector) inspectRegionInReplica(region *core.RegionInfo) (makeupCount int) { return p.conf.GetMaxReplicas() - len(region.GetPeers()) } -// addOrRemoveRegion add or remove region from queue -// it will remove if region's priority equal 0 -// it's Attempt will increase if region's priority equal last +// addOrRemoveRegion add or remove region from queue. +// it will remove if region's priority equal 0. +// it's Attempt will increase if region's priority equal last. func (p *PriorityInspector) addOrRemoveRegion(priority int, regionID uint64) { - p.Lock() - defer p.Unlock() + p.mu.Lock() + defer p.mu.Unlock() if priority < 0 { - if entry := p.queue.Get(regionID); entry != nil && entry.Priority == priority { + if entry := p.mu.queue.Get(regionID); entry != nil && entry.Priority == priority { e := entry.Value.(*RegionPriorityEntry) e.Attempt++ e.Last = time.Now() - p.queue.Put(priority, e) + p.mu.queue.Put(priority, e) } else { entry := NewRegionEntry(regionID) - p.queue.Put(priority, entry) + p.mu.queue.Put(priority, entry) } } else { - p.queue.Remove(regionID) + p.mu.queue.Remove(regionID) } } -// GetPriorityRegions returns all regions in priority queue that needs rerun +// GetPriorityRegions returns all regions in priority queue that needs rerun. func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) { - // we modify the queue entry in this function, so we need to lock it - p.Lock() - defer p.Unlock() - entries := p.queue.Elems() + p.mu.RLock() + defer p.mu.RUnlock() + entries := p.mu.queue.Elems() for _, e := range entries { re := e.Value.(*RegionPriorityEntry) // avoid to some priority region occupy checker, region don't need check on next check interval @@ -135,9 +137,16 @@ func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) { return } -// RemovePriorityRegion removes priority region from priority queue +// RemovePriorityRegion removes priority region from priority queue. func (p *PriorityInspector) RemovePriorityRegion(regionID uint64) { - p.Lock() - defer p.Unlock() - p.queue.Remove(regionID) + p.mu.Lock() + defer p.mu.Unlock() + p.mu.queue.Remove(regionID) +} + +// GetQueueLen returns the length of priority queue. +func (p *PriorityInspector) GetQueueLen() int { + p.mu.RLock() + defer p.mu.RUnlock() + return p.mu.queue.Len() } diff --git a/pkg/schedule/checker/priority_inspector_test.go b/pkg/schedule/checker/priority_inspector_test.go index 5aef4c01158..ceb2a3f2511 100644 --- a/pkg/schedule/checker/priority_inspector_test.go +++ b/pkg/schedule/checker/priority_inspector_test.go @@ -49,19 +49,19 @@ func checkPriorityRegionTest(re *require.Assertions, pc *PriorityInspector, tc * region := tc.GetRegion(1) opt := tc.GetCheckerConfig() pc.Inspect(region) - re.Equal(0, pc.queue.Len()) + re.Equal(0, pc.GetQueueLen()) // case2: inspect region 2, it lacks one replica region = tc.GetRegion(2) pc.Inspect(region) - re.Equal(1, pc.queue.Len()) + re.Equal(1, pc.GetQueueLen()) // the region will not rerun after it checks re.Empty(pc.GetPriorityRegions()) // case3: inspect region 3, it will has high priority region = tc.GetRegion(3) pc.Inspect(region) - re.Equal(2, pc.queue.Len()) + re.Equal(2, pc.GetQueueLen()) time.Sleep(opt.GetPatrolRegionInterval() * 10) // region 3 has higher priority ids := pc.GetPriorityRegions() @@ -73,7 +73,7 @@ func checkPriorityRegionTest(re *require.Assertions, pc *PriorityInspector, tc * tc.AddLeaderRegion(2, 2, 3, 1) region = tc.GetRegion(2) pc.Inspect(region) - re.Equal(1, pc.queue.Len()) + re.Equal(1, pc.GetQueueLen()) // case5: inspect region 3 again region = tc.GetRegion(3) From 5668d98bd4bfc46d36836356b6b804f90a955051 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 4 Jul 2024 14:26:46 +0800 Subject: [PATCH 21/33] address comments Signed-off-by: lhy1024 --- pkg/schedule/coordinator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index b1f0482a756..21cc7929a73 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -339,7 +339,7 @@ func (c *Coordinator) updateTickerIfNeeded(ticker *time.Ticker) { if c.patrolRegionContext.getInterval() != newInterval { c.patrolRegionContext.setInterval(newInterval) ticker.Reset(newInterval) - log.Debug("coordinator starts patrol regions with new interval", zap.Duration("interval", newInterval)) + log.Info("coordinator starts patrol regions with new interval", zap.Duration("interval", newInterval)) } } From cf010762ae1e5575a2033d39f60a2f78f3321ca8 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 4 Jul 2024 17:27:32 +0800 Subject: [PATCH 22/33] refactor Signed-off-by: lhy1024 --- pkg/schedule/checker/checker_controller.go | 414 +++++++++++++++++++-- pkg/schedule/checker/metrics.go | 17 + pkg/schedule/coordinator.go | 368 +----------------- pkg/schedule/core/cluster_informer.go | 4 +- pkg/schedule/metrics.go | 18 - 5 files changed, 423 insertions(+), 398 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index cdc826a1dda..6ca7804dada 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -15,10 +15,13 @@ package checker import ( + "bytes" "context" + "sync" "time" "github.com/pingcap/failpoint" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/cache" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" @@ -28,51 +31,224 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/keyutil" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" + "go.uber.org/zap" ) -// DefaultCacheSize is the default length of waiting list. -const DefaultCacheSize = 100000 +const ( + checkSuspectRangesInterval = 100 * time.Millisecond + // DefaultWaitingCacheSize is the default length of waiting list. + DefaultWaitingCacheSize = 100000 + // For 1,024,000 regions, patrolScanRegionLimit is 1000, which is max(patrolScanRegionMinLimit, 1,024,000/patrolRegionPartition) + // It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered. + patrolScanRegionMinLimit = 128 + patrolRegionChanLen = 1024 + patrolRegionPartition = 1024 +) -var denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("checkers", "deny") +var ( + denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("checkers", "deny") + // WithLabelValues is a heavy operation, define variable to avoid call it every time. + waitingListGauge = regionListGauge.WithLabelValues("waiting_list") + priorityListGauge = regionListGauge.WithLabelValues("priority_list") +) // Controller is used to manage all checkers. type Controller struct { - cluster sche.CheckerCluster - conf config.CheckerConfigProvider - opController *operator.Controller - learnerChecker *LearnerChecker - replicaChecker *ReplicaChecker - ruleChecker *RuleChecker - splitChecker *SplitChecker - mergeChecker *MergeChecker - jointStateChecker *JointStateChecker - priorityInspector *PriorityInspector - regionWaitingList cache.Cache - suspectRegions *cache.TTLUint64 // suspectRegions are regions that may need fix - suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix + ctx context.Context + cluster sche.CheckerCluster + conf config.CheckerConfigProvider + opController *operator.Controller + learnerChecker *LearnerChecker + replicaChecker *ReplicaChecker + ruleChecker *RuleChecker + splitChecker *SplitChecker + mergeChecker *MergeChecker + jointStateChecker *JointStateChecker + priorityInspector *PriorityInspector + regionWaitingList cache.Cache + suspectRegions *cache.TTLUint64 // suspectRegions are regions that may need fix + suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix + patrolRegionContext *PatrolRegionContext } // NewController create a new Controller. func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller { - regionWaitingList := cache.NewDefaultCache(DefaultCacheSize) + regionWaitingList := cache.NewDefaultCache(DefaultWaitingCacheSize) return &Controller{ - cluster: cluster, - conf: conf, - opController: opController, - learnerChecker: NewLearnerChecker(cluster), - replicaChecker: NewReplicaChecker(cluster, conf, regionWaitingList), - ruleChecker: NewRuleChecker(ctx, cluster, ruleManager, regionWaitingList), - splitChecker: NewSplitChecker(cluster, ruleManager, labeler), - mergeChecker: NewMergeChecker(ctx, cluster, conf), - jointStateChecker: NewJointStateChecker(cluster), - priorityInspector: NewPriorityInspector(cluster, conf), - regionWaitingList: regionWaitingList, - suspectRegions: cache.NewIDTTL(ctx, time.Minute, 3*time.Minute), - suspectKeyRanges: cache.NewStringTTL(ctx, time.Minute, 3*time.Minute), + ctx: ctx, + cluster: cluster, + conf: conf, + opController: opController, + learnerChecker: NewLearnerChecker(cluster), + replicaChecker: NewReplicaChecker(cluster, conf, regionWaitingList), + ruleChecker: NewRuleChecker(ctx, cluster, ruleManager, regionWaitingList), + splitChecker: NewSplitChecker(cluster, ruleManager, labeler), + mergeChecker: NewMergeChecker(ctx, cluster, conf), + jointStateChecker: NewJointStateChecker(cluster), + priorityInspector: NewPriorityInspector(cluster, conf), + regionWaitingList: regionWaitingList, + suspectRegions: cache.NewIDTTL(ctx, time.Minute, 3*time.Minute), + suspectKeyRanges: cache.NewStringTTL(ctx, time.Minute, 3*time.Minute), + patrolRegionContext: &PatrolRegionContext{}, + } +} + +// PatrolRegions is used to scan regions. +// The checkers will check these regions to decide if they need to do some operations. +func (c *Controller) PatrolRegions() { + c.patrolRegionContext.init(c.ctx, c.cluster) + c.patrolRegionContext.startPatrolRegionWorkers(c) + defer c.patrolRegionContext.stop() + ticker := time.NewTicker(c.patrolRegionContext.getInterval()) + defer ticker.Stop() + + var ( + key []byte + regions []*core.RegionInfo + ) + for { + select { + case <-ticker.C: + c.updateTickerIfNeeded(ticker) + c.updatePatrolWorkersIfNeeded() + if c.cluster.IsSchedulingHalted() { + for len(c.patrolRegionContext.regionChan) > 0 { + <-c.patrolRegionContext.regionChan + } + log.Debug("skip patrol regions due to scheduling is halted") + continue + } + + // wait for the regionChan to be drained + if len(c.patrolRegionContext.regionChan) > 0 { + continue + } + + // Check priority regions first. + c.checkPriorityRegions() + // Check suspect regions first. + c.checkSuspectRegions() + // Check regions in the waiting list + c.checkWaitingRegions() + + key, regions = c.checkRegions(key) + if len(regions) == 0 { + continue + } + // Updates the label level isolation statistics. + c.cluster.UpdateRegionsLabelLevelStats(regions) + // Update metrics and scan limit if a full scan is done. + if len(key) == 0 { + c.patrolRegionContext.roundUpdateMetrics() + c.patrolRegionContext.setScanLimit(calculateScanLimit(c.cluster)) + } + failpoint.Inject("break-patrol", func() { + time.Sleep(100 * time.Millisecond) // ensure the regions are handled by the workers + failpoint.Return() + }) + case <-c.ctx.Done(): + patrolCheckRegionsGauge.Set(0) + c.patrolRegionContext.setPatrolRegionsDuration(0) + log.Info("patrol regions has been stopped") + return + } + } +} + +func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) { + // Note: we reset the ticker here to support updating configuration dynamically. + newInterval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval() + if c.patrolRegionContext.getInterval() != newInterval { + c.patrolRegionContext.setInterval(newInterval) + ticker.Reset(newInterval) + log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", newInterval)) + } +} + +func (c *Controller) updatePatrolWorkersIfNeeded() { + newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionWorkerCount() + if c.patrolRegionContext.getWorkerCount() != newWorkersCount { + oldWorkersCount := c.patrolRegionContext.getWorkerCount() + c.patrolRegionContext.setWorkerCount(newWorkersCount) + // Stop the old workers and start the new workers. + c.patrolRegionContext.workersCancel() + c.patrolRegionContext.wg.Wait() + c.patrolRegionContext.workersCtx, c.patrolRegionContext.workersCancel = context.WithCancel(c.ctx) + c.patrolRegionContext.startPatrolRegionWorkers(c) + log.Info("checkers starts patrol regions with new workers count", + zap.Int("old-workers-count", oldWorkersCount), + zap.Int("new-workers-count", newWorkersCount)) + } +} + +// GetPatrolRegionsDuration returns the duration of the last patrol region round. +func (c *Controller) GetPatrolRegionsDuration() time.Duration { + if c == nil { + return 0 + } + return c.patrolRegionContext.getPatrolRegionsDuration() +} + +func (c *Controller) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) { + regions = c.cluster.ScanRegions(startKey, nil, c.patrolRegionContext.getScanLimit()) + if len(regions) == 0 { + // Resets the scan key. + key = nil + return + } + + for _, region := range regions { + c.patrolRegionContext.regionChan <- region + key = region.GetEndKey() + } + return +} + +func (c *Controller) checkSuspectRegions() { + for _, id := range c.GetSuspectRegions() { + region := c.cluster.GetRegion(id) + c.patrolRegionContext.regionChan <- region + } +} + +func (c *Controller) checkWaitingRegions() { + items := c.GetWaitingRegions() + waitingListGauge.Set(float64(len(items))) + for _, item := range items { + region := c.cluster.GetRegion(item.Key) + c.patrolRegionContext.regionChan <- region + } +} + +// checkPriorityRegions checks priority regions +func (c *Controller) checkPriorityRegions() { + items := c.GetPriorityRegions() + removes := make([]uint64, 0) + priorityListGauge.Set(float64(len(items))) + for _, id := range items { + region := c.cluster.GetRegion(id) + if region == nil { + removes = append(removes, id) + continue + } + ops := c.CheckRegion(region) + // it should skip if region needs to merge + if len(ops) == 0 || ops[0].Kind()&operator.OpMerge != 0 { + continue + } + if !c.opController.ExceedStoreLimit(ops...) { + c.opController.AddWaitingOperator(ops...) + } + } + for _, v := range removes { + c.RemovePriorityRegions(v) } } // CheckRegion will check the region and add a new operator if needed. +// The function is exposed for test purpose. func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator { // If PD has restarted, it needs to check learners added before and promote them. // Don't check isRaftLearnerEnabled cause it maybe disable learner feature but there are still some learners to promote. @@ -142,6 +318,31 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator { return nil } +func (c *Controller) tryAddOperators(region *core.RegionInfo) { + if region == nil { + // the region could be recent split, continue to wait. + return + } + id := region.GetID() + if c.opController.GetOperator(id) != nil { + c.RemoveWaitingRegion(id) + c.RemoveSuspectRegion(id) + return + } + ops := c.CheckRegion(region) + if len(ops) == 0 { + return + } + + if !c.opController.ExceedStoreLimit(ops...) { + c.opController.AddWaitingOperator(ops...) + c.RemoveWaitingRegion(id) + c.RemoveSuspectRegion(id) + } else { + c.AddWaitingRegion(region) + } +} + // GetMergeChecker returns the merge checker. func (c *Controller) GetMergeChecker() *MergeChecker { return c.mergeChecker @@ -177,6 +378,43 @@ func (c *Controller) RemovePriorityRegions(id uint64) { c.priorityInspector.RemovePriorityRegion(id) } +// CheckSuspectRanges would pop one suspect key range group +// The regions of new version key range and old version key range would be placed into +// the suspect regions map +func (c *Controller) CheckSuspectRanges() { + ticker := time.NewTicker(checkSuspectRangesInterval) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + log.Info("check suspect key ranges has been stopped") + return + case <-ticker.C: + keyRange, success := c.PopOneSuspectKeyRange() + if !success { + continue + } + limit := 1024 + regions := c.cluster.ScanRegions(keyRange[0], keyRange[1], limit) + if len(regions) == 0 { + continue + } + regionIDList := make([]uint64, 0, len(regions)) + for _, region := range regions { + regionIDList = append(regionIDList, region.GetID()) + } + + // if the last region's end key is smaller the keyRange[1] which means there existed the remaining regions between + // keyRange[0] and keyRange[1] after scan regions, so we put the end key and keyRange[1] into Suspect KeyRanges + lastRegion := regions[len(regions)-1] + if lastRegion.GetEndKey() != nil && bytes.Compare(lastRegion.GetEndKey(), keyRange[1]) < 0 { + c.AddSuspectKeyRange(lastRegion.GetEndKey(), keyRange[1]) + } + c.AddSuspectRegions(regionIDList...) + } + } +} + // AddSuspectRegions adds regions to suspect list. func (c *Controller) AddSuspectRegions(regionIDs ...uint64) { for _, regionID := range regionIDs { @@ -251,3 +489,119 @@ func (c *Controller) GetPauseController(name string) (*PauseController, error) { return nil, errs.ErrCheckerNotFound.FastGenByArgs() } } + +// PatrolRegionContext is used to store the context of patrol regions. +type PatrolRegionContext struct { + syncutil.RWMutex + // config + interval time.Duration + workerCount int + scanLimit int + // status + patrolRoundStartTime time.Time + duration time.Duration + // workers + workersCtx context.Context + workersCancel context.CancelFunc + regionChan chan *core.RegionInfo + wg sync.WaitGroup +} + +func (p *PatrolRegionContext) init(ctx context.Context, cluster sche.CheckerCluster) { + p.interval = cluster.GetCheckerConfig().GetPatrolRegionInterval() + p.workerCount = cluster.GetCheckerConfig().GetPatrolRegionWorkerCount() + + p.scanLimit = calculateScanLimit(cluster) + p.patrolRoundStartTime = time.Now() + + p.regionChan = make(chan *core.RegionInfo, patrolRegionChanLen) + p.workersCtx, p.workersCancel = context.WithCancel(ctx) +} + +func (p *PatrolRegionContext) stop() { + close(p.regionChan) + p.workersCancel() + p.wg.Wait() +} + +func (p *PatrolRegionContext) getWorkerCount() int { + p.RLock() + defer p.RUnlock() + return p.workerCount +} + +func (p *PatrolRegionContext) setWorkerCount(count int) { + p.Lock() + defer p.Unlock() + p.workerCount = count +} + +func (p *PatrolRegionContext) getInterval() time.Duration { + p.RLock() + defer p.RUnlock() + return p.interval +} + +func (p *PatrolRegionContext) setInterval(interval time.Duration) { + p.Lock() + defer p.Unlock() + p.interval = interval +} + +func (p *PatrolRegionContext) getScanLimit() int { + p.RLock() + defer p.RUnlock() + return p.scanLimit +} + +func (p *PatrolRegionContext) setScanLimit(limit int) { + p.Lock() + defer p.Unlock() + p.scanLimit = limit +} + +func calculateScanLimit(cluster sche.CheckerCluster) int { + return max(patrolScanRegionMinLimit, cluster.GetTotalRegionCount()/patrolRegionPartition) +} + +func (p *PatrolRegionContext) getPatrolRegionsDuration() time.Duration { + p.RLock() + defer p.RUnlock() + return p.duration +} + +func (p *PatrolRegionContext) setPatrolRegionsDuration(dur time.Duration) { + p.Lock() + defer p.Unlock() + p.duration = dur +} + +func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Controller) { + for i := 0; i < p.getWorkerCount(); i++ { + p.wg.Add(1) + go func(i int) { + defer logutil.LogPanic() + defer p.wg.Done() + for { + select { + case region, ok := <-p.regionChan: + if !ok { + log.Debug("region channel is closed", zap.Int("worker-id", i)) + return + } + c.tryAddOperators(region) + case <-p.workersCtx.Done(): + log.Debug("region worker is closed", zap.Int("worker-id", i)) + return + } + } + }(i) + } +} + +func (p *PatrolRegionContext) roundUpdateMetrics() { + dur := time.Since(p.patrolRoundStartTime) + patrolCheckRegionsGauge.Set(dur.Seconds()) + p.setPatrolRegionsDuration(dur) + p.patrolRoundStartTime = time.Now() +} diff --git a/pkg/schedule/checker/metrics.go b/pkg/schedule/checker/metrics.go index 9d0c83f94e6..1d2d85d9d23 100644 --- a/pkg/schedule/checker/metrics.go +++ b/pkg/schedule/checker/metrics.go @@ -24,8 +24,25 @@ var ( Name: "event_count", Help: "Counter of checker events.", }, []string{"type", "name"}) + regionListGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "checker", + Name: "region_list", + Help: "Number of region in waiting list", + }, []string{"type"}) + + patrolCheckRegionsGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "checker", + Name: "patrol_regions_time", + Help: "Time spent of patrol checks region.", + }) ) func init() { prometheus.MustRegister(checkerCounter) + prometheus.MustRegister(regionListGauge) + prometheus.MustRegister(patrolCheckRegionsGauge) } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index e790b10093e..2c0b99ce432 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -15,7 +15,6 @@ package schedule import ( - "bytes" "context" "strconv" "sync" @@ -44,31 +43,18 @@ import ( ) const ( - runSchedulerCheckInterval = 3 * time.Second - checkSuspectRangesInterval = 100 * time.Millisecond - collectTimeout = 5 * time.Minute - maxLoadConfigRetries = 10 + runSchedulerCheckInterval = 3 * time.Second + collectTimeout = 5 * time.Minute + maxLoadConfigRetries = 10 // pushOperatorTickInterval is the interval try to push the operator. pushOperatorTickInterval = 500 * time.Millisecond - // For 1,024,000 regions, patrolScanRegionLimit is 1000, which is max(patrolScanRegionMinLimit, 1,024,000/patrolRegionPartition) - // It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered. - patrolScanRegionMinLimit = 128 - patrolRegionChanLen = 1024 - patrolRegionPartition = 1024 - // PluginLoad means action for load plugin PluginLoad = "PluginLoad" // PluginUnload means action for unload plugin PluginUnload = "PluginUnload" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - waitingListGauge = regionListGauge.WithLabelValues("waiting_list") - priorityListGauge = regionListGauge.WithLabelValues("priority_list") -) - // Coordinator is used to manage all schedulers and checkers to decide if the region needs to be scheduled. type Coordinator struct { syncutil.RWMutex @@ -79,17 +65,16 @@ type Coordinator struct { schedulersInitialized bool - cluster sche.ClusterInformer - prepareChecker *prepareChecker - checkers *checker.Controller - regionScatterer *scatter.RegionScatterer - regionSplitter *splitter.RegionSplitter - schedulers *schedulers.Controller - opController *operator.Controller - hbStreams *hbstream.HeartbeatStreams - pluginInterface *PluginInterface - diagnosticManager *diagnostic.Manager - patrolRegionContext *PatrolRegionContext + cluster sche.ClusterInformer + prepareChecker *prepareChecker + checkers *checker.Controller + regionScatterer *scatter.RegionScatterer + regionSplitter *splitter.RegionSplitter + schedulers *schedulers.Controller + opController *operator.Controller + hbStreams *hbstream.HeartbeatStreams + pluginInterface *PluginInterface + diagnosticManager *diagnostic.Manager } // NewCoordinator creates a new Coordinator. @@ -112,18 +97,9 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS hbStreams: hbStreams, pluginInterface: NewPluginInterface(), diagnosticManager: diagnostic.NewManager(schedulers, cluster.GetSchedulerConfig()), - patrolRegionContext: &PatrolRegionContext{}, } } -// GetPatrolRegionsDuration returns the duration of the last patrol region round. -func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { - if c == nil { - return 0 - } - return c.patrolRegionContext.getPatrolRegionsDuration() -} - // markSchedulersInitialized marks the scheduler initialization is finished. func (c *Coordinator) markSchedulersInitialized() { c.Lock() @@ -148,270 +124,13 @@ func (c *Coordinator) IsPendingRegion(region uint64) bool { return c.checkers.IsPendingRegion(region) } -// PatrolRegionContext is used to store the context of patrol regions. -type PatrolRegionContext struct { - syncutil.RWMutex - // config - interval time.Duration - workerCount int - scanLimit int - // status - patrolRoundStartTime time.Time - duration time.Duration - // workers - workersCtx context.Context - workersCancel context.CancelFunc - regionChan chan *core.RegionInfo - wg sync.WaitGroup -} - -func (p *PatrolRegionContext) init(ctx context.Context, cluster sche.ClusterInformer) { - p.interval = cluster.GetCheckerConfig().GetPatrolRegionInterval() - p.workerCount = cluster.GetCheckerConfig().GetPatrolRegionWorkerCount() - - p.scanLimit = calculateScanLimit(cluster) - p.patrolRoundStartTime = time.Now() - - p.regionChan = make(chan *core.RegionInfo, patrolRegionChanLen) - p.workersCtx, p.workersCancel = context.WithCancel(ctx) -} - -func (p *PatrolRegionContext) stop() { - close(p.regionChan) - p.workersCancel() - p.wg.Wait() -} - -func (p *PatrolRegionContext) getWorkerCount() int { - p.RLock() - defer p.RUnlock() - return p.workerCount -} - -func (p *PatrolRegionContext) setWorkerCount(count int) { - p.Lock() - defer p.Unlock() - p.workerCount = count -} - -func (p *PatrolRegionContext) getInterval() time.Duration { - p.RLock() - defer p.RUnlock() - return p.interval -} - -func (p *PatrolRegionContext) setInterval(interval time.Duration) { - p.Lock() - defer p.Unlock() - p.interval = interval -} - -func (p *PatrolRegionContext) getScanLimit() int { - p.RLock() - defer p.RUnlock() - return p.scanLimit -} - -func (p *PatrolRegionContext) setScanLimit(limit int) { - p.Lock() - defer p.Unlock() - p.scanLimit = limit -} - -func calculateScanLimit(cluster sche.ClusterInformer) int { - return max(patrolScanRegionMinLimit, cluster.GetTotalRegionCount()/patrolRegionPartition) -} - -func (p *PatrolRegionContext) getPatrolRegionsDuration() time.Duration { - p.RLock() - defer p.RUnlock() - return p.duration -} - -func (p *PatrolRegionContext) setPatrolRegionsDuration(dur time.Duration) { - p.Lock() - defer p.Unlock() - p.duration = dur -} - -func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Coordinator) { - for i := 0; i < p.getWorkerCount(); i++ { - p.wg.Add(1) - go func(i int) { - defer logutil.LogPanic() - defer p.wg.Done() - for { - select { - case region, ok := <-p.regionChan: - if !ok { - log.Debug("region channel is closed", zap.Int("worker-id", i)) - return - } - c.tryAddOperators(region) - case <-p.workersCtx.Done(): - log.Debug("region worker is closed", zap.Int("worker-id", i)) - return - } - } - }(i) - } -} - -func (p *PatrolRegionContext) roundUpdateMetrics() { - dur := time.Since(p.patrolRoundStartTime) - patrolCheckRegionsGauge.Set(dur.Seconds()) - p.setPatrolRegionsDuration(dur) - p.patrolRoundStartTime = time.Now() -} - // PatrolRegions is used to scan regions. -// The checkers will check these regions to decide if they need to do some operations. // The function is exposed for test purpose. func (c *Coordinator) PatrolRegions() { defer logutil.LogPanic() defer c.wg.Done() - - c.patrolRegionContext.init(c.ctx, c.cluster) - c.patrolRegionContext.startPatrolRegionWorkers(c) - defer c.patrolRegionContext.stop() - - ticker := time.NewTicker(c.patrolRegionContext.getInterval()) - defer ticker.Stop() - log.Info("coordinator starts patrol regions") - var ( - key []byte - regions []*core.RegionInfo - ) - for { - select { - case <-ticker.C: - c.updateTickerIfNeeded(ticker) - c.updatePatrolWorkersIfNeeded() - if c.cluster.IsSchedulingHalted() { - for len(c.patrolRegionContext.regionChan) > 0 { - <-c.patrolRegionContext.regionChan - } - log.Debug("skip patrol regions due to scheduling is halted") - continue - } - - // wait for the regionChan to be drained - if len(c.patrolRegionContext.regionChan) > 0 { - continue - } - - // Check priority regions first. - c.checkPriorityRegions() - // Check suspect regions first. - c.checkSuspectRegions() - // Check regions in the waiting list - c.checkWaitingRegions() - - key, regions = c.checkRegions(key) - if len(regions) == 0 { - continue - } - // Updates the label level isolation statistics. - c.cluster.UpdateRegionsLabelLevelStats(regions) - // Update metrics and scan limit if a full scan is done. - if len(key) == 0 { - c.patrolRegionContext.roundUpdateMetrics() - c.patrolRegionContext.setScanLimit(calculateScanLimit(c.cluster)) - } - failpoint.Inject("break-patrol", func() { - time.Sleep(100 * time.Millisecond) // ensure the regions are handled by the workers - failpoint.Return() - }) - case <-c.ctx.Done(): - patrolCheckRegionsGauge.Set(0) - c.patrolRegionContext.setPatrolRegionsDuration(0) - log.Info("patrol regions has been stopped") - return - } - } -} - -func (c *Coordinator) updateTickerIfNeeded(ticker *time.Ticker) { - // Note: we reset the ticker here to support updating configuration dynamically. - newInterval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval() - if c.patrolRegionContext.getInterval() != newInterval { - c.patrolRegionContext.setInterval(newInterval) - ticker.Reset(newInterval) - log.Info("coordinator starts patrol regions with new interval", zap.Duration("interval", newInterval)) - } -} - -func (c *Coordinator) updatePatrolWorkersIfNeeded() { - newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionWorkerCount() - if c.patrolRegionContext.getWorkerCount() != newWorkersCount { - oldWorkersCount := c.patrolRegionContext.getWorkerCount() - c.patrolRegionContext.setWorkerCount(newWorkersCount) - // Stop the old workers and start the new workers. - c.patrolRegionContext.workersCancel() - c.patrolRegionContext.wg.Wait() - c.patrolRegionContext.workersCtx, c.patrolRegionContext.workersCancel = context.WithCancel(c.ctx) - c.patrolRegionContext.startPatrolRegionWorkers(c) - log.Info("coordinator starts patrol regions with new workers count", - zap.Int("old-workers-count", oldWorkersCount), - zap.Int("new-workers-count", newWorkersCount)) - } -} - -func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) { - regions = c.cluster.ScanRegions(startKey, nil, c.patrolRegionContext.getScanLimit()) - if len(regions) == 0 { - // Resets the scan key. - key = nil - return - } - - for _, region := range regions { - c.patrolRegionContext.regionChan <- region - key = region.GetEndKey() - } - return -} - -func (c *Coordinator) checkSuspectRegions() { - for _, id := range c.checkers.GetSuspectRegions() { - region := c.cluster.GetRegion(id) - c.patrolRegionContext.regionChan <- region - } -} - -func (c *Coordinator) checkWaitingRegions() { - items := c.checkers.GetWaitingRegions() - waitingListGauge.Set(float64(len(items))) - for _, item := range items { - region := c.cluster.GetRegion(item.Key) - c.patrolRegionContext.regionChan <- region - } -} - -// checkPriorityRegions checks priority regions -func (c *Coordinator) checkPriorityRegions() { - items := c.checkers.GetPriorityRegions() - removes := make([]uint64, 0) - priorityListGauge.Set(float64(len(items))) - for _, id := range items { - region := c.cluster.GetRegion(id) - if region == nil { - removes = append(removes, id) - continue - } - ops := c.checkers.CheckRegion(region) - // it should skip if region needs to merge - if len(ops) == 0 || ops[0].Kind()&operator.OpMerge != 0 { - continue - } - if !c.opController.ExceedStoreLimit(ops...) { - c.opController.AddWaitingOperator(ops...) - } - } - for _, v := range removes { - c.checkers.RemovePriorityRegions(v) - } + c.checkers.PatrolRegions() } // checkSuspectRanges would pop one suspect key range group @@ -421,62 +140,15 @@ func (c *Coordinator) checkSuspectRanges() { defer logutil.LogPanic() defer c.wg.Done() log.Info("coordinator begins to check suspect key ranges") - ticker := time.NewTicker(checkSuspectRangesInterval) - defer ticker.Stop() - for { - select { - case <-c.ctx.Done(): - log.Info("check suspect key ranges has been stopped") - return - case <-ticker.C: - keyRange, success := c.checkers.PopOneSuspectKeyRange() - if !success { - continue - } - limit := 1024 - regions := c.cluster.ScanRegions(keyRange[0], keyRange[1], limit) - if len(regions) == 0 { - continue - } - regionIDList := make([]uint64, 0, len(regions)) - for _, region := range regions { - regionIDList = append(regionIDList, region.GetID()) - } - - // if the last region's end key is smaller the keyRange[1] which means there existed the remaining regions between - // keyRange[0] and keyRange[1] after scan regions, so we put the end key and keyRange[1] into Suspect KeyRanges - lastRegion := regions[len(regions)-1] - if lastRegion.GetEndKey() != nil && bytes.Compare(lastRegion.GetEndKey(), keyRange[1]) < 0 { - c.checkers.AddSuspectKeyRange(lastRegion.GetEndKey(), keyRange[1]) - } - c.checkers.AddSuspectRegions(regionIDList...) - } - } + c.checkers.CheckSuspectRanges() } -func (c *Coordinator) tryAddOperators(region *core.RegionInfo) { - if region == nil { - // the region could be recent split, continue to wait. - return - } - id := region.GetID() - if c.opController.GetOperator(id) != nil { - c.checkers.RemoveWaitingRegion(id) - c.checkers.RemoveSuspectRegion(id) - return - } - ops := c.checkers.CheckRegion(region) - if len(ops) == 0 { - return - } - - if !c.opController.ExceedStoreLimit(ops...) { - c.opController.AddWaitingOperator(ops...) - c.checkers.RemoveWaitingRegion(id) - c.checkers.RemoveSuspectRegion(id) - } else { - c.checkers.AddWaitingRegion(region) +// GetPatrolRegionsDuration returns the duration of the last patrol region round. +func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { + if c == nil { + return 0 } + return c.checkers.GetPatrolRegionsDuration() } // drivePushOperator is used to push the unfinished operator to the executor. diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index b97459d26ea..ce2cf01ed16 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -30,7 +30,6 @@ type ClusterInformer interface { CheckerCluster GetStorage() storage.Storage - UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) } // SchedulerCluster is an aggregate interface that wraps multiple interfaces @@ -43,7 +42,6 @@ type SchedulerCluster interface { GetSchedulerConfig() sc.SchedulerConfigProvider GetRegionLabeler() *labeler.RegionLabeler GetStoreConfig() sc.StoreConfigProvider - IsSchedulingHalted() bool } // CheckerCluster is an aggregate interface that wraps multiple interfaces @@ -52,6 +50,7 @@ type CheckerCluster interface { GetCheckerConfig() sc.CheckerConfigProvider GetStoreConfig() sc.StoreConfigProvider + UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) } // SharedCluster is an aggregate interface that wraps multiple interfaces @@ -63,6 +62,7 @@ type SharedCluster interface { GetSharedConfig() sc.SharedConfigProvider GetRuleManager() *placement.RuleManager AllocID() (uint64, error) + IsSchedulingHalted() bool } // BasicCluster is an aggregate interface that wraps multiple interfaces diff --git a/pkg/schedule/metrics.go b/pkg/schedule/metrics.go index 6927fb1f178..de654846412 100644 --- a/pkg/schedule/metrics.go +++ b/pkg/schedule/metrics.go @@ -24,26 +24,8 @@ var ( Name: "status", Help: "Status of the hotspot.", }, []string{"address", "store", "type"}) - - regionListGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "pd", - Subsystem: "checker", - Name: "region_list", - Help: "Number of region in waiting list", - }, []string{"type"}) - - patrolCheckRegionsGauge = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: "pd", - Subsystem: "checker", - Name: "patrol_regions_time", - Help: "Time spent of patrol checks region.", - }) ) func init() { prometheus.MustRegister(hotSpotStatusGauge) - prometheus.MustRegister(regionListGauge) - prometheus.MustRegister(patrolCheckRegionsGauge) } From cc51a2e7ddfc89f93120b81b0879085a29f58d12 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 4 Jul 2024 19:18:56 +0800 Subject: [PATCH 23/33] fix test and add metrics Signed-off-by: lhy1024 --- pkg/schedule/checker/checker_controller.go | 5 ++++- pkg/schedule/checker/metrics.go | 2 +- tests/server/cluster/cluster_test.go | 4 ++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 6ca7804dada..315ec63a16b 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -52,6 +52,7 @@ var ( // WithLabelValues is a heavy operation, define variable to avoid call it every time. waitingListGauge = regionListGauge.WithLabelValues("waiting_list") priorityListGauge = regionListGauge.WithLabelValues("priority_list") + scanLimitGauge = regionListGauge.WithLabelValues("scan_limit") ) // Controller is used to manage all checkers. @@ -561,7 +562,9 @@ func (p *PatrolRegionContext) setScanLimit(limit int) { } func calculateScanLimit(cluster sche.CheckerCluster) int { - return max(patrolScanRegionMinLimit, cluster.GetTotalRegionCount()/patrolRegionPartition) + scanlimit := max(patrolScanRegionMinLimit, cluster.GetTotalRegionCount()/patrolRegionPartition) + scanLimitGauge.Set(float64(scanlimit)) + return scanlimit } func (p *PatrolRegionContext) getPatrolRegionsDuration() time.Duration { diff --git a/pkg/schedule/checker/metrics.go b/pkg/schedule/checker/metrics.go index 1d2d85d9d23..42bcf5ecba8 100644 --- a/pkg/schedule/checker/metrics.go +++ b/pkg/schedule/checker/metrics.go @@ -29,7 +29,7 @@ var ( Namespace: "pd", Subsystem: "checker", Name: "region_list", - Help: "Number of region in waiting list", + Help: "Number of region about different type.", }, []string{"type"}) patrolCheckRegionsGauge = prometheus.NewGauge( diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 6fff272bb50..9a37c46d225 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -1848,13 +1848,13 @@ func TestPatrolRegionConfigChange(t *testing.T) { schedule := leaderServer.GetConfig().Schedule schedule.PatrolRegionInterval = typeutil.NewDuration(99 * time.Millisecond) leaderServer.GetServer().SetScheduleConfig(schedule) - checkLog(re, fname, "coordinator starts patrol regions with new interval") + checkLog(re, fname, "starts patrol regions with new interval") // test change patrol region worker count schedule = leaderServer.GetConfig().Schedule schedule.PatrolRegionWorkerCount = 8 leaderServer.GetServer().SetScheduleConfig(schedule) - checkLog(re, fname, "coordinator starts patrol regions with new workers count") + checkLog(re, fname, "starts patrol regions with new workers count") // test change schedule halt schedule = leaderServer.GetConfig().Schedule From 9f7406a8e9bc77c703ed128027a6438d13e96a4d Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 4 Jul 2024 20:19:02 +0800 Subject: [PATCH 24/33] fix failpoint Signed-off-by: lhy1024 --- server/cluster/cluster_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 26b044de299..0ccf08c04ce 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2844,7 +2844,7 @@ func TestCheckCache(t *testing.T) { // Add a peer with two replicas. re.NoError(tc.addLeaderRegion(1, 2, 3)) - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/break-patrol", `return`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/break-patrol", `return`)) // case 1: operator cannot be created due to replica-schedule-limit restriction co.GetWaitGroup().Add(1) @@ -2878,7 +2878,7 @@ func TestCheckCache(t *testing.T) { co.GetSchedulersController().Wait() co.GetWaitGroup().Wait() - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/break-patrol")) } func TestPatrolRegionConcurrency(t *testing.T) { @@ -2907,7 +2907,7 @@ func TestPatrolRegionConcurrency(t *testing.T) { } // test patrol region concurrency - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/break-patrol", `return`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/break-patrol", `return`)) co.GetWaitGroup().Add(1) co.PatrolRegions() testutil.Eventually(re, func() bool { @@ -2927,7 +2927,9 @@ func TestPatrolRegionConcurrency(t *testing.T) { return len(oc.GetOperators()) >= mergeScheduleLimit }) checkOperatorDuplicate(re, oc.GetOperators()) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol")) + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/break-patrol")) } func checkOperatorDuplicate(re *require.Assertions, ops []*operator.Operator) { From ae0778f4d80df70f415f8bf60d87b39601fc6471 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 16 Jul 2024 17:06:42 +0800 Subject: [PATCH 25/33] fix conflict Signed-off-by: lhy1024 --- pkg/schedule/checker/checker_controller.go | 7 +++--- pkg/schedule/checker/rule_checker.go | 28 +++++++++++----------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 3aebcc6cc99..c2d212bd064 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -38,8 +38,8 @@ import ( const ( checkSuspectRangesInterval = 100 * time.Millisecond - // DefaultWaitingCacheSize is the default length of waiting list. - DefaultWaitingCacheSize = 100000 + // DefaultPendingRegionCacheSize is the default length of waiting list. + DefaultPendingRegionCacheSize = 100000 // For 1,024,000 regions, patrolScanRegionLimit is 1000, which is max(patrolScanRegionMinLimit, 1,024,000/patrolRegionPartition) // It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered. patrolScanRegionMinLimit = 128 @@ -75,8 +75,9 @@ type Controller struct { // NewController create a new Controller. func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller { - pendingProcessedRegions := cache.NewDefaultCache(DefaultWaitingCacheSize) + pendingProcessedRegions := cache.NewDefaultCache(DefaultPendingRegionCacheSize) return &Controller{ + ctx: ctx, cluster: cluster, conf: conf, opController: opController, diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index bb4a60ead7c..a90de0a58d4 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -50,23 +50,23 @@ var ( // RuleChecker fix/improve region by placement rules. type RuleChecker struct { PauseController - cluster sche.CheckerCluster - ruleManager *placement.RuleManager - regionWaitingList cache.Cache - pendingList cache.Cache - switchWitnessCache *cache.TTLUint64 - record *recorder + cluster sche.CheckerCluster + ruleManager *placement.RuleManager + pendingProcessedRegions cache.Cache + pendingList cache.Cache + switchWitnessCache *cache.TTLUint64 + record *recorder } // NewRuleChecker creates a checker instance. -func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker { +func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions cache.Cache) *RuleChecker { return &RuleChecker{ - cluster: cluster, - ruleManager: ruleManager, - regionWaitingList: regionWaitingList, - pendingList: cache.NewDefaultCache(maxPendingListLen), - switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()), - record: newRecord(), + cluster: cluster, + ruleManager: ruleManager, + pendingProcessedRegions: pendingProcessedRegions, + pendingList: cache.NewDefaultCache(maxPendingListLen), + switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()), + record: newRecord(), } } @@ -637,7 +637,7 @@ func (c *RuleChecker) getRuleFitStores(rf *placement.RuleFit) []*core.StoreInfo func (c *RuleChecker) handleFilterState(region *core.RegionInfo, filterByTempState bool) { if filterByTempState { - c.regionWaitingList.Put(region.GetID(), nil) + c.pendingProcessedRegions.Put(region.GetID(), nil) c.pendingList.Remove(region.GetID()) } else { c.pendingList.Put(region.GetID(), nil) From 18db300b2ab6da811de5888ce5488a19080e6b61 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 1 Aug 2024 19:01:22 +0800 Subject: [PATCH 26/33] fix Signed-off-by: lhy1024 --- pkg/schedule/checker/checker_controller.go | 2 +- server/cluster/cluster_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index de84c57afae..260e8bb0b7f 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -159,7 +159,7 @@ func (c *Controller) PatrolRegions() { start = time.Now() c.scanLimit = calculateScanLimit(c.cluster) } - failpoint.Inject("break-patrol", func() { + failpoint.Inject("breakPatrol", func() { time.Sleep(100 * time.Millisecond) // ensure the regions are handled by the workers failpoint.Return() }) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 98ab79eb97c..15a682090dc 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2878,7 +2878,7 @@ func TestCheckCache(t *testing.T) { co.GetSchedulersController().Wait() co.GetWaitGroup().Wait() - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/break-patrol")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol")) } func TestPatrolRegionConcurrency(t *testing.T) { @@ -2907,7 +2907,7 @@ func TestPatrolRegionConcurrency(t *testing.T) { } // test patrol region concurrency - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/break-patrol", `return`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`)) co.GetWaitGroup().Add(1) co.PatrolRegions() testutil.Eventually(re, func() bool { From 6bdb436b03fbbfe3fbc9f56617d50fc6998d77a0 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 8 Aug 2024 19:18:28 +0800 Subject: [PATCH 27/33] fix lint Signed-off-by: lhy1024 --- pkg/schedule/checker/checker_controller.go | 7 +++---- server/cluster/cluster_test.go | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 519113427d0..d8aa4a35aba 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -17,8 +17,8 @@ package checker import ( "bytes" "context" - "sync" "strconv" + "sync" "time" "github.com/pingcap/failpoint" @@ -48,7 +48,7 @@ const ( MinPatrolRegionScanLimit = 128 MaxPatrolScanRegionLimit = 8192 patrolRegionPartition = 1024 - patrolRegionChanLen = MaxPatrolScanRegionLimit + patrolRegionChanLen = MaxPatrolScanRegionLimit ) var ( @@ -56,7 +56,6 @@ var ( // WithLabelValues is a heavy operation, define variable to avoid call it every time. pendingProcessedRegionsGauge = regionListGauge.WithLabelValues("pending_processed_regions") priorityListGauge = regionListGauge.WithLabelValues("priority_list") - scanLimitGauge = regionListGauge.WithLabelValues("scan_limit") ) // Controller is used to manage all checkers. @@ -85,7 +84,7 @@ type Controller struct { // interval is the config interval of patrol regions. // It's used to update the ticker, so we need to // record it to avoid updating the ticker frequently. - interval time.Duration + interval time.Duration workerCount int // patrolRegionScanLimit is the limit of regions to scan. // It is calculated by the number of regions. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 916d9d27ee9..100ebcad2ba 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2921,7 +2921,7 @@ func TestPatrolRegionConcurrency(t *testing.T) { for i := 0; i < 10; i++ { suspectRegions = append(suspectRegions, uint64(i)) } - co.GetCheckerController().AddPendingProcessedRegions(suspectRegions...) + co.GetCheckerController().AddPendingProcessedRegions(false, suspectRegions...) co.GetWaitGroup().Add(1) co.PatrolRegions() testutil.Eventually(re, func() bool { From 8cd88251517e90701df91b636d66ac8a388b4233 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 29 Aug 2024 17:57:38 +0800 Subject: [PATCH 28/33] address comments Signed-off-by: lhy1024 --- tools/pd-ctl/tests/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go index d088b27702d..cf93cb9dc8c 100644 --- a/tools/pd-ctl/tests/config/config_test.go +++ b/tools/pd-ctl/tests/config/config_test.go @@ -357,7 +357,7 @@ func (suite *configTestSuite) checkConfig(cluster *pdTests.TestCluster) { re.Contains(string(output), "patrol-region-worker-count should be between 1 and 8") re.Equal(8, svr.GetScheduleConfig().PatrolRegionWorkerCount) args = []string{"-u", pdAddr, "config", "set", "patrol-region-worker-count", "0"} - _, err = tests.ExecuteCommand(cmd, args...) + output, err = tests.ExecuteCommand(cmd, args...) re.NoError(err) re.Contains(string(output), "patrol-region-worker-count should be between 1 and 8") re.Equal(8, svr.GetScheduleConfig().PatrolRegionWorkerCount) From bcd50185e79d8ac53ade3c151f48c4c72b704c9a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 24 Sep 2024 22:00:32 +0800 Subject: [PATCH 29/33] address comments Signed-off-by: lhy1024 --- pkg/schedule/checker/checker_controller.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index f8735e0beb9..c9c2818b64d 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -53,10 +53,10 @@ const ( ) var ( - denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("checkers", "deny") // WithLabelValues is a heavy operation, define variable to avoid call it every time. pendingProcessedRegionsGauge = regionListGauge.WithLabelValues("pending_processed_regions") priorityListGauge = regionListGauge.WithLabelValues("priority_list") + denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("checkers", "deny") ) // Controller is used to manage all checkers. @@ -85,7 +85,8 @@ type Controller struct { // interval is the config interval of patrol regions. // It's used to update the ticker, so we need to // record it to avoid updating the ticker frequently. - interval time.Duration + interval time.Duration + // workerCount is the count of workers to patrol regions. workerCount int // patrolRegionScanLimit is the limit of regions to scan. // It is calculated by the number of regions. @@ -490,7 +491,6 @@ func (c *Controller) GetPauseController(name string) (*PauseController, error) { // PatrolRegionContext is used to store the context of patrol regions. type PatrolRegionContext struct { - // workers workersCtx context.Context workersCancel context.CancelFunc regionChan chan *core.RegionInfo @@ -503,9 +503,11 @@ func (p *PatrolRegionContext) init(ctx context.Context) { } func (p *PatrolRegionContext) stop() { + log.Debug("closing patrol region workers") close(p.regionChan) p.workersCancel() p.wg.Wait() + log.Debug("patrol region workers are closed") } func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Controller) { From 457da3d57f1df3656b724b2d7e98e485d74d97a6 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 24 Sep 2024 22:33:38 +0800 Subject: [PATCH 30/33] avoid potential data race Signed-off-by: lhy1024 --- pkg/schedule/checker/rule_checker.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 82807441bf8..7019d49fd13 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/types" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" ) @@ -650,6 +651,7 @@ func (c *RuleChecker) handleFilterState(region *core.RegionInfo, filterByTempSta } type recorder struct { + syncutil.RWMutex offlineLeaderCounter map[uint64]uint64 lastUpdateTime time.Time } @@ -662,10 +664,14 @@ func newRecord() *recorder { } func (o *recorder) getOfflineLeaderCount(storeID uint64) uint64 { + o.RLock() + defer o.RUnlock() return o.offlineLeaderCounter[storeID] } func (o *recorder) incOfflineLeaderCount(storeID uint64) { + o.Lock() + defer o.Unlock() o.offlineLeaderCounter[storeID] += 1 o.lastUpdateTime = time.Now() } From 64abc3cc6b08889e2254e8ae9a74076da0758d91 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 25 Sep 2024 13:16:59 +0800 Subject: [PATCH 31/33] address comments: remove sleep in failpoint Signed-off-by: lhy1024 --- pkg/schedule/checker/checker_controller.go | 8 ++++- server/cluster/cluster_test.go | 41 +++++++++++----------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 62dd12fa684..e0b2b14bd0c 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -169,7 +169,6 @@ func (c *Controller) PatrolRegions() { start = time.Now() } failpoint.Inject("breakPatrol", func() { - time.Sleep(100 * time.Millisecond) // ensure the regions are handled by the workers failpoint.Return() }) case <-c.ctx.Done(): @@ -492,6 +491,13 @@ func (c *Controller) GetPauseController(name string) (*PauseController, error) { } } +func (c *Controller) IsPatrolRegionChanEmpty() bool { + if c.patrolRegionContext == nil { + return true + } + return len(c.patrolRegionContext.regionChan) == 0 +} + // PatrolRegionContext is used to store the context of patrol regions. type PatrolRegionContext struct { workersCtx context.Context diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 78a1e12c58c..4f4b2ec90e7 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2841,6 +2841,7 @@ func TestCheckCache(t *testing.T) { }, nil, nil, re) defer cleanup() oc := co.GetOperatorController() + checker := co.GetCheckerController() re.NoError(tc.addRegionStore(1, 0)) re.NoError(tc.addRegionStore(2, 0)) @@ -2851,34 +2852,34 @@ func TestCheckCache(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`)) // case 1: operator cannot be created due to replica-schedule-limit restriction - co.GetWaitGroup().Add(1) - co.PatrolRegions() + checker.PatrolRegions() + re.True(checker.IsPatrolRegionChanEmpty()) re.Empty(oc.GetOperators()) - re.Len(co.GetCheckerController().GetPendingProcessedRegions(), 1) + re.Len(checker.GetPendingProcessedRegions(), 1) // cancel the replica-schedule-limit restriction cfg := tc.GetScheduleConfig() cfg.ReplicaScheduleLimit = 10 tc.SetScheduleConfig(cfg) - co.GetWaitGroup().Add(1) - co.PatrolRegions() + checker.PatrolRegions() + re.True(checker.IsPatrolRegionChanEmpty()) re.Len(oc.GetOperators(), 1) - re.Empty(co.GetCheckerController().GetPendingProcessedRegions()) + re.Empty(checker.GetPendingProcessedRegions()) // case 2: operator cannot be created due to store limit restriction oc.RemoveOperator(oc.GetOperator(1)) tc.SetStoreLimit(1, storelimit.AddPeer, 0) - co.GetWaitGroup().Add(1) - co.PatrolRegions() - re.Len(co.GetCheckerController().GetPendingProcessedRegions(), 1) + checker.PatrolRegions() + re.True(checker.IsPatrolRegionChanEmpty()) + re.Len(checker.GetPendingProcessedRegions(), 1) // cancel the store limit restriction tc.SetStoreLimit(1, storelimit.AddPeer, 10) time.Sleep(time.Second) - co.GetWaitGroup().Add(1) - co.PatrolRegions() + checker.PatrolRegions() + re.True(checker.IsPatrolRegionChanEmpty()) re.Len(oc.GetOperators(), 1) - re.Empty(co.GetCheckerController().GetPendingProcessedRegions()) + re.Empty(checker.GetPendingProcessedRegions()) co.GetSchedulersController().Wait() co.GetWaitGroup().Wait() @@ -2897,6 +2898,7 @@ func TestPatrolRegionConcurrency(t *testing.T) { }, nil, nil, re) defer cleanup() oc := co.GetOperatorController() + checker := co.GetCheckerController() tc.opt.SetSplitMergeInterval(time.Duration(0)) for i := 1; i < 4; i++ { @@ -2912,8 +2914,7 @@ func TestPatrolRegionConcurrency(t *testing.T) { // test patrol region concurrency re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`)) - co.GetWaitGroup().Add(1) - co.PatrolRegions() + checker.PatrolRegions() testutil.Eventually(re, func() bool { return len(oc.GetOperators()) >= mergeScheduleLimit }) @@ -2924,9 +2925,8 @@ func TestPatrolRegionConcurrency(t *testing.T) { for i := 0; i < 10; i++ { suspectRegions = append(suspectRegions, uint64(i)) } - co.GetCheckerController().AddPendingProcessedRegions(false, suspectRegions...) - co.GetWaitGroup().Add(1) - co.PatrolRegions() + checker.AddPendingProcessedRegions(false, suspectRegions...) + checker.PatrolRegions() testutil.Eventually(re, func() bool { return len(oc.GetOperators()) >= mergeScheduleLimit }) @@ -2980,8 +2980,7 @@ func checkScanLimit(re *require.Assertions, regionCount int, expectScanLimit ... re.NoError(tc.putRegion(region)) } - co.GetWaitGroup().Add(1) - co.PatrolRegions() + co.GetCheckerController().PatrolRegions() defer func() { co.GetSchedulersController().Wait() co.GetWaitGroup().Wait() @@ -3472,9 +3471,9 @@ func BenchmarkPatrolRegion(b *testing.B) { }() <-listen - co.GetWaitGroup().Add(1) b.ResetTimer() - co.PatrolRegions() + checker := co.GetCheckerController() + checker.PatrolRegions() } func waitOperator(re *require.Assertions, co *schedule.Coordinator, regionID uint64) { From e570e3ca63ca222f12d20cd45e2c8f492f3b40e9 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Sun, 29 Sep 2024 15:43:18 +0800 Subject: [PATCH 32/33] fix lint Signed-off-by: lhy1024 --- pkg/schedule/checker/checker_controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index e0b2b14bd0c..9c9ae8fbed6 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -491,6 +491,7 @@ func (c *Controller) GetPauseController(name string) (*PauseController, error) { } } +// IsPatrolRegionChanEmpty returns whether the patrol region channel is empty. func (c *Controller) IsPatrolRegionChanEmpty() bool { if c.patrolRegionContext == nil { return true From 7e5813d0abce553cd25202df020ace33013c18a0 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 30 Oct 2024 17:49:40 +0800 Subject: [PATCH 33/33] fix lint and make test stable Signed-off-by: lhy1024 --- pkg/schedule/checker/checker_controller.go | 5 ++++- server/cluster/cluster_test.go | 12 ++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 9c9ae8fbed6..5ac8e9e940e 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -169,6 +169,9 @@ func (c *Controller) PatrolRegions() { start = time.Now() } failpoint.Inject("breakPatrol", func() { + for !c.IsPatrolRegionChanEmpty() { + time.Sleep(time.Millisecond * 10) + } failpoint.Return() }) case <-c.ctx.Done(): @@ -521,7 +524,7 @@ func (p *PatrolRegionContext) stop() { } func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Controller) { - for i := 0; i < c.workerCount; i++ { + for i := range c.workerCount { p.wg.Add(1) go func(i int) { defer logutil.LogPanic() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index b8f55bcb494..9d3a3d44590 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2886,7 +2886,6 @@ func TestCheckCache(t *testing.T) { // case 1: operator cannot be created due to replica-schedule-limit restriction checker.PatrolRegions() - re.True(checker.IsPatrolRegionChanEmpty()) re.Empty(oc.GetOperators()) re.Len(checker.GetPendingProcessedRegions(), 1) @@ -2895,7 +2894,6 @@ func TestCheckCache(t *testing.T) { cfg.ReplicaScheduleLimit = 10 tc.SetScheduleConfig(cfg) checker.PatrolRegions() - re.True(checker.IsPatrolRegionChanEmpty()) re.Len(oc.GetOperators(), 1) re.Empty(checker.GetPendingProcessedRegions()) @@ -2903,14 +2901,12 @@ func TestCheckCache(t *testing.T) { oc.RemoveOperator(oc.GetOperator(1)) tc.SetStoreLimit(1, storelimit.AddPeer, 0) checker.PatrolRegions() - re.True(checker.IsPatrolRegionChanEmpty()) re.Len(checker.GetPendingProcessedRegions(), 1) // cancel the store limit restriction tc.SetStoreLimit(1, storelimit.AddPeer, 10) time.Sleep(time.Second) checker.PatrolRegions() - re.True(checker.IsPatrolRegionChanEmpty()) re.Len(oc.GetOperators(), 1) re.Empty(checker.GetPendingProcessedRegions()) @@ -2934,12 +2930,12 @@ func TestPatrolRegionConcurrency(t *testing.T) { checker := co.GetCheckerController() tc.opt.SetSplitMergeInterval(time.Duration(0)) - for i := 1; i < 4; i++ { - if err := tc.addRegionStore(uint64(i), regionNum); err != nil { + for i := range 3 { + if err := tc.addRegionStore(uint64(i+1), regionNum); err != nil { return } } - for i := 0; i < regionNum; i++ { + for i := range regionNum { if err := tc.addLeaderRegion(uint64(i), 1, 2, 3); err != nil { return } @@ -2955,7 +2951,7 @@ func TestPatrolRegionConcurrency(t *testing.T) { // test patrol region concurrency with suspect regions suspectRegions := make([]uint64, 0) - for i := 0; i < 10; i++ { + for i := range 10 { suspectRegions = append(suspectRegions, uint64(i)) } checker.AddPendingProcessedRegions(false, suspectRegions...)