diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 3edefbcaefd..8bd2616f41f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -59,7 +59,11 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) { func Collect(c Cluster, region *core.RegionInfo, hasRegionStats bool) { if hasRegionStats { // get region again from root tree. make sure the observed region is the latest. - region = c.GetBasicCluster().GetRegion(region.GetID()) + bc := c.GetBasicCluster() + if bc == nil { + return + } + region = bc.GetRegion(region.GetID()) if region == nil { return } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 76a9d770ce7..5e7a7a6f6a9 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -63,6 +63,9 @@ const ( regionLabelGCInterval = time.Hour requestTimeout = 3 * time.Second collectWaitTime = time.Minute + + // heartbeat relative const + hbAsyncRunner = "heartbeat-async-task-runner" ) var syncRunner = ratelimit.NewSyncRunner() @@ -90,7 +93,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, clusterID: clusterID, checkMembershipCh: checkMembershipCh, - taskRunner: ratelimit.NewAsyncRunner("heartbeat-async-task-runner", time.Minute), + taskRunner: ratelimit.NewAsyncRunner(hbAsyncRunner, time.Minute), hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 52655b093a2..488763142e1 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -157,6 +157,18 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { region.IsOversized(int64(r.conf.GetRegionMaxSize()), int64(r.conf.GetRegionMaxKeys())) { return true } + // expected to be zero for below type + if r.IsRegionStatsType(regionID, PendingPeer) && len(region.GetPendingPeers()) == 0 { + return true + } + if r.IsRegionStatsType(regionID, DownPeer) && len(region.GetDownPeers()) == 0 { + return true + } + if r.IsRegionStatsType(regionID, LearnerPeer) && len(region.GetLearners()) == 0 { + return true + } + + // merge return r.IsRegionStatsType(regionID, UndersizedRegion) != region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys())) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 95507d8c4aa..d9bf15e4d34 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -106,6 +106,9 @@ const ( // minSnapshotDurationSec is the minimum duration that a store can tolerate. // It should enlarge the limiter if the snapshot's duration is less than this value. minSnapshotDurationSec = 5 + + // heartbeat relative const + hbAsyncRunner = "heartbeat-async-task-runner" ) // Server is the interface for cluster. @@ -195,7 +198,7 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba etcdClient: etcdClient, core: basicCluster, storage: storage, - taskRunner: ratelimit.NewAsyncRunner("heartbeat-async-task-runner", time.Minute), + taskRunner: ratelimit.NewAsyncRunner(hbAsyncRunner, time.Minute), hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)), } }