Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 24, 2024
1 parent 0796198 commit 09497bf
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 33 deletions.
3 changes: 2 additions & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type Cluster interface {
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads()))
interval := region.GetInterval()
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads(), interval.GetEndTimestamp()-interval.GetStartTimestamp()))
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads))
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads, interval))
}

// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
Expand Down
6 changes: 3 additions & 3 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,23 +894,23 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee
items := make([]*statistics.HotPeerStat, 0)
expiredItems := mc.HotCache.ExpiredReadItems(region)
items = append(items, expiredItems...)
return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...)
return append(items, mc.HotCache.CheckReadPeerSync(region, nil, region.GetInterval().GetEndTimestamp()-region.GetInterval().GetStartTimestamp())...)
}

// CheckRegionWrite checks region write info with all peers
func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPeerStat {
items := make([]*statistics.HotPeerStat, 0)
expiredItems := mc.HotCache.ExpiredWriteItems(region)
items = append(items, expiredItems...)
return append(items, mc.HotCache.CheckWritePeerSync(region, nil)...)
return append(items, mc.HotCache.CheckWritePeerSync(region, nil, region.GetInterval().GetEndTimestamp()-region.GetInterval().GetStartTimestamp())...)
}

// CheckRegionLeaderRead checks region read info with leader peer
func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics.HotPeerStat {
items := make([]*statistics.HotPeerStat, 0)
expiredItems := mc.HotCache.ExpiredReadItems(region)
items = append(items, expiredItems...)
return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...)
return append(items, mc.HotCache.CheckReadPeerSync(region, nil, region.GetInterval().GetEndTimestamp()-region.GetInterval().GetStartTimestamp())...)
}

// ObserveRegionsStats records the current stores stats from region stats.
Expand Down
8 changes: 4 additions & 4 deletions pkg/statistics/hot_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) {

// CheckWritePeerSync checks the write status, returns update items.
// This is used for mockcluster, for test purpose.
func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat {
return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads)
func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64, interval uint64) []*HotPeerStat {
return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads, interval)
}

// CheckReadPeerSync checks the read status, returns update items.
// This is used for mockcluster, for test purpose.
func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat {
return w.readCache.checkPeerFlow(region, region.GetPeers(), loads)
func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64, interval uint64) []*HotPeerStat {
return w.readCache.checkPeerFlow(region, region.GetPeers(), loads, interval)
}

// ExpiredReadItems returns the read items which are already expired.
Expand Down
6 changes: 4 additions & 2 deletions pkg/statistics/hot_cache_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,20 @@ type FlowItemTask interface {
type checkPeerTask struct {
regionInfo *core.RegionInfo
loads []float64
interval uint64
}

// NewCheckPeerTask creates task to update peerInfo
func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64) FlowItemTask {
func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64, interval uint64) FlowItemTask {
return &checkPeerTask{
regionInfo: regionInfo,
loads: loads,
interval: interval,
}
}

func (t *checkPeerTask) runTask(cache *hotPeerCache) {
stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads)
stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads, t.interval)
for _, stat := range stats {
cache.updateStat(stat)
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt
// checkPeerFlow checks the flow information of a peer.
// Notice: checkPeerFlow couldn't be used concurrently.
// checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here.
func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64) []*HotPeerStat {
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64, interval uint64) []*HotPeerStat {
if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose
return nil
}
Expand Down
36 changes: 19 additions & 17 deletions pkg/statistics/hot_peer_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer

func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) {
res = append(res, cache.collectExpiredItems(region)...)
return append(res, cache.checkPeerFlow(region, peers, region.GetLoads())...)
return append(res, cache.checkPeerFlow(region, peers, region.GetLoads(), region.GetInterval().GetEndTimestamp()-region.GetInterval().GetStartTimestamp())...)
}

