From 9911a5bc60da137dd612031eaeda9c9fde44df3c Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 24 May 2024 17:50:05 +0800 Subject: [PATCH] fix Signed-off-by: Ryan Leung --- pkg/cluster/cluster.go | 2 +- pkg/mcs/scheduling/server/cluster.go | 3 ++- pkg/mock/mockcluster/mockcluster.go | 6 +++--- pkg/statistics/hot_cache.go | 7 +++++++ pkg/statistics/hot_cache_task.go | 7 +++++-- pkg/statistics/hot_peer_cache.go | 3 --- pkg/statistics/hot_peer_cache_test.go | 4 ++-- server/cluster/cluster.go | 2 +- tools/pd-ctl/tests/hot/hot_test.go | 2 +- 9 files changed, 22 insertions(+), 14 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index bc89c9e98068..3928987937b8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -37,7 +37,7 @@ func HandleStatsAsync(c Cluster, region *core.RegionInfo) { c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads(), interval)) + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetPeers(), region.GetWriteLoads(), interval)) c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index a25d8471d6d1..bfabc4660ad2 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" @@ -442,7 +443,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads, interval)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval)) } // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 8b8539a610a0..8813e95b04df 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -896,7 +896,7 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee items = append(items, expiredItems...) reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - return append(items, mc.HotCache.CheckReadPeerSync(region, nil, interval)...) + return append(items, mc.HotCache.CheckReadPeerSync(region, region.GetLoads(), interval)...) } // CheckRegionWrite checks region write info with all peers @@ -906,7 +906,7 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe items = append(items, expiredItems...) reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - return append(items, mc.HotCache.CheckWritePeerSync(region, nil, interval)...) + return append(items, mc.HotCache.CheckWritePeerSync(region, region.GetLoads(), interval)...) } // CheckRegionLeaderRead checks region read info with leader peer @@ -916,7 +916,7 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics. items = append(items, expiredItems...) reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - return append(items, mc.HotCache.CheckReadPeerSync(region, nil, interval)...) + return append(items, mc.HotCache.CheckReadLeaderSync(region, region.GetLoads(), interval)...) } // ObserveRegionsStats records the current stores stats from region stats. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 7c1bf2c14104..d3cadd59ae12 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -17,6 +17,7 @@ package statistics import ( "context" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/smallnest/chanx" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/statistics/utils" @@ -182,6 +183,12 @@ func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64, i return w.readCache.checkPeerFlow(region, region.GetPeers(), loads, interval) } +// CheckReadLeaderSync checks the read status, returns update items. +// This is used for mockcluster, for test purpose. +func (w *HotCache) CheckReadLeaderSync(region *core.RegionInfo, loads []float64, interval uint64) []*HotPeerStat { + return w.readCache.checkPeerFlow(region, []*metapb.Peer{region.GetLeader()}, loads, interval) +} + // ExpiredReadItems returns the read items which are already expired. // This is used for mockcluster, for test purpose. func (w *HotCache) ExpiredReadItems(region *core.RegionInfo) []*HotPeerStat { diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index 3c102a2559f1..01968bf17e83 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -17,6 +17,7 @@ package statistics import ( "context" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" ) @@ -27,21 +28,23 @@ type FlowItemTask interface { type checkPeerTask struct { regionInfo *core.RegionInfo + peers []*metapb.Peer loads []float64 interval uint64 } // NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64, interval uint64) FlowItemTask { +func NewCheckPeerTask(regionInfo *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) FlowItemTask { return &checkPeerTask{ regionInfo: regionInfo, + peers: peers, loads: loads, interval: interval, } } func (t *checkPeerTask) runTask(cache *hotPeerCache) { - stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads, t.interval) + stats := cache.checkPeerFlow(t.regionInfo, t.peers, t.loads, t.interval) for _, stat := range stats { cache.updateStat(stat) } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index ae4f1fb39283..b462a3e83cc7 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -179,9 +179,6 @@ func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Pe return nil } - if deltaLoads == nil { - deltaLoads = region.GetLoads() - } f.collectPeerMetrics(deltaLoads, interval) // update metrics regionID := region.GetID() diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 151d7a631e82..1ddf9b81e573 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -680,7 +680,7 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { StartTimestamp: start, EndTimestamp: end, })) - stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil, end-start) + stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), newRegion.GetLoads(), end-start) for _, stat := range stats { cache.updateStat(stat) } @@ -710,7 +710,7 @@ func BenchmarkCheckRegionFlow(b *testing.B) { region := buildRegion(utils.Read, 3, 10) b.ResetTimer() for i := 0; i < b.N; i++ { - stats := cache.checkPeerFlow(region, region.GetPeers(), nil, 10) + stats := cache.checkPeerFlow(region, region.GetPeers(), region.GetLoads(), 10) for _, stat := range stats { cache.updateStat(stat) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c12f749de496..f747b655815a 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -959,7 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads, interval)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, []*metapb.Peer{peer}, loads, interval)) } } for _, stat := range stats.GetSnapshotStats() { diff --git a/tools/pd-ctl/tests/hot/hot_test.go b/tools/pd-ctl/tests/hot/hot_test.go index 2909612f822b..6a5a4689f5f1 100644 --- a/tools/pd-ctl/tests/hot/hot_test.go +++ b/tools/pd-ctl/tests/hot/hot_test.go @@ -191,7 +191,7 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) { region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads, reportInterval)) + hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, region.GetPeers(), loads, reportInterval)) testutil.Eventually(re, func() bool { hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil