Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7252
Browse files Browse the repository at this point in the history
close tikv#7248

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
rleungx authored and ti-chi-bot committed Jan 10, 2024
1 parent b092996 commit b5e9460
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 7 deletions.
3 changes: 2 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,12 +1447,13 @@ func (c *RaftCluster) checkStores() {
if err := c.ReadyToServe(storeID); err != nil {
log.Error("change store to serving failed",
zap.Stringer("store", store.GetMeta()),
zap.Int("region-count", c.GetTotalRegionCount()),
errs.ZapError(err))
}
} else if c.IsPrepared() {
threshold := c.getThreshold(stores, store)
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold))
regionSize := float64(store.GetRegionSize())
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize))
if regionSize >= threshold {
if err := c.ReadyToServe(storeID); err != nil {
log.Error("change store to serving failed",
Expand Down
37 changes: 31 additions & 6 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ import (
"go.uber.org/zap"
)

<<<<<<< HEAD:server/core/region.go
=======
const (
randomRegionMaxRetry = 10
scanRegionLimit = 1000
)

>>>>>>> d651c6b91 (core: batch get region size (#7252)):pkg/core/region.go
// errRegionIsStale is error info for region is stale.
func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
Expand Down Expand Up @@ -1194,13 +1202,30 @@ func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(regio
// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
var size int64
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
for {
r.t.RLock()
var cnt int
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
if cnt >= scanRegionLimit {
return false
}
cnt++
startKey = region.GetEndKey()
size += region.GetApproximateSize()
return true
})
r.t.RUnlock()
if cnt == 0 {
break
}
size += region.GetApproximateSize()
return true
})
if len(startKey) == 0 {
break
}
}

return size
}

Expand Down
123 changes: 123 additions & 0 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ package core
import (
"fmt"
"math"
<<<<<<< HEAD:server/core/region_test.go

Check failure on line 20 in server/core/region_test.go

View workflow job for this annotation

GitHub Actions / statics

expected 'STRING', found '<<'

Check failure on line 20 in server/core/region_test.go

View workflow job for this annotation

GitHub Actions / statics

expected 'STRING', found '<<'
"math/rand"
=======
mrand "math/rand"
>>>>>>> d651c6b91 (core: batch get region size (#7252)):pkg/core/region_test.go
"strconv"
"strings"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -662,6 +667,124 @@ func BenchmarkRandomRegion(b *testing.B) {
}
}

func BenchmarkRandomSetRegion(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
item.approximateSize = int64(20)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
}

func TestGetRegionSizeByRange(t *testing.T) {
regions := NewRegionsInfo()
nums := 1000010
for i := 0; i < nums; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
endKey := []byte(fmt.Sprintf("%20d", i+1))
if i == nums-1 {
endKey = []byte("")
}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: endKey,
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
}
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(""))
require.Equal(t, int64(nums*10), totalSize)
for i := 1; i < 10; i++ {
verifyNum := nums / i
endKey := fmt.Sprintf("%20d", verifyNum)
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey))
require.Equal(t, int64(verifyNum*10), totalSize)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()

b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
item := items[mrand.Intn(len(items))]
n := item.Clone(SetApproximateSize(20))
origin, overlaps, rangeChanged := regions.SetRegion(n)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
},
)
}

const keyLength = 100

func randomBytes(n int) []byte {
Expand Down
9 changes: 9 additions & 0 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,11 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) {
pdctl.MustPutStore(c, leader.GetServer(), store)
}
for i := 0; i < 100; i++ {
<<<<<<< HEAD
pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10))
=======
tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10))
>>>>>>> d651c6b91 (core: batch get region size (#7252))
}
// no store preparing
output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound)
Expand All @@ -713,8 +717,13 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) {
c.Assert(p.LeftSeconds, Equals, math.MaxFloat64)

// update size
<<<<<<< HEAD
pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10))
pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40))
=======
tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10))
tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40))
>>>>>>> d651c6b91 (core: batch get region size (#7252))
time.Sleep(2 * time.Second)
output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK)
c.Assert(json.Unmarshal(output, &p), IsNil)
Expand Down

0 comments on commit b5e9460

Please sign in to comment.