func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat {
Expand Down Expand Up @@ -309,70 +309,69 @@ func TestUpdateHotPeerStat(t *testing.T) {
}()

// skip interval=0
region = region.Clone(core.SetReportInterval(0, 0))
interval := uint64(0)
deltaLoads := []float64{0.0, 0.0, 0.0}
utils.MinHotThresholds[utils.RegionReadBytes] = 0.0
utils.MinHotThresholds[utils.RegionReadKeys] = 0.0
utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0

newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Nil(newItem)

// new peer, interval is larger than report interval, but no hot
region = region.Clone(core.SetReportInterval(0, 10))
interval = 10
deltaLoads = []float64{0.0, 0.0, 0.0}
utils.MinHotThresholds[utils.RegionReadBytes] = 1.0
utils.MinHotThresholds[utils.RegionReadKeys] = 1.0
utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Nil(newItem)

// new peer, interval is less than report interval
region = region.Clone(core.SetReportInterval(0, 4))
interval = 4
deltaLoads = []float64{60.0, 60.0, 60.0}
utils.MinHotThresholds[utils.RegionReadBytes] = 0.0
utils.MinHotThresholds[utils.RegionReadKeys] = 0.0
utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.NotNil(newItem)
re.Equal(0, newItem[0].HotDegree)
re.Equal(0, newItem[0].AntiCount)
// sum of interval is less than report interval
region = region.Clone(core.SetReportInterval(0, 4))
deltaLoads = []float64{60.0, 60.0, 60.0}
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(0, newItem[0].HotDegree)
re.Equal(0, newItem[0].AntiCount)
// sum of interval is larger than report interval, and hot
newItem[0].AntiCount = utils.Read.DefaultAntiCount()
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(1, newItem[0].HotDegree)
re.Equal(2*m, newItem[0].AntiCount)
// sum of interval is less than report interval
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(1, newItem[0].HotDegree)
re.Equal(2*m, newItem[0].AntiCount)
// sum of interval is larger than report interval, and hot
region = region.Clone(core.SetReportInterval(0, 10))
interval = 10
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(2, newItem[0].HotDegree)
re.Equal(2*m, newItem[0].AntiCount)
// sum of interval is larger than report interval, and cold
utils.MinHotThresholds[utils.RegionReadBytes] = 10.0
utils.MinHotThresholds[utils.RegionReadKeys] = 10.0
utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
re.Equal(1, newItem[0].HotDegree)
re.Equal(2*m-1, newItem[0].AntiCount)
// sum of interval is larger than report interval, and cold
for i := 0; i < 2*m-1; i++ {
cache.updateStat(newItem[0])
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads)
newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval)
}
re.Less(newItem[0].HotDegree, 0)
re.Equal(0, newItem[0].AntiCount)
Expand Down Expand Up @@ -679,7 +678,7 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) {
StartTimestamp: start,
EndTimestamp: end,
}))
stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil)
stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil, end-start)
for _, stat := range stats {
cache.updateStat(stat)
}
Expand Down Expand Up @@ -709,6 +708,9 @@ func BenchmarkCheckRegionFlow(b *testing.B) {
region := buildRegion(utils.Read, 3, 10)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.checkPeerFlow(region, region.GetPeers(), nil)
stats := cache.checkPeerFlow(region, region.GetPeers(), nil, 10)
for _, stat := range stats {
cache.updateStat(stat)
}
}
}
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads))
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads, interval))
}
}
for _, stat := range stats.GetSnapshotStats() {
Expand Down
3 changes: 2 additions & 1 deletion tools/pd-ctl/tests/hot/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) {
region := core.NewRegionInfo(&metapb.Region{
Id: hotRegionID,
}, leader)
hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads))
interval := region.GetInterval()
hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads, interval.GetEndTimestamp()-interval.GetStartTimestamp()))
testutil.Eventually(re, func() bool {
hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID)
return hotPeerStat != nil
Expand Down

0 comments on commit 09497bf

Please sign in to comment.