From 52e168119752ca0a95341593ec397629a3b56107 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 24 May 2024 15:32:47 +0800 Subject: [PATCH 1/7] fix Signed-off-by: Ryan Leung --- pkg/cluster/cluster.go | 7 +- pkg/core/peer.go | 31 -------- pkg/mcs/scheduling/server/cluster.go | 3 +- pkg/mock/mockcluster/mockcluster.go | 32 +------- pkg/statistics/hot_cache.go | 8 +- pkg/statistics/hot_cache_task.go | 10 +-- pkg/statistics/hot_peer_cache.go | 95 ++++++++++++----------- pkg/statistics/hot_peer_cache_test.go | 104 ++++++++++---------------- pkg/statistics/utils/kind.go | 6 +- server/cluster/cluster.go | 3 +- tools/pd-ctl/tests/hot/hot_test.go | 3 +- 11 files changed, 111 insertions(+), 191 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 8bd2616f41f..07636bed074 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -35,12 +35,7 @@ type Cluster interface { func HandleStatsAsync(c Cluster, region *core.RegionInfo) { c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) - c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) - } + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads())) c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) } diff --git a/pkg/core/peer.go b/pkg/core/peer.go index 659886e6d39..1f888ba58eb 100644 --- a/pkg/core/peer.go +++ b/pkg/core/peer.go @@ -77,34 +77,3 @@ func CountInJointState(peers ...*metapb.Peer) int { } return count } - -// PeerInfo provides peer information -type PeerInfo struct { - *metapb.Peer - loads []float64 - interval uint64 -} - -// NewPeerInfo creates PeerInfo -func NewPeerInfo(meta *metapb.Peer, loads []float64, interval uint64) *PeerInfo { - return &PeerInfo{ - Peer: meta, - loads: loads, - interval: interval, - } -} - -// GetLoads provides loads -func (p *PeerInfo) GetLoads() []float64 { - return p.loads -} - -// GetPeerID provides peer id -func (p *PeerInfo) GetPeerID() uint64 { - return p.GetId() -} - -// GetInterval returns reporting interval -func (p *PeerInfo) GetInterval() uint64 { - return p.interval -} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index c6c365b03ad..96309d20270 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -442,8 +442,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - peerInfo := core.NewPeerInfo(peer, loads, interval) - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) } // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index e5b3e39a502..95f42ebc5e5 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -894,16 +894,7 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := mc.HotCache.CheckReadPeerSync(peerInfo, region) - if item != nil { - items = append(items, item) - } - } - return items + return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) } // CheckRegionWrite checks region write info with all peers @@ -911,16 +902,7 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredWriteItems(region) items = append(items, expiredItems...) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := mc.HotCache.CheckWritePeerSync(peerInfo, region) - if item != nil { - items = append(items, item) - } - } - return items + return append(items, mc.HotCache.CheckWritePeerSync(region, nil)...) } // CheckRegionLeaderRead checks region read info with leader peer @@ -928,15 +910,7 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics. items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - peer := region.GetLeader() - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := mc.HotCache.CheckReadPeerSync(peerInfo, region) - if item != nil { - items = append(items, item) - } - return items + return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) } // ObserveRegionsStats records the current stores stats from region stats. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 799fb240d10..fd7b67ae9a3 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -172,14 +172,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) { // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.writeCache.checkPeerFlow(peer, region) +func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { + return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.readCache.checkPeerFlow(peer, region) +func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { + return w.readCache.checkPeerFlow(region, region.GetPeers(), loads) } // ExpiredReadItems returns the read items which are already expired. diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index fa224b522ff..962835d9089 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -26,21 +26,21 @@ type FlowItemTask interface { } type checkPeerTask struct { - peerInfo *core.PeerInfo regionInfo *core.RegionInfo + loads []float64 } // NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask { +func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64) FlowItemTask { return &checkPeerTask{ - peerInfo: peerInfo, regionInfo: regionInfo, + loads: loads, } } func (t *checkPeerTask) runTask(cache *hotPeerCache) { - stat := cache.checkPeerFlow(t.peerInfo, t.regionInfo) - if stat != nil { + stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads) + for _, stat := range stats { cache.updateStat(stat) } } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index cd27dcad4c8..f1a9ce2cc1d 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -174,58 +174,69 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt // checkPeerFlow checks the flow information of a peer. // Notice: checkPeerFlow couldn't be used concurrently. // checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. -func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - interval := peer.GetInterval() +func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64) []*HotPeerStat { + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose return nil } - storeID := peer.GetStoreId() - deltaLoads := peer.GetLoads() + + if deltaLoads == nil { + deltaLoads = region.GetLoads() + } f.collectPeerMetrics(deltaLoads, interval) // update metrics regionID := region.GetID() - oldItem := f.getOldHotPeerStat(regionID, storeID) - - // check whether the peer is allowed to be inherited - source := utils.Direct - if oldItem == nil { - for _, storeID := range f.getAllStoreIDs(region) { - oldItem = f.getOldHotPeerStat(regionID, storeID) - if oldItem != nil && oldItem.allowInherited { - source = utils.Inherit - break + + var stats []*HotPeerStat + for _, peer := range peers { + storeID := peer.GetStoreId() + oldItem := f.getOldHotPeerStat(regionID, storeID) + + // check whether the peer is allowed to be inherited + source := utils.Direct + if oldItem == nil { + for _, storeID := range f.getAllStoreIDs(region) { + oldItem = f.getOldHotPeerStat(regionID, storeID) + if oldItem != nil && oldItem.allowInherited { + source = utils.Inherit + break + } } } - } - - // check new item whether is hot - if oldItem == nil { - regionStats := f.kind.RegionStats() - thresholds := f.calcHotThresholds(storeID) - isHot := slice.AnyOf(regionStats, func(i int) bool { - return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i] - }) - if !isHot { - return nil + // check new item whether is hot + if oldItem == nil { + regionStats := f.kind.RegionStats() + thresholds := f.calcHotThresholds(storeID) + isHot := slice.AnyOf(regionStats, func(i int) bool { + return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i] + }) + if !isHot { + continue + } } - } - peers := region.GetPeers() - newItem := &HotPeerStat{ - StoreID: storeID, - RegionID: regionID, - Loads: f.kind.GetLoadRatesFromPeer(peer), - isLeader: region.GetLeader().GetStoreId() == storeID, - actionType: utils.Update, - stores: make([]uint64, len(peers)), - } - for i, peer := range peers { - newItem.stores[i] = peer.GetStoreId() - } - - if oldItem == nil { - return f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second) + newItem := &HotPeerStat{ + StoreID: storeID, + RegionID: regionID, + Loads: f.kind.GetLoadRatesFromPeer(peer, deltaLoads, interval), + isLeader: region.GetLeader().GetStoreId() == storeID, + actionType: utils.Update, + stores: make([]uint64, len(peers)), + } + for i, peer := range peers { + newItem.stores[i] = peer.GetStoreId() + } + if oldItem == nil { + if stat := f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second); stat != nil { + stats = append(stats, stat) + } + continue + } + if stat := f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source); stat != nil { + stats = append(stats, stat) + } } - return f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source) + return stats } // checkColdPeer checks the collect the un-heartbeat peer and maintain it. diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 36f922d3830..50c6b3c961e 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -106,17 +106,8 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer } func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() res = append(res, cache.collectExpiredItems(region)...) - for _, peer := range peers { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := cache.checkPeerFlow(peerInfo, region) - if item != nil { - res = append(res, item) - } - } - return res + return append(res, cache.checkPeerFlow(region, peers, region.GetLoads())...) } func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { @@ -318,74 +309,74 @@ func TestUpdateHotPeerStat(t *testing.T) { }() // skip interval=0 - interval := 0 + region = region.Clone(core.SetReportInterval(0, 0)) deltaLoads := []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem := cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) + newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) re.Nil(newItem) // new peer, interval is larger than report interval, but no hot - interval = 10 + region = region.Clone(core.SetReportInterval(10, 0)) deltaLoads = []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 1.0 utils.MinHotThresholds[utils.RegionReadKeys] = 1.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0 - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) re.Nil(newItem) // new peer, interval is less than report interval - interval = 4 + region = region.Clone(core.SetReportInterval(4, 0)) deltaLoads = []float64{60.0, 60.0, 60.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) re.NotNil(newItem) - re.Equal(0, newItem.HotDegree) - re.Equal(0, newItem.AntiCount) + re.Equal(0, newItem[0].HotDegree) + re.Equal(0, newItem[0].AntiCount) // sum of interval is less than report interval - interval = 4 + region = region.Clone(core.SetReportInterval(4, 0)) deltaLoads = []float64{60.0, 60.0, 60.0} - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(0, newItem.HotDegree) - re.Equal(0, newItem.AntiCount) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(0, newItem[0].HotDegree) + re.Equal(0, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot - newItem.AntiCount = utils.Read.DefaultAntiCount() - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(1, newItem.HotDegree) - re.Equal(2*m, newItem.AntiCount) + newItem[0].AntiCount = utils.Read.DefaultAntiCount() + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(1, newItem[0].HotDegree) + re.Equal(2*m, newItem[0].AntiCount) // sum of interval is less than report interval - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(1, newItem.HotDegree) - re.Equal(2*m, newItem.AntiCount) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(1, newItem[0].HotDegree) + re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot - interval = 10 - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(2, newItem.HotDegree) - re.Equal(2*m, newItem.AntiCount) + region = region.Clone(core.SetReportInterval(10, 0)) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(2, newItem[0].HotDegree) + re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold utils.MinHotThresholds[utils.RegionReadBytes] = 10.0 utils.MinHotThresholds[utils.RegionReadKeys] = 10.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0 - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(1, newItem.HotDegree) - re.Equal(2*m-1, newItem.AntiCount) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(1, newItem[0].HotDegree) + re.Equal(2*m-1, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) } - re.Less(newItem.HotDegree, 0) - re.Equal(0, newItem.AntiCount) - re.Equal(utils.Remove, newItem.actionType) + re.Less(newItem[0].HotDegree, 0) + re.Equal(0, newItem[0].AntiCount) + re.Equal(utils.Remove, newItem[0].actionType) } func TestThresholdWithUpdateHotPeerStat(t *testing.T) { @@ -688,9 +679,8 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { StartTimestamp: start, EndTimestamp: end, })) - newPeer := core.NewPeerInfo(meta.Peers[0], region.GetLoads(), end-start) - stat := cache.checkPeerFlow(newPeer, newRegion) - if stat != nil { + stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil) + for _, stat := range stats { cache.updateStat(stat) } } @@ -717,22 +707,8 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { func BenchmarkCheckRegionFlow(b *testing.B) { cache := NewHotPeerCache(context.Background(), utils.Read) region := buildRegion(utils.Read, 3, 10) - peerInfos := make([]*core.PeerInfo, 0) - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10) - peerInfos = append(peerInfos, peerInfo) - } b.ResetTimer() for i := 0; i < b.N; i++ { - items := make([]*HotPeerStat, 0) - for _, peerInfo := range peerInfos { - item := cache.checkPeerFlow(peerInfo, region) - if item != nil { - items = append(items, item) - } - } - for _, ret := range items { - cache.updateStat(ret) - } + cache.checkPeerFlow(region, region.GetPeers(), nil) } } diff --git a/pkg/statistics/utils/kind.go b/pkg/statistics/utils/kind.go index 4d44b8d57e1..463b35c450a 100644 --- a/pkg/statistics/utils/kind.go +++ b/pkg/statistics/utils/kind.go @@ -15,7 +15,7 @@ package utils import ( - "github.com/tikv/pd/pkg/core" + "github.com/pingcap/kvproto/pkg/metapb" ) const ( @@ -231,9 +231,7 @@ func (rw RWType) DefaultAntiCount() int { } // GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo. -func (rw RWType) GetLoadRatesFromPeer(peer *core.PeerInfo) []float64 { - deltaLoads := peer.GetLoads() - interval := peer.GetInterval() +func (rw RWType) GetLoadRatesFromPeer(peer *metapb.Peer, deltaLoads []float64, interval uint64) []float64 { loads := make([]float64, DimLen) for dim, k := range rw.RegionStats() { loads[dim] = deltaLoads[k] / float64(interval) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 148b43541a2..870e2495dc3 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -959,8 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - peerInfo := core.NewPeerInfo(peer, loads, interval) - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) } } for _, stat := range stats.GetSnapshotStats() { diff --git a/tools/pd-ctl/tests/hot/hot_test.go b/tools/pd-ctl/tests/hot/hot_test.go index 7661704aa41..c06e42edf60 100644 --- a/tools/pd-ctl/tests/hot/hot_test.go +++ b/tools/pd-ctl/tests/hot/hot_test.go @@ -188,11 +188,10 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) { Id: 100 + regionIDCounter, StoreId: hotStoreID, } - peerInfo := core.NewPeerInfo(leader, loads, reportInterval) region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) testutil.Eventually(re, func() bool { hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil From 9d88e55c8ff3e8443e69544829d90c47f6fe8511 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 24 May 2024 16:03:47 +0800 Subject: [PATCH 2/7] batch process the peer task Signed-off-by: Ryan Leung --- pkg/cluster/cluster.go | 4 ++- pkg/mcs/scheduling/server/cluster.go | 3 ++- pkg/mock/mockcluster/mockcluster.go | 12 ++++++--- pkg/statistics/hot_cache.go | 9 ++++--- pkg/statistics/hot_cache_task.go | 9 +++++-- pkg/statistics/hot_peer_cache.go | 14 ++++------ pkg/statistics/hot_peer_cache_test.go | 38 +++++++++++++++------------ pkg/statistics/utils/kind.go | 8 ++---- server/cluster/cluster.go | 2 +- tools/pd-ctl/tests/hot/hot_test.go | 2 +- 10 files changed, 56 insertions(+), 45 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 07636bed074..3928987937b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -35,7 +35,9 @@ type Cluster interface { func HandleStatsAsync(c Cluster, region *core.RegionInfo) { c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads())) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetPeers(), region.GetWriteLoads(), interval)) c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 96309d20270..bfabc4660ad 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" @@ -442,7 +443,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval)) } // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 95f42ebc5e5..3f9710c48fd 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -894,7 +894,9 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + return append(items, mc.HotCache.CheckReadPeerSync(region, region.GetPeers(), region.GetLoads(), interval)...) } // CheckRegionWrite checks region write info with all peers @@ -902,7 +904,9 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredWriteItems(region) items = append(items, expiredItems...) - return append(items, mc.HotCache.CheckWritePeerSync(region, nil)...) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + return append(items, mc.HotCache.CheckWritePeerSync(region, region.GetPeers(), region.GetLoads(), interval)...) } // CheckRegionLeaderRead checks region read info with leader peer @@ -910,7 +914,9 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics. items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + return append(items, mc.HotCache.CheckReadPeerSync(region, []*metapb.Peer{region.GetLeader()}, region.GetLoads(), interval)...) } // ObserveRegionsStats records the current stores stats from region stats. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index fd7b67ae9a3..26548c8b47e 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -17,6 +17,7 @@ package statistics import ( "context" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/smallnest/chanx" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/statistics/utils" @@ -172,14 +173,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) { // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { - return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads) +func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat { + return w.writeCache.checkPeerFlow(region, peers, loads, interval) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { - return w.readCache.checkPeerFlow(region, region.GetPeers(), loads) +func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat { + return w.readCache.checkPeerFlow(region, peers, loads, interval) } // ExpiredReadItems returns the read items which are already expired. diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index 962835d9089..01968bf17e8 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -17,6 +17,7 @@ package statistics import ( "context" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" ) @@ -27,19 +28,23 @@ type FlowItemTask interface { type checkPeerTask struct { regionInfo *core.RegionInfo + peers []*metapb.Peer loads []float64 + interval uint64 } // NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64) FlowItemTask { +func NewCheckPeerTask(regionInfo *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) FlowItemTask { return &checkPeerTask{ regionInfo: regionInfo, + peers: peers, loads: loads, + interval: interval, } } func (t *checkPeerTask) runTask(cache *hotPeerCache) { - stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads) + stats := cache.checkPeerFlow(t.regionInfo, t.peers, t.loads, t.interval) for _, stat := range stats { cache.updateStat(stat) } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index f1a9ce2cc1d..89d6ee254ac 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -174,19 +174,15 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt // checkPeerFlow checks the flow information of a peer. // Notice: checkPeerFlow couldn't be used concurrently. // checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. -func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64) []*HotPeerStat { - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() +func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64, interval uint64) []*HotPeerStat { if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose return nil } - if deltaLoads == nil { - deltaLoads = region.GetLoads() - } f.collectPeerMetrics(deltaLoads, interval) // update metrics regionID := region.GetID() + regionPeers := region.GetPeers() var stats []*HotPeerStat for _, peer := range peers { storeID := peer.GetStoreId() @@ -218,12 +214,12 @@ func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Pe newItem := &HotPeerStat{ StoreID: storeID, RegionID: regionID, - Loads: f.kind.GetLoadRatesFromPeer(peer, deltaLoads, interval), + Loads: f.kind.GetLoadRates(deltaLoads, interval), isLeader: region.GetLeader().GetStoreId() == storeID, actionType: utils.Update, - stores: make([]uint64, len(peers)), + stores: make([]uint64, len(regionPeers)), } - for i, peer := range peers { + for i, peer := range regionPeers { newItem.stores[i] = peer.GetStoreId() } if oldItem == nil { diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 50c6b3c961e..1ddf9b81e57 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -106,8 +106,10 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer } func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() res = append(res, cache.collectExpiredItems(region)...) - return append(res, cache.checkPeerFlow(region, peers, region.GetLoads())...) + return append(res, cache.checkPeerFlow(region, peers, region.GetLoads(), interval)...) } func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { @@ -309,56 +311,55 @@ func TestUpdateHotPeerStat(t *testing.T) { }() // skip interval=0 - region = region.Clone(core.SetReportInterval(0, 0)) + interval := uint64(0) deltaLoads := []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Nil(newItem) // new peer, interval is larger than report interval, but no hot - region = region.Clone(core.SetReportInterval(10, 0)) + interval = 10 deltaLoads = []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 1.0 utils.MinHotThresholds[utils.RegionReadKeys] = 1.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0 - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Nil(newItem) // new peer, interval is less than report interval - region = region.Clone(core.SetReportInterval(4, 0)) + interval = 4 deltaLoads = []float64{60.0, 60.0, 60.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.NotNil(newItem) re.Equal(0, newItem[0].HotDegree) re.Equal(0, newItem[0].AntiCount) // sum of interval is less than report interval - region = region.Clone(core.SetReportInterval(4, 0)) deltaLoads = []float64{60.0, 60.0, 60.0} cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(0, newItem[0].HotDegree) re.Equal(0, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot newItem[0].AntiCount = utils.Read.DefaultAntiCount() cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is less than report interval cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot - region = region.Clone(core.SetReportInterval(10, 0)) + interval = 10 cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(2, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold @@ -366,13 +367,13 @@ func TestUpdateHotPeerStat(t *testing.T) { utils.MinHotThresholds[utils.RegionReadKeys] = 10.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0 cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m-1, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) } re.Less(newItem[0].HotDegree, 0) re.Equal(0, newItem[0].AntiCount) @@ -679,7 +680,7 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { StartTimestamp: start, EndTimestamp: end, })) - stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil) + stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), newRegion.GetLoads(), end-start) for _, stat := range stats { cache.updateStat(stat) } @@ -709,6 +710,9 @@ func BenchmarkCheckRegionFlow(b *testing.B) { region := buildRegion(utils.Read, 3, 10) b.ResetTimer() for i := 0; i < b.N; i++ { - cache.checkPeerFlow(region, region.GetPeers(), nil) + stats := cache.checkPeerFlow(region, region.GetPeers(), region.GetLoads(), 10) + for _, stat := range stats { + cache.updateStat(stat) + } } } diff --git a/pkg/statistics/utils/kind.go b/pkg/statistics/utils/kind.go index 463b35c450a..089732f759f 100644 --- a/pkg/statistics/utils/kind.go +++ b/pkg/statistics/utils/kind.go @@ -14,10 +14,6 @@ package utils -import ( - "github.com/pingcap/kvproto/pkg/metapb" -) - const ( // BytePriority indicates hot-region-scheduler prefer byte dim BytePriority = "byte" @@ -230,8 +226,8 @@ func (rw RWType) DefaultAntiCount() int { } } -// GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo. -func (rw RWType) GetLoadRatesFromPeer(peer *metapb.Peer, deltaLoads []float64, interval uint64) []float64 { +// GetLoadRates gets the load rates of the read or write type. +func (rw RWType) GetLoadRates(deltaLoads []float64, interval uint64) []float64 { loads := make([]float64, DimLen) for dim, k := range rw.RegionStats() { loads[dim] = deltaLoads[k] / float64(interval) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 870e2495dc3..f747b655815 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -959,7 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval)) } } for _, stat := range stats.GetSnapshotStats() { diff --git a/tools/pd-ctl/tests/hot/hot_test.go b/tools/pd-ctl/tests/hot/hot_test.go index c06e42edf60..f47936bd515 100644 --- a/tools/pd-ctl/tests/hot/hot_test.go +++ b/tools/pd-ctl/tests/hot/hot_test.go @@ -191,7 +191,7 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) { region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) + hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{leader}, loads, reportInterval)) testutil.Eventually(re, func() bool { hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil From b89a6eb53a1b1511cf4c1ec00c721af291508271 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 27 May 2024 14:50:23 +0800 Subject: [PATCH 3/7] use pool Signed-off-by: Ryan Leung --- pkg/cluster/cluster.go | 4 +-- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/statistics/hot_cache_task.go | 49 +++++++++++++++++++++++----- pkg/statistics/hot_peer_cache.go | 7 ++++ server/cluster/cluster.go | 2 +- server/cluster/cluster_test.go | 32 ++++++++++++++++++ tools/pd-ctl/tests/hot/hot_test.go | 2 +- 7 files changed, 84 insertions(+), 14 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 3928987937b..ab97c7899db 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -35,9 +35,7 @@ type Cluster interface { func HandleStatsAsync(c Cluster, region *core.RegionInfo) { c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetPeers(), region.GetWriteLoads(), interval)) + c.GetHotStat().CheckWriteAsync(statistics.NewCheckWritePeerTask(region)) c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index bfabc4660ad..d711ab2d4f6 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -443,7 +443,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval)) + c.hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{peer}, loads, interval)) } // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index 01968bf17e8..346381295e6 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -16,6 +16,7 @@ package statistics import ( "context" + "sync" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" @@ -26,16 +27,28 @@ type FlowItemTask interface { runTask(cache *hotPeerCache) } -type checkPeerTask struct { +var checkWritePeerTaskPool = sync.Pool{ + New: func() interface{} { + return &checkWritePeerTask{} + }, +} + +var checkExpiredTaskPool = sync.Pool{ + New: func() interface{} { + return &checkExpiredTask{} + }, +} + +type checkReadPeerTask struct { regionInfo *core.RegionInfo peers []*metapb.Peer loads []float64 interval uint64 } -// NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(regionInfo *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) FlowItemTask { - return &checkPeerTask{ +// NewCheckReadPeerTask creates task to update peerInfo +func NewCheckReadPeerTask(regionInfo *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) FlowItemTask { + return &checkReadPeerTask{ regionInfo: regionInfo, peers: peers, loads: loads, @@ -43,22 +56,41 @@ func NewCheckPeerTask(regionInfo *core.RegionInfo, peers []*metapb.Peer, loads [ } } -func (t *checkPeerTask) runTask(cache *hotPeerCache) { +func (t *checkReadPeerTask) runTask(cache *hotPeerCache) { stats := cache.checkPeerFlow(t.regionInfo, t.peers, t.loads, t.interval) for _, stat := range stats { cache.updateStat(stat) } } +type checkWritePeerTask struct { + regionInfo *core.RegionInfo +} + +// NewCheckWritePeerTask creates task to update peerInfo +func NewCheckWritePeerTask(regionInfo *core.RegionInfo) FlowItemTask { + task := checkWritePeerTaskPool.Get().(*checkWritePeerTask) + task.regionInfo = regionInfo + return task +} + +func (t *checkWritePeerTask) runTask(cache *hotPeerCache) { + stats := cache.checkPeerFlow(t.regionInfo, nil, nil, 0) + for _, stat := range stats { + cache.updateStat(stat) + } + checkWritePeerTaskPool.Put(t) +} + type checkExpiredTask struct { region *core.RegionInfo } // NewCheckExpiredItemTask creates task to collect expired items func NewCheckExpiredItemTask(region *core.RegionInfo) FlowItemTask { - return &checkExpiredTask{ - region: region, - } + task := checkExpiredTaskPool.Get().(*checkExpiredTask) + task.region = region + return task } func (t *checkExpiredTask) runTask(cache *hotPeerCache) { @@ -66,6 +98,7 @@ func (t *checkExpiredTask) runTask(cache *hotPeerCache) { for _, stat := range expiredStats { cache.updateStat(stat) } + checkExpiredTaskPool.Put(t) } type collectUnReportedPeerTask struct { diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 89d6ee254ac..d3c0e760e19 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -179,6 +179,13 @@ func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Pe return nil } + if peers == nil { + peers = region.GetPeers() + deltaLoads = region.GetWriteLoads() + reportInterval := region.GetInterval() + interval = reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + } + f.collectPeerMetrics(deltaLoads, interval) // update metrics regionID := region.GetID() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index f747b655815..057814b718b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -959,7 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval)) + c.hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{peer}, loads, interval)) } } for _, stat := range stats.GetSnapshotStats() { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 945e354bb6c..0f08153c8ae 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" @@ -3730,3 +3731,34 @@ func waitNoResponse(re *require.Assertions, stream mockhbstream.HeartbeatStream) return res == nil }) } + +func BenchmarkHandleStatsAsync(b *testing.B) { + // Setup: create a new instance of Cluster + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, opt, _ := newTestScheduleConfig() + c := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) + c.coordinator = schedule.NewCoordinator(ctx, c, nil) + c.SetPrepared() + region := core.NewRegionInfo(&metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + StartKey: []byte{byte(2)}, + EndKey: []byte{byte(3)}, + Peers: []*metapb.Peer{{Id: 11, StoreId: uint64(1)}}, + }, nil, + core.SetApproximateSize(10), + core.SetReportInterval(0, 10), + ) + + // Reset timer after setup + b.ResetTimer() + // Run HandleStatsAsync b.N times + for i := 0; i < b.N; i++ { + cluster.HandleStatsAsync(c, region) + } +} diff --git a/tools/pd-ctl/tests/hot/hot_test.go b/tools/pd-ctl/tests/hot/hot_test.go index f47936bd515..f65b811b36a 100644 --- a/tools/pd-ctl/tests/hot/hot_test.go +++ b/tools/pd-ctl/tests/hot/hot_test.go @@ -191,7 +191,7 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) { region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{leader}, loads, reportInterval)) + hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{leader}, loads, reportInterval)) testutil.Eventually(re, func() bool { hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil From 6bf5e38b5c62d88b4e60eb61385b670449151255 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 27 May 2024 15:13:48 +0800 Subject: [PATCH 4/7] remove pool Signed-off-by: Ryan Leung --- pkg/statistics/hot_cache_task.go | 33 +++++++++----------------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index 346381295e6..71f0a526b2f 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -16,7 +16,6 @@ package statistics import ( "context" - "sync" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" @@ -27,18 +26,6 @@ type FlowItemTask interface { runTask(cache *hotPeerCache) } -var checkWritePeerTaskPool = sync.Pool{ - New: func() interface{} { - return &checkWritePeerTask{} - }, -} - -var checkExpiredTaskPool = sync.Pool{ - New: func() interface{} { - return &checkExpiredTask{} - }, -} - type checkReadPeerTask struct { regionInfo *core.RegionInfo peers []*metapb.Peer @@ -64,22 +51,21 @@ func (t *checkReadPeerTask) runTask(cache *hotPeerCache) { } type checkWritePeerTask struct { - regionInfo *core.RegionInfo + region *core.RegionInfo } // NewCheckWritePeerTask creates task to update peerInfo -func NewCheckWritePeerTask(regionInfo *core.RegionInfo) FlowItemTask { - task := checkWritePeerTaskPool.Get().(*checkWritePeerTask) - task.regionInfo = regionInfo - return task +func NewCheckWritePeerTask(region *core.RegionInfo) FlowItemTask { + return &checkWritePeerTask{ + region: region, + } } func (t *checkWritePeerTask) runTask(cache *hotPeerCache) { - stats := cache.checkPeerFlow(t.regionInfo, nil, nil, 0) + stats := cache.checkPeerFlow(t.region, nil, nil, 0) for _, stat := range stats { cache.updateStat(stat) } - checkWritePeerTaskPool.Put(t) } type checkExpiredTask struct { @@ -88,9 +74,9 @@ type checkExpiredTask struct { // NewCheckExpiredItemTask creates task to collect expired items func NewCheckExpiredItemTask(region *core.RegionInfo) FlowItemTask { - task := checkExpiredTaskPool.Get().(*checkExpiredTask) - task.region = region - return task + return &checkExpiredTask{ + region: region, + } } func (t *checkExpiredTask) runTask(cache *hotPeerCache) { @@ -98,7 +84,6 @@ func (t *checkExpiredTask) runTask(cache *hotPeerCache) { for _, stat := range expiredStats { cache.updateStat(stat) } - checkExpiredTaskPool.Put(t) } type collectUnReportedPeerTask struct { From 7a65fc7a3e85381ecd92feda951bbf1ed2953b8b Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 27 May 2024 15:27:25 +0800 Subject: [PATCH 5/7] fix Signed-off-by: Ryan Leung --- pkg/statistics/hot_cache_task.go | 4 +++- pkg/statistics/hot_peer_cache.go | 7 ------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index 71f0a526b2f..01731f3fe4d 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -62,7 +62,9 @@ func NewCheckWritePeerTask(region *core.RegionInfo) FlowItemTask { } func (t *checkWritePeerTask) runTask(cache *hotPeerCache) { - stats := cache.checkPeerFlow(t.region, nil, nil, 0) + reportInterval := t.region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + stats := cache.checkPeerFlow(t.region, t.region.GetPeers(), t.region.GetWriteLoads(), interval) for _, stat := range stats { cache.updateStat(stat) } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index d3c0e760e19..89d6ee254ac 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -179,13 +179,6 @@ func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Pe return nil } - if peers == nil { - peers = region.GetPeers() - deltaLoads = region.GetWriteLoads() - reportInterval := region.GetInterval() - interval = reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - } - f.collectPeerMetrics(deltaLoads, interval) // update metrics regionID := region.GetID() From 09c6b9281f09e6b568af5134d98da6562ce7fa95 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 28 May 2024 10:30:43 +0800 Subject: [PATCH 6/7] address comment Signed-off-by: Ryan Leung --- pkg/statistics/hot_peer_cache.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 89d6ee254ac..3a3d3519bd9 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -183,7 +183,7 @@ func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Pe regionID := region.GetID() regionPeers := region.GetPeers() - var stats []*HotPeerStat + stats := make([]*HotPeerStat, 0, len(peers)) for _, peer := range peers { storeID := peer.GetStoreId() oldItem := f.getOldHotPeerStat(regionID, storeID) @@ -223,14 +223,10 @@ func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Pe newItem.stores[i] = peer.GetStoreId() } if oldItem == nil { - if stat := f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second); stat != nil { - stats = append(stats, stat) - } + stats = append(stats, f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second)) continue } - if stat := f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source); stat != nil { - stats = append(stats, stat) - } + stats = append(stats, f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source)) } return stats } From d030133611d21d0b4d241cdceef6bb1dbebceb55 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 28 May 2024 10:57:35 +0800 Subject: [PATCH 7/7] address comment Signed-off-by: Ryan Leung --- pkg/statistics/hot_peer_cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 1ddf9b81e57..c116e020f54 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -327,7 +327,7 @@ func TestUpdateHotPeerStat(t *testing.T) { utils.MinHotThresholds[utils.RegionReadKeys] = 1.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0 newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) - re.Nil(newItem) + re.Empty(newItem) // new peer, interval is less than report interval interval = 4