diff --git a/pkg/core/region.go b/pkg/core/region.go index 2ac323a1272..c9daa69c477 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -41,7 +41,10 @@ import ( "go.uber.org/zap" ) -const randomRegionMaxRetry = 10 +const ( + randomRegionMaxRetry = 10 + scanRegionLimit = 1000 +) // errRegionIsStale is error info for region is stale. func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error { @@ -1610,16 +1613,31 @@ func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(regi // GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { - r.t.RLock() - defer r.t.RUnlock() var size int64 - r.tree.scanRange(startKey, func(region *RegionInfo) bool { - if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { - return false + for { + r.t.RLock() + var cnt int + r.tree.scanRange(startKey, func(region *RegionInfo) bool { + if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { + return false + } + if cnt >= scanRegionLimit { + return false + } + cnt++ + startKey = region.GetEndKey() + size += region.GetApproximateSize() + return true + }) + r.t.RUnlock() + if cnt == 0 { + break } - size += region.GetApproximateSize() - return true - }) + if len(startKey) == 0 { + break + } + } + return size } diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 50302de920e..508e7aa59aa 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -18,8 +18,10 @@ import ( "crypto/rand" "fmt" "math" + mrand "math/rand" "strconv" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" @@ -658,6 +660,124 @@ func BenchmarkRandomRegion(b *testing.B) { } } +func BenchmarkRandomSetRegion(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + item.approximateSize = int64(20) + origin, overlaps, rangeChanged := regions.SetRegion(item) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } +} + +func TestGetRegionSizeByRange(t *testing.T) { + regions := NewRegionsInfo() + nums := 1000010 + for i := 0; i < nums; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + endKey := []byte(fmt.Sprintf("%20d", i+1)) + if i == nums-1 { + endKey = []byte("") + } + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: endKey, + }, peer, SetApproximateSize(10)) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + } + totalSize := regions.GetRegionSizeByRange([]byte(""), []byte("")) + require.Equal(t, int64(nums*10), totalSize) + for i := 1; i < 10; i++ { + verifyNum := nums / i + endKey := fmt.Sprintf("%20d", verifyNum) + totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey)) + require.Equal(t, int64(verifyNum*10), totalSize) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer, SetApproximateSize(10)) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + regions.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + origin, overlaps, rangeChanged := regions.SetRegion(item) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + regions.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + + b.RunParallel( + func(pb *testing.PB) { + for pb.Next() { + item := items[mrand.Intn(len(items))] + n := item.Clone(SetApproximateSize(20)) + origin, overlaps, rangeChanged := regions.SetRegion(n) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } + }, + ) +} + const keyLength = 100 func randomBytes(n int) []byte { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 25a47a7fca9..8362ee9f331 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1846,12 +1846,13 @@ func (c *RaftCluster) checkStores() { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", zap.Stringer("store", store.GetMeta()), + zap.Int("region-count", c.GetTotalRegionCount()), errs.ZapError(err)) } } else if c.IsPrepared() { threshold := c.getThreshold(stores, store) - log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold)) regionSize := float64(store.GetRegionSize()) + log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize)) if regionSize >= threshold { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index ff430f1b848..04bcdc0d461 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -914,7 +914,7 @@ func TestPreparingProgress(t *testing.T) { tests.MustPutStore(re, cluster, store) } for i := 0; i < 100; i++ { - tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10)) } // no store preparing output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound) @@ -941,8 +941,8 @@ func TestPreparingProgress(t *testing.T) { re.Equal(math.MaxFloat64, p.LeftSeconds) // update size - tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) - tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) + tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40)) time.Sleep(2 * time.Second) output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) re.NoError(json.Unmarshal(output, &p))