diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 07636bed074d..0aa011fb6b14 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -35,7 +35,8 @@ type Cluster interface { func HandleStatsAsync(c Cluster, region *core.RegionInfo) { c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads())) + interval := region.GetInterval() + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads(), interval.GetEndTimestamp()-interval.GetStartTimestamp())) c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 96309d20270c..a25d8471d6d1 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -442,7 +442,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads, interval)) } // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 95f42ebc5e5d..aed8c8fd3ba4 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -894,7 +894,7 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) + return append(items, mc.HotCache.CheckReadPeerSync(region, nil, region.GetInterval().GetEndTimestamp()-region.GetInterval().GetStartTimestamp())...) } // CheckRegionWrite checks region write info with all peers @@ -902,7 +902,7 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredWriteItems(region) items = append(items, expiredItems...) - return append(items, mc.HotCache.CheckWritePeerSync(region, nil)...) + return append(items, mc.HotCache.CheckWritePeerSync(region, nil, region.GetInterval().GetEndTimestamp()-region.GetInterval().GetStartTimestamp())...) } // CheckRegionLeaderRead checks region read info with leader peer @@ -910,7 +910,7 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics. items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) + return append(items, mc.HotCache.CheckReadPeerSync(region, nil, region.GetInterval().GetEndTimestamp()-region.GetInterval().GetStartTimestamp())...) } // ObserveRegionsStats records the current stores stats from region stats. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index fd7b67ae9a30..7c1bf2c14104 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -172,14 +172,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) { // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { - return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads) +func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64, interval uint64) []*HotPeerStat { + return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads, interval) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { - return w.readCache.checkPeerFlow(region, region.GetPeers(), loads) +func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64, interval uint64) []*HotPeerStat { + return w.readCache.checkPeerFlow(region, region.GetPeers(), loads, interval) } // ExpiredReadItems returns the read items which are already expired. diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index 962835d90896..3c102a2559f1 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -28,18 +28,20 @@ type FlowItemTask interface { type checkPeerTask struct { regionInfo *core.RegionInfo loads []float64 + interval uint64 } // NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64) FlowItemTask { +func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64, interval uint64) FlowItemTask { return &checkPeerTask{ regionInfo: regionInfo, loads: loads, + interval: interval, } } func (t *checkPeerTask) runTask(cache *hotPeerCache) { - stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads) + stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads, t.interval) for _, stat := range stats { cache.updateStat(stat) } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index b71ceb94da1c..ae4f1fb39283 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -174,9 +174,7 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt // checkPeerFlow checks the flow information of a peer. // Notice: checkPeerFlow couldn't be used concurrently. // checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. -func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64) []*HotPeerStat { - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() +func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64, interval uint64) []*HotPeerStat { if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose return nil } diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index a333885dad60..2a9dcc44df7a 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -107,7 +107,7 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { res = append(res, cache.collectExpiredItems(region)...) - return append(res, cache.checkPeerFlow(region, peers, region.GetLoads())...) + return append(res, cache.checkPeerFlow(region, peers, region.GetLoads(), region.GetInterval().GetEndTimestamp()-region.GetInterval().GetStartTimestamp())...) } func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { @@ -309,56 +309,55 @@ func TestUpdateHotPeerStat(t *testing.T) { }() // skip interval=0 - region = region.Clone(core.SetReportInterval(0, 0)) + interval := uint64(0) deltaLoads := []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Nil(newItem) // new peer, interval is larger than report interval, but no hot - region = region.Clone(core.SetReportInterval(0, 10)) + interval = 10 deltaLoads = []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 1.0 utils.MinHotThresholds[utils.RegionReadKeys] = 1.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0 - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Nil(newItem) // new peer, interval is less than report interval - region = region.Clone(core.SetReportInterval(0, 4)) + interval = 4 deltaLoads = []float64{60.0, 60.0, 60.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.NotNil(newItem) re.Equal(0, newItem[0].HotDegree) re.Equal(0, newItem[0].AntiCount) // sum of interval is less than report interval - region = region.Clone(core.SetReportInterval(0, 4)) deltaLoads = []float64{60.0, 60.0, 60.0} cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(0, newItem[0].HotDegree) re.Equal(0, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot newItem[0].AntiCount = utils.Read.DefaultAntiCount() cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is less than report interval cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot - region = region.Clone(core.SetReportInterval(0, 10)) + interval = 10 cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(2, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold @@ -366,13 +365,13 @@ func TestUpdateHotPeerStat(t *testing.T) { utils.MinHotThresholds[utils.RegionReadKeys] = 10.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0 cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m-1, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) } re.Less(newItem[0].HotDegree, 0) re.Equal(0, newItem[0].AntiCount) @@ -679,7 +678,7 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { StartTimestamp: start, EndTimestamp: end, })) - stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil) + stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil, end-start) for _, stat := range stats { cache.updateStat(stat) } @@ -709,6 +708,9 @@ func BenchmarkCheckRegionFlow(b *testing.B) { region := buildRegion(utils.Read, 3, 10) b.ResetTimer() for i := 0; i < b.N; i++ { - cache.checkPeerFlow(region, region.GetPeers(), nil) + stats := cache.checkPeerFlow(region, region.GetPeers(), nil, 10) + for _, stat := range stats { + cache.updateStat(stat) + } } } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 870e2495dc3a..c12f749de496 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -959,7 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads, interval)) } } for _, stat := range stats.GetSnapshotStats() { diff --git a/tools/pd-ctl/tests/hot/hot_test.go b/tools/pd-ctl/tests/hot/hot_test.go index c06e42edf60e..27efa1c5a77a 100644 --- a/tools/pd-ctl/tests/hot/hot_test.go +++ b/tools/pd-ctl/tests/hot/hot_test.go @@ -191,7 +191,8 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) { region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) + interval := region.GetInterval() + hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads, interval.GetEndTimestamp()-interval.GetStartTimestamp())) testutil.Eventually(re, func() bool { hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil