diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 07636bed074..3928987937b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -35,7 +35,9 @@ type Cluster interface { func HandleStatsAsync(c Cluster, region *core.RegionInfo) { c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads())) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetPeers(), region.GetWriteLoads(), interval)) c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 96309d20270..bfabc4660ad 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" @@ -442,7 +443,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval)) } // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 95f42ebc5e5..3f9710c48fd 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -894,7 +894,9 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + return append(items, mc.HotCache.CheckReadPeerSync(region, region.GetPeers(), region.GetLoads(), interval)...) } // CheckRegionWrite checks region write info with all peers @@ -902,7 +904,9 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredWriteItems(region) items = append(items, expiredItems...) - return append(items, mc.HotCache.CheckWritePeerSync(region, nil)...) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + return append(items, mc.HotCache.CheckWritePeerSync(region, region.GetPeers(), region.GetLoads(), interval)...) } // CheckRegionLeaderRead checks region read info with leader peer @@ -910,7 +914,9 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics. items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + return append(items, mc.HotCache.CheckReadPeerSync(region, []*metapb.Peer{region.GetLeader()}, region.GetLoads(), interval)...) } // ObserveRegionsStats records the current stores stats from region stats. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index fd7b67ae9a3..26548c8b47e 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -17,6 +17,7 @@ package statistics import ( "context" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/smallnest/chanx" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/statistics/utils" @@ -172,14 +173,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) { // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { - return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads) +func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat { + return w.writeCache.checkPeerFlow(region, peers, loads, interval) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { - return w.readCache.checkPeerFlow(region, region.GetPeers(), loads) +func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat { + return w.readCache.checkPeerFlow(region, peers, loads, interval) } // ExpiredReadItems returns the read items which are already expired. diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index 962835d9089..01968bf17e8 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -17,6 +17,7 @@ package statistics import ( "context" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" ) @@ -27,19 +28,23 @@ type FlowItemTask interface { type checkPeerTask struct { regionInfo *core.RegionInfo + peers []*metapb.Peer loads []float64 + interval uint64 } // NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64) FlowItemTask { +func NewCheckPeerTask(regionInfo *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) FlowItemTask { return &checkPeerTask{ regionInfo: regionInfo, + peers: peers, loads: loads, + interval: interval, } } func (t *checkPeerTask) runTask(cache *hotPeerCache) { - stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads) + stats := cache.checkPeerFlow(t.regionInfo, t.peers, t.loads, t.interval) for _, stat := range stats { cache.updateStat(stat) } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index f1a9ce2cc1d..89d6ee254ac 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -174,19 +174,15 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt // checkPeerFlow checks the flow information of a peer. // Notice: checkPeerFlow couldn't be used concurrently. // checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. -func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64) []*HotPeerStat { - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() +func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64, interval uint64) []*HotPeerStat { if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose return nil } - if deltaLoads == nil { - deltaLoads = region.GetLoads() - } f.collectPeerMetrics(deltaLoads, interval) // update metrics regionID := region.GetID() + regionPeers := region.GetPeers() var stats []*HotPeerStat for _, peer := range peers { storeID := peer.GetStoreId() @@ -218,12 +214,12 @@ func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Pe newItem := &HotPeerStat{ StoreID: storeID, RegionID: regionID, - Loads: f.kind.GetLoadRatesFromPeer(peer, deltaLoads, interval), + Loads: f.kind.GetLoadRates(deltaLoads, interval), isLeader: region.GetLeader().GetStoreId() == storeID, actionType: utils.Update, - stores: make([]uint64, len(peers)), + stores: make([]uint64, len(regionPeers)), } - for i, peer := range peers { + for i, peer := range regionPeers { newItem.stores[i] = peer.GetStoreId() } if oldItem == nil { diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 50c6b3c961e..1ddf9b81e57 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -106,8 +106,10 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer } func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() res = append(res, cache.collectExpiredItems(region)...) - return append(res, cache.checkPeerFlow(region, peers, region.GetLoads())...) + return append(res, cache.checkPeerFlow(region, peers, region.GetLoads(), interval)...) } func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { @@ -309,56 +311,55 @@ func TestUpdateHotPeerStat(t *testing.T) { }() // skip interval=0 - region = region.Clone(core.SetReportInterval(0, 0)) + interval := uint64(0) deltaLoads := []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Nil(newItem) // new peer, interval is larger than report interval, but no hot - region = region.Clone(core.SetReportInterval(10, 0)) + interval = 10 deltaLoads = []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 1.0 utils.MinHotThresholds[utils.RegionReadKeys] = 1.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0 - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Nil(newItem) // new peer, interval is less than report interval - region = region.Clone(core.SetReportInterval(4, 0)) + interval = 4 deltaLoads = []float64{60.0, 60.0, 60.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.NotNil(newItem) re.Equal(0, newItem[0].HotDegree) re.Equal(0, newItem[0].AntiCount) // sum of interval is less than report interval - region = region.Clone(core.SetReportInterval(4, 0)) deltaLoads = []float64{60.0, 60.0, 60.0} cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(0, newItem[0].HotDegree) re.Equal(0, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot newItem[0].AntiCount = utils.Read.DefaultAntiCount() cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is less than report interval cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot - region = region.Clone(core.SetReportInterval(10, 0)) + interval = 10 cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(2, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold @@ -366,13 +367,13 @@ func TestUpdateHotPeerStat(t *testing.T) { utils.MinHotThresholds[utils.RegionReadKeys] = 10.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0 cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m-1, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) } re.Less(newItem[0].HotDegree, 0) re.Equal(0, newItem[0].AntiCount) @@ -679,7 +680,7 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { StartTimestamp: start, EndTimestamp: end, })) - stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil) + stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), newRegion.GetLoads(), end-start) for _, stat := range stats { cache.updateStat(stat) } @@ -709,6 +710,9 @@ func BenchmarkCheckRegionFlow(b *testing.B) { region := buildRegion(utils.Read, 3, 10) b.ResetTimer() for i := 0; i < b.N; i++ { - cache.checkPeerFlow(region, region.GetPeers(), nil) + stats := cache.checkPeerFlow(region, region.GetPeers(), region.GetLoads(), 10) + for _, stat := range stats { + cache.updateStat(stat) + } } } diff --git a/pkg/statistics/utils/kind.go b/pkg/statistics/utils/kind.go index 463b35c450a..089732f759f 100644 --- a/pkg/statistics/utils/kind.go +++ b/pkg/statistics/utils/kind.go @@ -14,10 +14,6 @@ package utils -import ( - "github.com/pingcap/kvproto/pkg/metapb" -) - const ( // BytePriority indicates hot-region-scheduler prefer byte dim BytePriority = "byte" @@ -230,8 +226,8 @@ func (rw RWType) DefaultAntiCount() int { } } -// GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo. -func (rw RWType) GetLoadRatesFromPeer(peer *metapb.Peer, deltaLoads []float64, interval uint64) []float64 { +// GetLoadRates gets the load rates of the read or write type. +func (rw RWType) GetLoadRates(deltaLoads []float64, interval uint64) []float64 { loads := make([]float64, DimLen) for dim, k := range rw.RegionStats() { loads[dim] = deltaLoads[k] / float64(interval) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 870e2495dc3..f747b655815 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -959,7 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval)) } } for _, stat := range stats.GetSnapshotStats() { diff --git a/tools/pd-ctl/tests/hot/hot_test.go b/tools/pd-ctl/tests/hot/hot_test.go index c06e42edf60..f47936bd515 100644 --- a/tools/pd-ctl/tests/hot/hot_test.go +++ b/tools/pd-ctl/tests/hot/hot_test.go @@ -191,7 +191,7 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) { region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) + hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{leader}, loads, reportInterval)) testutil.Eventually(re, func() bool { hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil