Skip to content

Commit

Permalink
This is an automated cherry-pick of #7149
Browse files Browse the repository at this point in the history
close #7148

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CabinfeverB authored and ti-chi-bot committed Sep 25, 2023
1 parent 5b491e2 commit dd6d08d
Show file tree
Hide file tree
Showing 8 changed files with 556 additions and 12 deletions.
464 changes: 464 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions server/api/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@ func (suite *statsTestSuite) TestRegionStats() {
statsAll := &statistics.RegionStats{
Count: 4,
EmptyCount: 1,
<<<<<<< HEAD
StorageSize: 350,
=======
StorageSize: 351,
UserStorageSize: 291,
>>>>>>> eac55a768 (Revert "statistics: fix empty region count when resuming (#7009)" (#7149))
StorageKeys: 221,
StoreLeaderCount: map[uint64]int{1: 1, 4: 2, 5: 1},
StorePeerCount: map[uint64]int{1: 3, 2: 1, 3: 1, 4: 2, 5: 2},
Expand All @@ -150,7 +155,12 @@ func (suite *statsTestSuite) TestRegionStats() {
stats23 := &statistics.RegionStats{
Count: 2,
EmptyCount: 1,
<<<<<<< HEAD
StorageSize: 200,
=======
StorageSize: 201,
UserStorageSize: 181,
>>>>>>> eac55a768 (Revert "statistics: fix empty region count when resuming (#7009)" (#7149))
StorageKeys: 151,
StoreLeaderCount: map[uint64]int{4: 1, 5: 1},
StorePeerCount: map[uint64]int{1: 2, 4: 1, 5: 2},
Expand Down
4 changes: 1 addition & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,9 +843,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if err != nil {
return err
}
if c.GetStoreConfig().IsEnableRegionBucket() {
region.InheritBuckets(origin)
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
Expand Down
34 changes: 29 additions & 5 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,8 @@ const (
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
// Convert unit to MB.
// If region isn't empty and less than 1MB, use 1MB instead.
// The size of empty region will be correct by the previous RegionInfo.
regionSize := heartbeat.GetApproximateSize() / units.MiB
// Due to https://github.com/tikv/tikv/pull/11170, if region size is not initialized,
// approximate size will be zero, and region size is zero not EmptyRegionApproximateSize
if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
regionSize = EmptyRegionApproximateSize
}
Expand Down Expand Up @@ -189,9 +188,19 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
return region
}

// InheritBuckets inherits the buckets from the parent region if bucket enabled.
func (r *RegionInfo) InheritBuckets(origin *RegionInfo) {
if origin != nil && r.buckets == nil {
// Inherit inherits the buckets and region size from the parent region if bucket enabled.
// correct approximate size and buckets by the previous size if here exists a reported RegionInfo.
// See https://github.com/tikv/tikv/issues/11114
func (r *RegionInfo) Inherit(origin *RegionInfo, bucketEnable bool) {
// regionSize should not be zero if region is not empty.
if r.GetApproximateSize() == 0 {
if origin != nil {
r.approximateSize = origin.approximateSize
} else {
r.approximateSize = EmptyRegionApproximateSize
}
}
if bucketEnable && origin != nil && r.buckets == nil {
r.buckets = origin.buckets
}
}
Expand Down Expand Up @@ -469,11 +478,26 @@ func (r *RegionInfo) GetApproximateSize() int64 {
return r.approximateSize
}

<<<<<<< HEAD:server/core/region.go
// IsEmptyRegion returns whether the region is empty.
func (r *RegionInfo) IsEmptyRegion() bool {
// When cluster resumes, the region size may be not initialized, but region heartbeat is send.
// So use `==` here.
return r.approximateSize == EmptyRegionApproximateSize
=======
// GetStorePeerApproximateKeys returns the approximate keys of the peer on the specified store.
func (r *RegionInfo) GetStorePeerApproximateKeys(storeID uint64) int64 {
peer := r.GetStorePeer(storeID)
if storeID != 0 && peer != nil && peer.IsWitness {
return 0
}
return r.approximateKeys
}

// GetApproximateKvSize returns the approximate kv size of the region.
func (r *RegionInfo) GetApproximateKvSize() int64 {
return r.approximateKvSize
>>>>>>> eac55a768 (Revert "statistics: fix empty region count when resuming (#7009)" (#7149)):pkg/core/region.go
}

// GetApproximateKeys returns the approximate keys of the region.
Expand Down
31 changes: 29 additions & 2 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,35 @@ func TestSortedEqual(t *testing.T) {
}
}

func TestInheritBuckets(t *testing.T) {
func TestInherit(t *testing.T) {
re := require.New(t)
// size in MB
// case for approximateSize
testCases := []struct {
originExists bool
originSize uint64
size uint64
expect uint64
}{
{false, 0, 0, 1},
{false, 0, 2, 2},
{true, 0, 2, 2},
{true, 1, 2, 2},
{true, 2, 0, 2},
}
for _, testCase := range testCases {
var origin *RegionInfo
if testCase.originExists {
origin = NewRegionInfo(&metapb.Region{Id: 100}, nil)
origin.approximateSize = int64(testCase.originSize)
}
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.approximateSize = int64(testCase.size)
r.Inherit(origin, false)
re.Equal(int64(testCase.expect), r.approximateSize)
}

// bucket
data := []struct {
originBuckets *metapb.Buckets
buckets *metapb.Buckets
Expand All @@ -201,11 +227,12 @@ func TestInheritBuckets(t *testing.T) {
for _, d := range data {
origin := NewRegionInfo(&metapb.Region{Id: 100}, nil, SetBuckets(d.originBuckets))
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.InheritBuckets(origin)
r.Inherit(origin, true)
re.Equal(d.originBuckets, r.GetBuckets())
// region will not inherit bucket keys.
if origin.GetBuckets() != nil {
newRegion := NewRegionInfo(&metapb.Region{Id: 100}, nil)
newRegion.Inherit(origin, false)
re.NotEqual(d.originBuckets, newRegion.GetBuckets())
}
}
Expand Down
9 changes: 9 additions & 0 deletions server/statistics/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,21 @@ func (s *RegionStats) Observe(r *core.RegionInfo) {
s.Count++
approximateKeys := r.GetApproximateKeys()
approximateSize := r.GetApproximateSize()
<<<<<<< HEAD:server/statistics/region.go
if approximateSize == core.EmptyRegionApproximateSize {
s.EmptyCount++
}
if !r.IsEmptyRegion() {
s.StorageSize += approximateSize
}
=======
approximateKvSize := r.GetApproximateKvSize()
if approximateSize <= core.EmptyRegionApproximateSize {
s.EmptyCount++
}
s.StorageSize += approximateSize
s.UserStorageSize += approximateKvSize
>>>>>>> eac55a768 (Revert "statistics: fix empty region count when resuming (#7009)" (#7149)):pkg/statistics/region.go
s.StorageKeys += approximateKeys
leader := r.GetLeader()
if leader != nil {
Expand Down
9 changes: 8 additions & 1 deletion server/statistics/region_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,22 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
DownPeer: len(region.GetDownPeers()) > 0,
PendingPeer: len(region.GetPendingPeers()) > 0,
LearnerPeer: len(region.GetLearners()) > 0,
EmptyRegion: region.IsEmptyRegion(),
EmptyRegion: region.GetApproximateSize() <= core.EmptyRegionApproximateSize,
OversizedRegion: region.IsOversized(
int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxSize()),
int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxKeys()),
),
UndersizedRegion: region.NeedMerge(
<<<<<<< HEAD:server/statistics/region_collection.go
int64(r.opt.GetMaxMergeRegionSize()),
int64(r.opt.GetMaxMergeRegionKeys()),
) && region.GetApproximateSize() >= core.EmptyRegionApproximateSize,
=======
int64(r.conf.GetMaxMergeRegionSize()),
int64(r.conf.GetMaxMergeRegionKeys()),
),
WitnessLeader: region.GetLeader().GetIsWitness(),
>>>>>>> eac55a768 (Revert "statistics: fix empty region count when resuming (#7009)" (#7149)):pkg/statistics/region_collection.go
}

for typ, c := range conditions {
Expand Down
7 changes: 6 additions & 1 deletion server/statistics/region_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestRegionStatistics(t *testing.T) {
stores[3] = store3
r1 := &metapb.Region{Id: 1, Peers: peers, StartKey: []byte("aa"), EndKey: []byte("bb")}
r2 := &metapb.Region{Id: 2, Peers: peers[0:2], StartKey: []byte("cc"), EndKey: []byte("dd")}
region1 := core.NewRegionInfo(r1, peers[0], core.SetApproximateSize(1))
region1 := core.NewRegionInfo(r1, peers[0])
region2 := core.NewRegionInfo(r2, peers[0])
regionStats := NewRegionStatistics(opt, manager, nil)
regionStats.Observe(region1, stores)
Expand Down Expand Up @@ -103,6 +103,7 @@ func TestRegionStatistics(t *testing.T) {
re.Len(regionStats.stats[PendingPeer], 1)
re.Len(regionStats.stats[LearnerPeer], 1)
re.Len(regionStats.stats[OversizedRegion], 1)
<<<<<<< HEAD:server/statistics/region_collection_test.go
re.Len(regionStats.stats[UndersizedRegion], 0)
re.Len(regionStats.stats[EmptyRegion], 0)
re.Len(regionStats.offlineStats[ExtraPeer], 1)
Expand All @@ -111,6 +112,10 @@ func TestRegionStatistics(t *testing.T) {
re.Len(regionStats.offlineStats[PendingPeer], 1)
re.Len(regionStats.offlineStats[LearnerPeer], 1)
re.Len(regionStats.offlineStats[OfflinePeer], 1)
=======
re.Len(regionStats.stats[UndersizedRegion], 1)
re.Len(regionStats.stats[OfflinePeer], 1)
>>>>>>> eac55a768 (Revert "statistics: fix empty region count when resuming (#7009)" (#7149)):pkg/statistics/region_collection_test.go

region1 = region1.Clone(core.WithRemoveStorePeer(7))
regionStats.Observe(region1, stores[0:3])
Expand Down

0 comments on commit dd6d08d

Please sign in to comment.