diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 19132757751..efc6be2ae81 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1453,8 +1453,8 @@ func (c *RaftCluster) checkStores() { } } else if c.IsPrepared() { threshold := c.getThreshold(stores, store) - log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold)) regionSize := float64(store.GetRegionSize()) + log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize)) if regionSize >= threshold { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index eee97c11d11..d82852c6bca 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -26,6 +26,8 @@ import ( "go.uber.org/zap" ) +var scanRegionLimit = 1000 + // BasicCluster provides basic data member and interface for a tikv cluster. type BasicCluster struct { Stores struct { @@ -384,9 +386,32 @@ func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo { // GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 { - bc.Regions.mu.RLock() - defer bc.Regions.mu.RUnlock() - return bc.Regions.GetRegionSizeByRange(startKey, endKey) + var size int64 + for { + bc.Regions.mu.RLock() + var cnt int + bc.Regions.tree.scanRange(startKey, func(region *RegionInfo) bool { + if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { + return false + } + if cnt >= scanRegionLimit { + return false + } + cnt++ + startKey = region.GetEndKey() + size += region.GetApproximateSize() + return true + }) + bc.Regions.mu.RUnlock() + if cnt == 0 { + break + } + if len(startKey) == 0 { + break + } + } + + return size } func (bc *BasicCluster) getWriteRate( diff --git a/server/core/region.go b/server/core/region.go index e43ddd032ad..f97c6aea7e6 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -1191,19 +1191,6 @@ func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(regio r.tree.scanRange(startKey, iterator) } -// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. -func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { - var size int64 - r.tree.scanRange(startKey, func(region *RegionInfo) bool { - if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { - return false - } - size += region.GetApproximateSize() - return true - }) - return size -} - // GetAdjacentRegions returns region's info that is adjacent with specific region func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) { p, n := r.tree.getAdjacentRegions(region) diff --git a/server/core/region_test.go b/server/core/region_test.go index c1ed83b7f46..8717b5d6be3 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -21,10 +21,12 @@ import ( "strconv" "strings" "testing" + "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/server/id" ) @@ -662,6 +664,117 @@ func BenchmarkRandomRegion(b *testing.B) { } } +func BenchmarkRandomSetRegion(b *testing.B) { + cluster := NewBasicCluster() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + cluster.Regions.SetRegion(region) + items = append(items, region) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + item.approximateSize = int64(20) + cluster.Regions.SetRegion(item) + } +} + +func TestGetRegionSizeByRange(t *testing.T) { + cluster := NewBasicCluster() + nums := 1000010 + for i := 0; i < nums; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + endKey := []byte(fmt.Sprintf("%20d", i+1)) + if i == nums-1 { + endKey = []byte("") + } + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: endKey, + }, peer, SetApproximateSize(10)) + cluster.Regions.SetRegion(region) + } + totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte("")) + require.Equal(t, int64(nums*10), totalSize) + for i := 1; i < 10; i++ { + verifyNum := nums / i + endKey := fmt.Sprintf("%20d", verifyNum) + totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte(endKey)) + require.Equal(t, int64(verifyNum*10), totalSize) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { + cluster := NewBasicCluster() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer, SetApproximateSize(10)) + cluster.Regions.SetRegion(region) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + cluster.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + cluster.Regions.SetRegion(item) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) { + cluster := NewBasicCluster() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + cluster.Regions.SetRegion(region) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + cluster.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + + b.RunParallel( + func(pb *testing.PB) { + for pb.Next() { + item := items[rand.Intn(len(items))] + n := item.Clone(SetApproximateSize(20)) + cluster.Regions.SetRegion(n) + } + }, + ) +} + const keyLength = 100 func randomBytes(n int) []byte { diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 8102532102f..ab96e312962 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -686,7 +686,7 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { pdctl.MustPutStore(c, leader.GetServer(), store) } for i := 0; i < 100; i++ { - pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) + pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10)) } // no store preparing output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound) @@ -713,8 +713,8 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { c.Assert(p.LeftSeconds, Equals, math.MaxFloat64) // update size - pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) - pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) + pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10)) + pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40)) time.Sleep(2 * time.Second) output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) c.Assert(json.Unmarshal(output, &p), IsNil)