From 00521761013139efe8109c95ffaad2b857402ac2 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 25 Oct 2023 17:48:37 +0800 Subject: [PATCH 1/5] batch get region size Signed-off-by: Ryan Leung --- pkg/core/region.go | 35 ++++++++++++++++++++++++++--------- server/cluster/cluster.go | 5 +++-- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 6875844b7a6..4dcf8c65c26 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1609,17 +1609,34 @@ func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(regi } // GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. -func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { - r.t.RLock() - defer r.t.RUnlock() +func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte, limit int) int64 { var size int64 - r.tree.scanRange(startKey, func(region *RegionInfo) bool { - if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { - return false + for { + r.t.RLock() + var cnt int + r.tree.scanRange(startKey, func(region *RegionInfo) bool { + if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { + startKey = region.GetEndKey() + return false + } + if limit > 0 && cnt >= limit { + startKey = region.GetEndKey() + return false + } + cnt++ + size += region.GetApproximateSize() + return true + }) + r.t.RUnlock() + if cnt == 0 { + break } - size += region.GetApproximateSize() - return true - }) + + if len(startKey) == 0 { + break + } + } + return size } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 25a47a7fca9..9b8f72f8e1c 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -108,6 +108,7 @@ const ( // minSnapshotDurationSec is the minimum duration that a store can tolerate. // It should enlarge the limiter if the snapshot's duration is less than this value. minSnapshotDurationSec = 5 + scanRegionLimit = 1000 ) // Server is the interface for cluster. @@ -1908,7 +1909,7 @@ func (c *RaftCluster) checkStores() { func (c *RaftCluster) getThreshold(stores []*core.StoreInfo, store *core.StoreInfo) float64 { start := time.Now() if !c.opt.IsPlacementRulesEnabled() { - regionSize := c.core.GetRegionSizeByRange([]byte(""), []byte("")) * int64(c.opt.GetMaxReplicas()) + regionSize := c.core.GetRegionSizeByRange([]byte(""), []byte(""), scanRegionLimit) * int64(c.opt.GetMaxReplicas()) weight := getStoreTopoWeight(store, stores, c.opt.GetLocationLabels(), c.opt.GetMaxReplicas()) return float64(regionSize) * weight * 0.9 } @@ -1948,7 +1949,7 @@ func (c *RaftCluster) calculateRange(stores []*core.StoreInfo, store *core.Store matchStores = append(matchStores, s) } } - regionSize := c.core.GetRegionSizeByRange(startKey, endKey) * int64(rule.Count) + regionSize := c.core.GetRegionSizeByRange(startKey, endKey, scanRegionLimit) * int64(rule.Count) weight := getStoreTopoWeight(store, matchStores, rule.LocationLabels, rule.Count) storeSize += float64(regionSize) * weight log.Debug("calculate range result", From 8073cb9cd4b80bf3fd8995cf4a901f1a18b8fae6 Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 7 Nov 2023 17:47:36 +0800 Subject: [PATCH 2/5] add benchmark Signed-off-by: nolouch --- pkg/core/region.go | 9 ++-- pkg/core/region_test.go | 93 +++++++++++++++++++++++++++++++++++++++ server/cluster/cluster.go | 5 +-- 3 files changed, 101 insertions(+), 6 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 4dcf8c65c26..2f2d2ad48c3 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -41,7 +41,10 @@ import ( "go.uber.org/zap" ) -const randomRegionMaxRetry = 10 +const ( + randomRegionMaxRetry = 10 + scanRegionLimit = 1000 +) // errRegionIsStale is error info for region is stale. func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error { @@ -1609,7 +1612,7 @@ func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(regi } // GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. -func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte, limit int) int64 { +func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { var size int64 for { r.t.RLock() @@ -1619,7 +1622,7 @@ func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte, limit int) i startKey = region.GetEndKey() return false } - if limit > 0 && cnt >= limit { + if cnt >= scanRegionLimit { startKey = region.GetEndKey() return false } diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 50302de920e..3928c774fbd 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -18,8 +18,10 @@ import ( "crypto/rand" "fmt" "math" + mrand "math/rand" "strconv" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" @@ -658,6 +660,97 @@ func BenchmarkRandomRegion(b *testing.B) { } } +func BenchmarkRandomSetRegion(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + item.approximateSize = int64(20) + origin, overlaps, rangeChanged := regions.SetRegion(item) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + regions.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + item.approximateSize = int64(20) + origin, overlaps, rangeChanged := regions.SetRegion(item) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + regions.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + + b.RunParallel( + func(pb *testing.PB) { + for pb.Next() { + item := items[mrand.Intn(len(items))] + n := item.Clone(SetApproximateSize(20)) + origin, overlaps, rangeChanged := regions.SetRegion(n) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } + }, + ) +} + const keyLength = 100 func randomBytes(n int) []byte { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 9b8f72f8e1c..25a47a7fca9 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -108,7 +108,6 @@ const ( // minSnapshotDurationSec is the minimum duration that a store can tolerate. // It should enlarge the limiter if the snapshot's duration is less than this value. minSnapshotDurationSec = 5 - scanRegionLimit = 1000 ) // Server is the interface for cluster. @@ -1909,7 +1908,7 @@ func (c *RaftCluster) checkStores() { func (c *RaftCluster) getThreshold(stores []*core.StoreInfo, store *core.StoreInfo) float64 { start := time.Now() if !c.opt.IsPlacementRulesEnabled() { - regionSize := c.core.GetRegionSizeByRange([]byte(""), []byte(""), scanRegionLimit) * int64(c.opt.GetMaxReplicas()) + regionSize := c.core.GetRegionSizeByRange([]byte(""), []byte("")) * int64(c.opt.GetMaxReplicas()) weight := getStoreTopoWeight(store, stores, c.opt.GetLocationLabels(), c.opt.GetMaxReplicas()) return float64(regionSize) * weight * 0.9 } @@ -1949,7 +1948,7 @@ func (c *RaftCluster) calculateRange(stores []*core.StoreInfo, store *core.Store matchStores = append(matchStores, s) } } - regionSize := c.core.GetRegionSizeByRange(startKey, endKey, scanRegionLimit) * int64(rule.Count) + regionSize := c.core.GetRegionSizeByRange(startKey, endKey) * int64(rule.Count) weight := getStoreTopoWeight(store, matchStores, rule.LocationLabels, rule.Count) storeSize += float64(regionSize) * weight log.Debug("calculate range result", From 2a8ae9c130ed6aea86c273ffa0d3275b28d76908 Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 7 Nov 2023 18:22:30 +0800 Subject: [PATCH 3/5] fix Signed-off-by: nolouch --- pkg/core/region.go | 6 ++---- pkg/core/region_test.go | 31 +++++++++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 2f2d2ad48c3..1a0a74abc0d 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1619,14 +1619,13 @@ func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { var cnt int r.tree.scanRange(startKey, func(region *RegionInfo) bool { if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { - startKey = region.GetEndKey() return false } if cnt >= scanRegionLimit { - startKey = region.GetEndKey() return false } cnt++ + startKey = region.GetEndKey() size += region.GetApproximateSize() return true }) @@ -1634,8 +1633,7 @@ func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { if cnt == 0 { break } - - if len(startKey) == 0 { + if len(startKey) == 0 || (len(endKey) > 0 && bytes.Compare(startKey, endKey) >= 0) { break } } diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 3928c774fbd..0663c3144ca 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -685,6 +685,34 @@ func BenchmarkRandomSetRegion(b *testing.B) { } } +func TestGetRegionSizeByRange(t *testing.T) { + regions := NewRegionsInfo() + nums := 1000010 + for i := 0; i < nums; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + endKey := []byte(fmt.Sprintf("%20d", i+1)) + if i == nums-1 { + endKey = []byte("") + } + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: endKey, + }, peer, SetApproximateSize(10)) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + } + totalSize := regions.GetRegionSizeByRange([]byte(""), []byte("")) + require.Equal(t, int64(nums*10), totalSize) + for i := 1; i < 10; i++ { + verifyNum := int(nums / i) + endKey := fmt.Sprintf("%20d", verifyNum) + totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey)) + require.Equal(t, int64(verifyNum*10), totalSize) + } +} + func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { regions := NewRegionsInfo() var items []*RegionInfo @@ -695,7 +723,7 @@ func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { Peers: []*metapb.Peer{peer}, StartKey: []byte(fmt.Sprintf("%20d", i)), EndKey: []byte(fmt.Sprintf("%20d", i+1)), - }, peer) + }, peer, SetApproximateSize(10)) origin, overlaps, rangeChanged := regions.SetRegion(region) regions.UpdateSubTree(region, origin, overlaps, rangeChanged) items = append(items, region) @@ -710,7 +738,6 @@ func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { for i := 0; i < b.N; i++ { item := items[i%len(items)] item.approximateKeys = int64(200000) - item.approximateSize = int64(20) origin, overlaps, rangeChanged := regions.SetRegion(item) regions.UpdateSubTree(item, origin, overlaps, rangeChanged) } From 79a6904d1e5264237da34b1d8aa5db13012b9a0a Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 7 Nov 2023 19:36:22 +0800 Subject: [PATCH 4/5] lint Signed-off-by: nolouch --- pkg/core/region.go | 2 +- pkg/core/region_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 1a0a74abc0d..44323c94ddc 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1633,7 +1633,7 @@ func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { if cnt == 0 { break } - if len(startKey) == 0 || (len(endKey) > 0 && bytes.Compare(startKey, endKey) >= 0) { + if len(startKey) == 0 { break } } diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 0663c3144ca..508e7aa59aa 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -706,7 +706,7 @@ func TestGetRegionSizeByRange(t *testing.T) { totalSize := regions.GetRegionSizeByRange([]byte(""), []byte("")) require.Equal(t, int64(nums*10), totalSize) for i := 1; i < 10; i++ { - verifyNum := int(nums / i) + verifyNum := nums / i endKey := fmt.Sprintf("%20d", verifyNum) totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey)) require.Equal(t, int64(verifyNum*10), totalSize) From 1dbb94f2ee09a85cdabc6879e528ad44fb23d1b8 Mon Sep 17 00:00:00 2001 From: nolouch Date: Wed, 8 Nov 2023 03:08:29 +0800 Subject: [PATCH 5/5] fix test Signed-off-by: nolouch --- server/cluster/cluster.go | 3 ++- tests/server/api/api_test.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 25a47a7fca9..8362ee9f331 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1846,12 +1846,13 @@ func (c *RaftCluster) checkStores() { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", zap.Stringer("store", store.GetMeta()), + zap.Int("region-count", c.GetTotalRegionCount()), errs.ZapError(err)) } } else if c.IsPrepared() { threshold := c.getThreshold(stores, store) - log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold)) regionSize := float64(store.GetRegionSize()) + log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize)) if regionSize >= threshold { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index ff430f1b848..04bcdc0d461 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -914,7 +914,7 @@ func TestPreparingProgress(t *testing.T) { tests.MustPutStore(re, cluster, store) } for i := 0; i < 100; i++ { - tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10)) } // no store preparing output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound) @@ -941,8 +941,8 @@ func TestPreparingProgress(t *testing.T) { re.Equal(math.MaxFloat64, p.LeftSeconds) // update size - tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) - tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) + tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40)) time.Sleep(2 * time.Second) output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) re.NoError(json.Unmarshal(output, &p))