diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
new file mode 100644
index 00000000000..b5e42b40ac8
--- /dev/null
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -0,0 +1,464 @@
+package server
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/kvproto/pkg/schedulingpb"
+	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/cluster"
+	"github.com/tikv/pd/pkg/core"
+	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
+	"github.com/tikv/pd/pkg/schedule"
+	sc "github.com/tikv/pd/pkg/schedule/config"
+	"github.com/tikv/pd/pkg/schedule/hbstream"
+	"github.com/tikv/pd/pkg/schedule/labeler"
+	"github.com/tikv/pd/pkg/schedule/operator"
+	"github.com/tikv/pd/pkg/schedule/placement"
+	"github.com/tikv/pd/pkg/schedule/schedulers"
+	"github.com/tikv/pd/pkg/slice"
+	"github.com/tikv/pd/pkg/statistics"
+	"github.com/tikv/pd/pkg/statistics/buckets"
+	"github.com/tikv/pd/pkg/statistics/utils"
+	"github.com/tikv/pd/pkg/storage"
+	"github.com/tikv/pd/pkg/utils/logutil"
+	"go.uber.org/zap"
+)
+
+// Cluster is used to manage all information for scheduling purposes.
+type Cluster struct {
+	ctx               context.Context
+	cancel            context.CancelFunc
+	wg                sync.WaitGroup
+	*core.BasicCluster
+	persistConfig     *config.PersistConfig
+	ruleManager       *placement.RuleManager
+	labelerManager    *labeler.RegionLabeler
+	regionStats       *statistics.RegionStatistics
+	labelStats        *statistics.LabelStatistics
+	hotStat           *statistics.HotStat
+	storage           storage.Storage
+	coordinator       *schedule.Coordinator
+	checkMembershipCh chan struct{}
+	apiServerLeader   atomic.Value
+	clusterID         uint64
+}
+
+const regionLabelGCInterval = time.Hour
+
+// NewCluster creates a new cluster.
+func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) {
+	ctx, cancel := context.WithCancel(parentCtx)
+	labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig)
+	c := &Cluster{
+		ctx:               ctx,
+		cancel:            cancel,
+		BasicCluster:      basicCluster,
+		ruleManager:       ruleManager,
+		labelerManager:    labelerManager,
+		persistConfig:     persistConfig,
+		hotStat:           statistics.NewHotStat(ctx),
+		labelStats:        statistics.NewLabelStatistics(),
+		regionStats:       statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
+		storage:           storage,
+		clusterID:         clusterID,
+		checkMembershipCh: checkMembershipCh,
+	}
+	c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
+	err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	return c, nil
+}
+
+// GetCoordinator returns the coordinator.
+func (c *Cluster) GetCoordinator() *schedule.Coordinator {
+	return c.coordinator
+}
+
+// GetHotStat gets hot stat.
+func (c *Cluster) GetHotStat() *statistics.HotStat {
+	return c.hotStat
+}
+
+// GetRegionStats gets region statistics.
+func (c *Cluster) GetRegionStats() *statistics.RegionStatistics {
+	return c.regionStats
+}
+
+// GetLabelStats gets label statistics.
+func (c *Cluster) GetLabelStats() *statistics.LabelStatistics {
+	return c.labelStats
+}
+
+// GetBasicCluster returns the basic cluster.
+func (c *Cluster) GetBasicCluster() *core.BasicCluster {
+	return c.BasicCluster
+}
+
+// GetSharedConfig returns the shared config.
+func (c *Cluster) GetSharedConfig() sc.SharedConfigProvider {
+	return c.persistConfig
+}
+
+// GetRuleManager returns the rule manager.
+func (c *Cluster) GetRuleManager() *placement.RuleManager {
+	return c.ruleManager
+}
+
+// GetRegionLabeler returns the region labeler.
+func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler {
+	return c.labelerManager
+}
+
+// GetStoresLoads returns load stats of all stores.
+func (c *Cluster) GetStoresLoads() map[uint64][]float64 {
+	return c.hotStat.GetStoresLoads()
+}
+
+// IsRegionHot checks if a region is in hot state.
+func (c *Cluster) IsRegionHot(region *core.RegionInfo) bool {
+	return c.hotStat.IsRegionHot(region, c.persistConfig.GetHotRegionCacheHitsThreshold())
+}
+
+// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
+func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
+	return c.hotStat.GetHotPeerStat(rw, regionID, storeID)
+}
+
+// RegionReadStats returns hot region's read stats.
+// The result only includes peers that are hot enough.
+// RegionStats is a thread-safe method
+func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
+	// As read stats are reported by store heartbeat, the threshold needs to be adjusted.
+	threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() *
+		(utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
+	return c.hotStat.RegionStats(utils.Read, threshold)
+}
+
+// RegionWriteStats returns hot region's write stats.
+// The result only includes peers that are hot enough.
+func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
+	// RegionStats is a thread-safe method
+	return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold())
+}
+
+// BucketsStats returns hot region's buckets stats.
+func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat {
+	return c.hotStat.BucketsStats(degree, regionIDs...)
+}
+
+// GetStorage returns the storage.
+func (c *Cluster) GetStorage() storage.Storage {
+	return c.storage
+}
+
+// GetCheckerConfig returns the checker config.
+func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig }
+
+// GetSchedulerConfig returns the scheduler config.
+func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig }
+
+// GetStoreConfig returns the store config.
+func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig }
+
+// AllocID allocates a new ID.
+func (c *Cluster) AllocID() (uint64, error) {
+	client, err := c.getAPIServerLeaderClient()
+	if err != nil {
+		return 0, err
+	}
+	resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
+	if err != nil {
+		c.checkMembershipCh <- struct{}{}
+		return 0, err
+	}
+	return resp.GetId(), nil
+}
+
+func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) {
+	cli := c.apiServerLeader.Load()
+	if cli == nil {
+		c.checkMembershipCh <- struct{}{}
+		return nil, errors.New("API server leader is not found")
+	}
+	return cli.(pdpb.PDClient), nil
+}
+
+// SwitchAPIServerLeader switches the API server leader.
+func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
+	old := c.apiServerLeader.Load()
+	return c.apiServerLeader.CompareAndSwap(old, new)
+}
+
+// updateScheduler listens on the schedulers updating notifier and manages scheduler creation and deletion.
+func (c *Cluster) updateScheduler() {
+	defer logutil.LogPanic()
+	defer c.wg.Done()
+
+	// Make sure the coordinator has initialized all the existing schedulers.
+	c.waitSchedulersInitialized()
+	// Establish a notifier to listen for scheduler updates.
+	notifier := make(chan struct{}, 1)
+	// Make sure the check will be triggered once later.
+	notifier <- struct{}{}
+	c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
+	for {
+		select {
+		case <-c.ctx.Done():
+			log.Info("cluster is closing, stop listening the schedulers updating notifier")
+			return
+		case <-notifier:
+			// This is triggered by the watcher when the schedulers are updated.
+		}
+
+		log.Info("schedulers updating notifier is triggered, try to update the scheduler")
+		var (
+			schedulersController   = c.coordinator.GetSchedulersController()
+			latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers
+		)
+		// Create the newly added schedulers.
+		for _, scheduler := range latestSchedulersConfig {
+			s, err := schedulers.CreateScheduler(
+				scheduler.Type,
+				c.coordinator.GetOperatorController(),
+				c.storage,
+				schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args),
+				schedulersController.RemoveScheduler,
+			)
+			if err != nil {
+				log.Error("failed to create scheduler",
+					zap.String("scheduler-type", scheduler.Type),
+					zap.Strings("scheduler-args", scheduler.Args),
+					errs.ZapError(err))
+				continue
+			}
+			name := s.GetName()
+			if existed, _ := schedulersController.IsSchedulerExisted(name); existed {
+				log.Info("scheduler has already existed, skip adding it",
+					zap.String("scheduler-name", name),
+					zap.Strings("scheduler-args", scheduler.Args))
+				continue
+			}
+			if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil {
+				log.Error("failed to add scheduler",
+					zap.String("scheduler-name", name),
+					zap.Strings("scheduler-args", scheduler.Args),
+					errs.ZapError(err))
+				continue
+			}
+			log.Info("add scheduler successfully",
+				zap.String("scheduler-name", name),
+				zap.Strings("scheduler-args", scheduler.Args))
+		}
+		// Remove the deleted schedulers.
+		for _, name := range schedulersController.GetSchedulerNames() {
+			scheduler := schedulersController.GetScheduler(name)
+			if slice.AnyOf(latestSchedulersConfig, func(i int) bool {
+				return latestSchedulersConfig[i].Type == scheduler.GetType()
+			}) {
+				continue
+			}
+			if err := schedulersController.RemoveScheduler(name); err != nil {
+				log.Error("failed to remove scheduler",
+					zap.String("scheduler-name", name),
+					errs.ZapError(err))
+				continue
+			}
+			log.Info("remove scheduler successfully",
+				zap.String("scheduler-name", name))
+		}
+	}
+}
+
+func (c *Cluster) waitSchedulersInitialized() {
+	ticker := time.NewTicker(time.Millisecond * 100)
+	defer ticker.Stop()
+	for {
+		if c.coordinator.AreSchedulersInitialized() {
+			return
+		}
+		select {
+		case <-c.ctx.Done():
+			log.Info("cluster is closing, stop waiting the schedulers initialization")
+			return
+		case <-ticker.C:
+		}
+	}
+}
+
+// TODO: implement the following methods
+
+// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
+func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
+	for _, region := range regions {
+		c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
+	}
+}
+
+func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo {
+	stores := make([]*core.StoreInfo, 0, len(region.GetPeers()))
+	for _, p := range region.GetPeers() {
+		if store := c.GetStore(p.GetStoreId()); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) {
+			stores = append(stores, store)
+		}
+	}
+	return stores
+}
+
+// HandleStoreHeartbeat updates the store status.
+func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatRequest) error {
+	stats := heartbeat.GetStats()
+	storeID := stats.GetStoreId()
+	store := c.GetStore(storeID)
+	if store == nil {
+		return errors.Errorf("store %v not found", storeID)
+	}
+
+	nowTime := time.Now()
+	newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))
+
+	if store := c.GetStore(storeID); store != nil {
+		statistics.UpdateStoreHeartbeatMetrics(store)
+	}
+	c.PutStore(newStore)
+	c.hotStat.Observe(storeID, newStore.GetStoreStats())
+	c.hotStat.FilterUnhealthyStore(c)
+	reportInterval := stats.GetInterval()
+	interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
+
+	regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats()))
+	for _, peerStat := range stats.GetPeerStats() {
+		regionID := peerStat.GetRegionId()
+		region := c.GetRegion(regionID)
+		regions[regionID] = region
+		if region == nil {
+			log.Warn("discard hot peer stat for unknown region",
+				zap.Uint64("region-id", regionID),
+				zap.Uint64("store-id", storeID))
+			continue
+		}
+		peer := region.GetStorePeer(storeID)
+		if peer == nil {
+			log.Warn("discard hot peer stat for unknown region peer",
+				zap.Uint64("region-id", regionID),
+				zap.Uint64("store-id", storeID))
+			continue
+		}
+		readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats())
+		loads := []float64{
+			utils.RegionReadBytes:     float64(peerStat.GetReadBytes()),
+			utils.RegionReadKeys:      float64(peerStat.GetReadKeys()),
+			utils.RegionReadQueryNum:  float64(readQueryNum),
+			utils.RegionWriteBytes:    0,
+			utils.RegionWriteKeys:     0,
+			utils.RegionWriteQueryNum: 0,
+		}
+		peerInfo := core.NewPeerInfo(peer, loads, interval)
+		c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
+	}
+
+	// Here we will compare the reported regions with the previous hot peers to decide if they are still hot.
+	c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
+	return nil
+}
+
+// runUpdateStoreStats updates store stats periodically.
+func (c *Cluster) runUpdateStoreStats() {
+	defer logutil.LogPanic()
+	defer c.wg.Done()
+
+	ticker := time.NewTicker(9 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			log.Info("update store stats background jobs has been stopped")
+			return
+		case <-ticker.C:
+			c.UpdateAllStoreStatus()
+		}
+	}
+}
+
+// StartBackgroundJobs starts background jobs.
+func (c *Cluster) StartBackgroundJobs() {
+	c.wg.Add(2)
+	go c.updateScheduler()
+	go c.runUpdateStoreStats()
+}
+
+// StopBackgroundJobs stops background jobs.
+func (c *Cluster) StopBackgroundJobs() {
+	c.cancel()
+	c.wg.Wait()
+}
+
+// HandleRegionHeartbeat processes RegionInfo reports from client.
+func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
+	if err := c.processRegionHeartbeat(region); err != nil {
+		return err
+	}
+
+	c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
+	return nil
+}
+
+// processRegionHeartbeat updates the region information.
+func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
+	origin, _, err := c.PreCheckPutRegion(region)
+	if err != nil {
+		return err
+	}
+	region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())
+
+	cluster.HandleStatsAsync(c, region)
+
+	hasRegionStats := c.regionStats != nil
+	// Save to storage if meta is updated, except for flashback.
+	// Save to cache if meta or leader is updated, or contains any down/pending peer.
+	// Mark isNew if the region in cache does not have leader.
+	isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
+	if !saveCache && !isNew {
+		// Some config changes also require the region stats to be updated,
+		// so we do some extra checks here.
+		if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
+			c.regionStats.Observe(region, c.GetRegionStores(region))
+		}
+		return nil
+	}
+
+	var overlaps []*core.RegionInfo
+	if saveCache {
+		// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
+		// check its validity again here.
+		//
+		// However, it can't solve the race condition of concurrent heartbeats from the same region.
+		if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
+			return err
+		}
+
+		cluster.HandleOverlaps(c, overlaps)
+	}
+
+	cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
+	return nil
+}
+
+// IsPrepared returns true if the prepare checker is ready.
+func (c *Cluster) IsPrepared() bool {
+	return c.coordinator.GetPrepareChecker().IsPrepared()
+}
diff --git a/server/api/stats_test.go b/server/api/stats_test.go
index 19a27ab0c53..1285bd26fc5 100644
--- a/server/api/stats_test.go
+++ b/server/api/stats_test.go
@@ -137,7 +137,7 @@ func (suite *statsTestSuite) TestRegionStats() {
 	statsAll := &statistics.RegionStats{
 		Count:            4,
 		EmptyCount:       1,
-		StorageSize:      350,
+		StorageSize:      351,
 		StorageKeys:      221,
 		StoreLeaderCount: map[uint64]int{1: 1, 4: 2, 5: 1},
 		StorePeerCount:   map[uint64]int{1: 3, 2: 1, 3: 1, 4: 2, 5: 2},
@@ -150,7 +150,7 @@ func (suite *statsTestSuite) TestRegionStats() {
 	stats23 := &statistics.RegionStats{
 		Count:            2,
 		EmptyCount:       1,
-		StorageSize:      200,
+		StorageSize:      201,
 		StorageKeys:      151,
 		StoreLeaderCount: map[uint64]int{4: 1, 5: 1},
 		StorePeerCount:   map[uint64]int{1: 2, 4: 1, 5: 2},
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 631dd85129d..e76e8fe2085 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -843,9 +843,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	if err != nil {
 		return err
 	}
-	if c.GetStoreConfig().IsEnableRegionBucket() {
-		region.InheritBuckets(origin)
-	}
+	region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())
 
 	c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
 	c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
diff --git a/server/core/region.go b/server/core/region.go
index 7a1fb2c87c2..f155d9516c4 100644
--- a/server/core/region.go
+++ b/server/core/region.go
@@ -144,9 +144,8 @@ const (
 func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
 	// Convert unit to MB.
 	// If region isn't empty and less than 1MB, use 1MB instead.
+	// The size of an empty region will be corrected by the previous RegionInfo.
 	regionSize := heartbeat.GetApproximateSize() / units.MiB
-	// Due to https://github.com/tikv/tikv/pull/11170, if region size is not initialized,
-	// approximate size will be zero, and region size is zero not EmptyRegionApproximateSize
 	if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
 		regionSize = EmptyRegionApproximateSize
 	}
@@ -189,9 +188,19 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
 	return region
 }
 
-// InheritBuckets inherits the buckets from the parent region if bucket enabled.
-func (r *RegionInfo) InheritBuckets(origin *RegionInfo) {
-	if origin != nil && r.buckets == nil {
+// Inherit inherits the buckets and region size from the parent region if bucket is enabled.
+// It corrects the approximate size and buckets using the previously reported RegionInfo, if one exists.
+// See https://github.com/tikv/tikv/issues/11114
+func (r *RegionInfo) Inherit(origin *RegionInfo, bucketEnable bool) {
+	// regionSize should not be zero if the region is not empty.
+	if r.GetApproximateSize() == 0 {
+		if origin != nil {
+			r.approximateSize = origin.approximateSize
+		} else {
+			r.approximateSize = EmptyRegionApproximateSize
+		}
+	}
+	if bucketEnable && origin != nil && r.buckets == nil {
 		r.buckets = origin.buckets
 	}
 }
@@ -469,11 +478,4 @@ func (r *RegionInfo) GetApproximateSize() int64 {
 	return r.approximateSize
 }
 
-// IsEmptyRegion returns whether the region is empty.
-func (r *RegionInfo) IsEmptyRegion() bool {
-	// When cluster resumes, the region size may be not initialized, but region heartbeat is send.
-	// So use `==` here.
-	return r.approximateSize == EmptyRegionApproximateSize
-}
-
 // GetApproximateKeys returns the approximate keys of the region.
diff --git a/server/core/region_test.go b/server/core/region_test.go
index 8f63125e5d6..d6030997fa3 100644
--- a/server/core/region_test.go
+++ b/server/core/region_test.go
@@ -186,9 +186,35 @@ func TestSortedEqual(t *testing.T) {
 	}
 }
 
-func TestInheritBuckets(t *testing.T) {
+func TestInherit(t *testing.T) {
 	re := require.New(t)
+	// size in MB
+	// case for approximateSize
+	testCases := []struct {
+		originExists bool
+		originSize   uint64
+		size         uint64
+		expect       uint64
+	}{
+		{false, 0, 0, 1},
+		{false, 0, 2, 2},
+		{true, 0, 2, 2},
+		{true, 1, 2, 2},
+		{true, 2, 0, 2},
+	}
+	for _, testCase := range testCases {
+		var origin *RegionInfo
+		if testCase.originExists {
+			origin = NewRegionInfo(&metapb.Region{Id: 100}, nil)
+			origin.approximateSize = int64(testCase.originSize)
+		}
+		r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
+		r.approximateSize = int64(testCase.size)
+		r.Inherit(origin, false)
+		re.Equal(int64(testCase.expect), r.approximateSize)
+	}
+
+	// bucket
 	data := []struct {
 		originBuckets *metapb.Buckets
 		buckets       *metapb.Buckets
@@ -201,11 +227,12 @@ func TestInheritBuckets(t *testing.T) {
 	for _, d := range data {
 		origin := NewRegionInfo(&metapb.Region{Id: 100}, nil, SetBuckets(d.originBuckets))
 		r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
-		r.InheritBuckets(origin)
+		r.Inherit(origin, true)
 		re.Equal(d.originBuckets, r.GetBuckets())
+		// region will not inherit bucket keys.
 		if origin.GetBuckets() != nil {
 			newRegion := NewRegionInfo(&metapb.Region{Id: 100}, nil)
+			newRegion.Inherit(origin, false)
 			re.NotEqual(d.originBuckets, newRegion.GetBuckets())
 		}
 	}
diff --git a/server/statistics/region.go b/server/statistics/region.go
index 60e2f19935d..eaa5f80f8ab 100644
--- a/server/statistics/region.go
+++ b/server/statistics/region.go
@@ -57,12 +57,10 @@ func (s *RegionStats) Observe(r *core.RegionInfo) {
 	s.Count++
 	approximateKeys := r.GetApproximateKeys()
 	approximateSize := r.GetApproximateSize()
-	if approximateSize == core.EmptyRegionApproximateSize {
+	if approximateSize <= core.EmptyRegionApproximateSize {
 		s.EmptyCount++
 	}
-	if !r.IsEmptyRegion() {
-		s.StorageSize += approximateSize
-	}
+	s.StorageSize += approximateSize
 	s.StorageKeys += approximateKeys
 	leader := r.GetLeader()
 	if leader != nil {
diff --git a/server/statistics/region_collection.go b/server/statistics/region_collection.go
index 56d5516c8b7..07363882ac9 100644
--- a/server/statistics/region_collection.go
+++ b/server/statistics/region_collection.go
@@ -198,15 +198,15 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
 		DownPeer:        len(region.GetDownPeers()) > 0,
 		PendingPeer:     len(region.GetPendingPeers()) > 0,
 		LearnerPeer:     len(region.GetLearners()) > 0,
-		EmptyRegion:     region.IsEmptyRegion(),
+		EmptyRegion:     region.GetApproximateSize() <= core.EmptyRegionApproximateSize,
 		OversizedRegion: region.IsOversized(
 			int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxSize()),
 			int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxKeys()),
 		),
 		UndersizedRegion: region.NeedMerge(
 			int64(r.opt.GetMaxMergeRegionSize()),
 			int64(r.opt.GetMaxMergeRegionKeys()),
-		) && region.GetApproximateSize() >= core.EmptyRegionApproximateSize,
+		),
 	}
 
 	for typ, c := range conditions {
diff --git a/server/statistics/region_collection_test.go b/server/statistics/region_collection_test.go
index 179d78b539f..d168aeb5cb1 100644
--- a/server/statistics/region_collection_test.go
+++ b/server/statistics/region_collection_test.go
@@ -63,7 +63,7 @@ func TestRegionStatistics(t *testing.T) {
 	stores[3] = store3
 	r1 := &metapb.Region{Id: 1, Peers: peers, StartKey: []byte("aa"), EndKey: []byte("bb")}
 	r2 := &metapb.Region{Id: 2, Peers: peers[0:2], StartKey: []byte("cc"), EndKey: []byte("dd")}
-	region1 := core.NewRegionInfo(r1, peers[0], core.SetApproximateSize(1))
+	region1 := core.NewRegionInfo(r1, peers[0])
 	region2 := core.NewRegionInfo(r2, peers[0])
 	regionStats := NewRegionStatistics(opt, manager, nil)
 	regionStats.Observe(region1, stores)
@@ -103,14 +103,14 @@ func TestRegionStatistics(t *testing.T) {
 	re.Len(regionStats.stats[PendingPeer], 1)
 	re.Len(regionStats.stats[LearnerPeer], 1)
 	re.Len(regionStats.stats[OversizedRegion], 1)
-	re.Len(regionStats.stats[UndersizedRegion], 0)
-	re.Len(regionStats.stats[EmptyRegion], 0)
+	re.Len(regionStats.stats[UndersizedRegion], 1)
+	re.Len(regionStats.stats[EmptyRegion], 1)
 	re.Len(regionStats.offlineStats[ExtraPeer], 1)
 	re.Len(regionStats.offlineStats[MissPeer], 1)
 	re.Len(regionStats.offlineStats[DownPeer], 1)
 	re.Len(regionStats.offlineStats[PendingPeer], 1)
 	re.Len(regionStats.offlineStats[LearnerPeer], 1)
 	re.Len(regionStats.offlineStats[OfflinePeer], 1)
 
 	region1 = region1.Clone(core.WithRemoveStorePeer(7))
 	regionStats.Observe(region1, stores[0:3])
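
Note for reviewers: the behavioral core of this revert is the size-inheritance rule in RegionInfo.Inherit. Below is a minimal standalone sketch of that rule, assuming simplified stand-in types; regionInfo, inherit, and emptyRegionApproximateSize are illustrative names, not PD's actual API, and the real method additionally operates on *metapb.Buckets and lives on core.RegionInfo.

package main

import "fmt"

// emptyRegionApproximateSize is an illustrative stand-in for core.EmptyRegionApproximateSize (1, in MB units).
const emptyRegionApproximateSize int64 = 1

// regionInfo is a simplified stand-in for core.RegionInfo, used only for this sketch.
type regionInfo struct {
	approximateSize int64
	buckets         []byte // stand-in for *metapb.Buckets
}

// inherit mirrors the rule restored by this revert: when a heartbeat reports a zero
// approximate size (e.g. right after the cluster resumes), fall back to the previously
// reported size, or to the empty-region size if there is no previous report; buckets
// are only inherited when bucket reporting is enabled and the new report carries none.
func (r *regionInfo) inherit(origin *regionInfo, bucketEnable bool) {
	if r.approximateSize == 0 {
		if origin != nil {
			r.approximateSize = origin.approximateSize
		} else {
			r.approximateSize = emptyRegionApproximateSize
		}
	}
	if bucketEnable && origin != nil && r.buckets == nil {
		r.buckets = origin.buckets
	}
}

func main() {
	origin := &regionInfo{approximateSize: 2}

	fresh := &regionInfo{} // size not initialized in the heartbeat yet
	fresh.inherit(origin, false)
	fmt.Println(fresh.approximateSize) // 2, taken from the previous report

	noOrigin := &regionInfo{}
	noOrigin.inherit(nil, false)
	fmt.Println(noOrigin.approximateSize) // 1, the empty-region size
}

The design point is that a zero size is treated as "not yet reported" rather than "empty", which is why the statistics side goes back to comparing with <= EmptyRegionApproximateSize instead of requiring an exact match.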