diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index ab97c7899db9..2cf5787646a8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -33,9 +33,25 @@ type Cluster interface { // HandleStatsAsync handles the flow asynchronously. func HandleStatsAsync(c Cluster, region *core.RegionInfo) { - c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) - c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - c.GetHotStat().CheckWriteAsync(statistics.NewCheckWritePeerTask(region)) + checkWritePeerTask := func(cache *statistics.HotPeerCache) { + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + stats := cache.CheckPeerFlow(region, region.GetPeers(), region.GetWriteLoads(), interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + + checkExpiredTask := func(cache *statistics.HotPeerCache) { + expiredStats := cache.CollectExpiredItems(region) + for _, stat := range expiredStats { + cache.UpdateStat(stat) + } + } + + c.GetHotStat().CheckWriteAsync(checkExpiredTask) + c.GetHotStat().CheckReadAsync(checkExpiredTask) + c.GetHotStat().CheckWriteAsync(checkWritePeerTask) c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) } diff --git a/pkg/core/region.go b/pkg/core/region.go index 19c1d0d4794d..c485bdfee216 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -56,7 +56,6 @@ func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error { // the properties are Read-Only once created except buckets. // the `buckets` could be modified by the request `report buckets` with greater version. type RegionInfo struct { - term uint64 meta *metapb.Region learners []*metapb.Peer witnesses []*metapb.Peer @@ -64,6 +63,7 @@ type RegionInfo struct { leader *metapb.Peer downPeers []*pdpb.PeerStats pendingPeers []*metapb.Peer + term uint64 cpuUsage uint64 writtenBytes uint64 writtenKeys uint64 @@ -137,26 +137,23 @@ func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCre // classifyVoterAndLearner sorts out voter and learner from peers into different slice. func classifyVoterAndLearner(region *RegionInfo) { - learners := make([]*metapb.Peer, 0, 1) - voters := make([]*metapb.Peer, 0, len(region.meta.Peers)) - witnesses := make([]*metapb.Peer, 0, 1) + // Reset slices + region.learners = region.learners[:0] + region.voters = region.voters[:0] + region.witnesses = region.witnesses[:0] for _, p := range region.meta.Peers { if IsLearner(p) { - learners = append(learners, p) + region.learners = append(region.learners, p) } else { - voters = append(voters, p) + region.voters = append(region.voters, p) } - // Whichever peer role can be a witness if IsWitness(p) { - witnesses = append(witnesses, p) + region.witnesses = append(region.witnesses, p) } } - sort.Sort(peerSlice(learners)) - sort.Sort(peerSlice(voters)) - sort.Sort(peerSlice(witnesses)) - region.learners = learners - region.voters = voters - region.witnesses = witnesses + sort.Sort(peerSlice(region.learners)) + sort.Sort(peerSlice(region.voters)) + sort.Sort(peerSlice(region.witnesses)) } // peersEqualTo returns true when the peers are not changed, which may caused by: the region leader not changed, @@ -214,7 +211,7 @@ type RegionHeartbeatRequest interface { } // RegionFromHeartbeat constructs a Region from region heartbeat. -func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo { +func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, flowRound int) *RegionInfo { // Convert unit to MB. // If region isn't empty and less than 1MB, use 1MB instead. // The size of empty region will be correct by the previous RegionInfo. @@ -224,20 +221,21 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO } region := &RegionInfo{ - term: heartbeat.GetTerm(), - meta: heartbeat.GetRegion(), - leader: heartbeat.GetLeader(), - downPeers: heartbeat.GetDownPeers(), - pendingPeers: heartbeat.GetPendingPeers(), - writtenBytes: heartbeat.GetBytesWritten(), - writtenKeys: heartbeat.GetKeysWritten(), - readBytes: heartbeat.GetBytesRead(), - readKeys: heartbeat.GetKeysRead(), - approximateSize: int64(regionSize), - approximateKeys: int64(heartbeat.GetApproximateKeys()), - interval: heartbeat.GetInterval(), - queryStats: heartbeat.GetQueryStats(), - source: Heartbeat, + term: heartbeat.GetTerm(), + meta: heartbeat.GetRegion(), + leader: heartbeat.GetLeader(), + downPeers: heartbeat.GetDownPeers(), + pendingPeers: heartbeat.GetPendingPeers(), + writtenBytes: heartbeat.GetBytesWritten(), + writtenKeys: heartbeat.GetKeysWritten(), + readBytes: heartbeat.GetBytesRead(), + readKeys: heartbeat.GetKeysRead(), + approximateSize: int64(regionSize), + approximateKeys: int64(heartbeat.GetApproximateKeys()), + interval: heartbeat.GetInterval(), + queryStats: heartbeat.GetQueryStats(), + source: Heartbeat, + flowRoundDivisor: uint64(flowRound), } // scheduling service doesn't need the following fields. @@ -247,10 +245,6 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO region.cpuUsage = h.GetCpuUsage() } - for _, opt := range opts { - opt(region) - } - if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize { region.writtenKeys = 0 region.writtenBytes = 0 @@ -957,11 +951,11 @@ func (r *RegionsInfo) getRegionLocked(regionID uint64) *RegionInfo { func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { r.t.Lock() origin := r.getRegionLocked(region.GetID()) - var ols []*regionItem + var ols []*RegionInfo if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { ols = r.tree.overlaps(®ionItem{RegionInfo: region}) } - err := check(region, origin, convertItemsToRegions(ols)) + err := check(region, origin, ols) if err != nil { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) // return the state region to delete. @@ -988,25 +982,17 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*Reg return origin, overlaps, err } -func convertItemsToRegions(items []*regionItem) []*RegionInfo { - regions := make([]*RegionInfo, 0, len(items)) - for _, item := range items { - regions = append(regions, item.RegionInfo) - } - return regions -} - // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put. func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) { tracer := ctx.Tracer r.t.Lock() - var ols []*regionItem + var ols []*RegionInfo origin := r.getRegionLocked(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { ols = r.tree.overlaps(®ionItem{RegionInfo: region}) } tracer.OnCheckOverlapsFinished() - err := check(region, origin, convertItemsToRegions(ols)) + err := check(region, origin, ols) if err != nil { r.t.Unlock() tracer.OnValidateRegionFinished() @@ -1026,13 +1012,13 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *R func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) { tracer := ctx.Tracer r.t.Lock() - var ols []*regionItem + var ols []*RegionInfo origin := r.getRegionLocked(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { ols = r.tree.overlaps(®ionItem{RegionInfo: region}) } tracer.OnCheckOverlapsFinished() - err := check(region, origin, convertItemsToRegions(ols)) + err := check(region, origin, ols) if err != nil { r.t.Unlock() tracer.OnValidateRegionFinished() @@ -1123,7 +1109,7 @@ func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionI if len(overlaps) == 0 { // If the range has changed but the overlapped regions are not provided, collect them by `[]*regionItem`. for _, item := range r.getOverlapRegionFromOverlapTreeLocked(region) { - r.removeRegionFromSubTreeLocked(item.RegionInfo) + r.removeRegionFromSubTreeLocked(item) } } else { // Remove all provided overlapped regions from the subtrees. @@ -1164,7 +1150,7 @@ func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionI setPeers(r.pendingPeers, region.GetPendingPeers()) } -func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*regionItem { +func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*RegionInfo { return r.overlapTree.overlaps(®ionItem{RegionInfo: region}) } @@ -1174,9 +1160,7 @@ func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo defer r.t.RUnlock() origin = r.getRegionLocked(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { - for _, item := range r.tree.overlaps(®ionItem{RegionInfo: region}) { - overlaps = append(overlaps, item.RegionInfo) - } + return origin, r.tree.overlaps(®ionItem{RegionInfo: region}) } return } @@ -1211,7 +1195,7 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo, return r.setRegionLocked(region, false) } -func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*regionItem) (*RegionInfo, []*RegionInfo, bool) { +func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*RegionInfo) (*RegionInfo, []*RegionInfo, bool) { var ( item *regionItem // Pointer to the *RegionInfo of this ID. origin *RegionInfo @@ -1240,7 +1224,7 @@ func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol item.RegionInfo = region } else { // If the range is not changed, only the statistical on the regionTree needs to be updated. - r.tree.updateStat(origin, region) + r.tree.UpdateStat(origin, region) // Update the RegionInfo in the regionItem. item.RegionInfo = region return origin, nil, rangeChanged @@ -1281,7 +1265,7 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi func (r *RegionsInfo) updateSubTreeStat(origin *RegionInfo, region *RegionInfo) { updatePeerStat := func(peersMap map[uint64]*regionTree, storeID uint64) { if tree, ok := peersMap[storeID]; ok { - tree.updateStat(origin, region) + tree.UpdateStat(origin, region) } } for _, peer := range region.GetVoters() { @@ -1311,7 +1295,7 @@ func (r *RegionsInfo) TreeLen() int { } // GetOverlaps returns the regions which are overlapped with the specified region range. -func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*regionItem { +func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*RegionInfo { r.t.RLock() defer r.t.RUnlock() return r.tree.overlaps(®ionItem{RegionInfo: region}) diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index aaf440eeeea2..ec694a2ba980 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -162,12 +162,12 @@ func TestSortedEqual(t *testing.T) { Region: &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsA)}, DownPeers: pickPeerStats(testCase.idsA), PendingPeers: pickPeers(testCase.idsA), - }) + }, 3) regionB := RegionFromHeartbeat(&pdpb.RegionHeartbeatRequest{ Region: &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsB)}, DownPeers: pickPeerStats(testCase.idsB), PendingPeers: pickPeers(testCase.idsB), - }) + }, 3) re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters())) re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters())) re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetPendingPeers(), regionB.GetPendingPeers())) @@ -952,7 +952,7 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - RegionFromHeartbeat(regionReq) + RegionFromHeartbeat(regionReq, 3) } } diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index d4ef4a880fc8..0c7426e6ce73 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -104,7 +104,7 @@ func (t *regionTree) notFromStorageRegionsCount() int { } // GetOverlaps returns the range items that has some intersections with the given items. -func (t *regionTree) overlaps(item *regionItem) []*regionItem { +func (t *regionTree) overlaps(item *regionItem) []*RegionInfo { // note that Find() gets the last item that is less or equal than the item. // in the case: |_______a_______|_____b_____|___c___| // new item is |______d______| @@ -116,12 +116,12 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem { result = item } endKey := item.GetEndKey() - var overlaps []*regionItem + var overlaps []*RegionInfo t.tree.AscendGreaterOrEqual(result, func(i *regionItem) bool { if len(endKey) > 0 && bytes.Compare(endKey, i.GetStartKey()) <= 0 { return false } - overlaps = append(overlaps, i) + overlaps = append(overlaps, i.RegionInfo) return true }) return overlaps @@ -130,7 +130,7 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem { // update updates the tree with the region. // It finds and deletes all the overlapped regions first, and then // insert the region. -func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*regionItem) []*RegionInfo { +func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*RegionInfo) []*RegionInfo { region := item.RegionInfo t.totalSize += region.approximateSize regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate() @@ -145,7 +145,7 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re } for _, old := range overlaps { - t.tree.Delete(old) + t.tree.Delete(®ionItem{RegionInfo: old}) } t.tree.ReplaceOrInsert(item) if t.countRef { @@ -153,7 +153,7 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re } result := make([]*RegionInfo, len(overlaps)) for i, overlap := range overlaps { - old := overlap.RegionInfo + old := overlap result[i] = old log.Debug("overlapping region", zap.Uint64("region-id", old.GetID()), @@ -174,8 +174,8 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re return result } -// updateStat is used to update statistics when regionItem.RegionInfo is directly replaced. -func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) { +// UpdateStat is used to update statistics when RegionInfo is directly replaced. +func (t *regionTree) UpdateStat(origin *RegionInfo, region *RegionInfo) { t.totalSize += region.approximateSize regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate() t.totalWriteBytesRate += regionWriteBytesRate diff --git a/pkg/core/region_tree_test.go b/pkg/core/region_tree_test.go index 5886103191c7..2726b4fdab55 100644 --- a/pkg/core/region_tree_test.go +++ b/pkg/core/region_tree_test.go @@ -159,8 +159,8 @@ func TestRegionTree(t *testing.T) { updateNewItem(tree, regionA) updateNewItem(tree, regionC) re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c")))) - re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0].RegionInfo) - re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo) + re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0]) + re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1]) re.Nil(tree.search([]byte{})) re.Equal(regionA, tree.search([]byte("a"))) re.Nil(tree.search([]byte("b"))) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index d711ab2d4f6e..7cecc0cb6d28 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -443,11 +443,23 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{peer}, loads, interval)) + checkReadPeerTask := func(cache *statistics.HotPeerCache) { + stats := cache.CheckPeerFlow(region, []*metapb.Peer{peer}, loads, interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + c.hotStat.CheckReadAsync(checkReadPeerTask) } // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. - c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) + collectUnReportedPeerTask := func(cache *statistics.HotPeerCache) { + stats := cache.CheckColdPeer(storeID, regions, interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + c.hotStat.CheckReadAsync(collectUnReportedPeerTask) return nil } diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 605ec73dad5e..842e876885c1 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -158,7 +158,8 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat s.hbStreams.BindStream(storeID, server) lastBind = time.Now() } - region := core.RegionFromHeartbeat(request) + // scheduling service doesn't sync the pd server config, so we use 0 here + region := core.RegionFromHeartbeat(request, 0) err = c.HandleRegionHeartbeat(region) if err != nil { // TODO: if we need to send the error back to API server. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 26548c8b47eb..3f076734a7b2 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -34,8 +34,8 @@ var ( // HotCache is a cache hold hot regions. type HotCache struct { ctx context.Context - writeCache *hotPeerCache - readCache *hotPeerCache + writeCache *HotPeerCache + readCache *HotPeerCache } // NewHotCache creates a new hot spot cache. @@ -51,7 +51,7 @@ func NewHotCache(ctx context.Context) *HotCache { } // CheckWriteAsync puts the flowItem into queue, and check it asynchronously -func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool { +func (w *HotCache) CheckWriteAsync(task func(cache *HotPeerCache)) bool { if w.writeCache.taskQueue.Len() > chanMaxLength { return false } @@ -64,7 +64,7 @@ func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool { } // CheckReadAsync puts the flowItem into queue, and check it asynchronously -func (w *HotCache) CheckReadAsync(task FlowItemTask) bool { +func (w *HotCache) CheckReadAsync(task func(cache *HotPeerCache)) bool { if w.readCache.taskQueue.Len() > chanMaxLength { return false } @@ -78,52 +78,86 @@ func (w *HotCache) CheckReadAsync(task FlowItemTask) bool { // RegionStats returns hot items according to kind func (w *HotCache) RegionStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat { - task := newCollectRegionStatsTask(minHotDegree) + ret := make(chan map[uint64][]*HotPeerStat, 1) + collectRegionStatsTask := func(cache *HotPeerCache) { + ret <- cache.RegionStats(minHotDegree) + } var succ bool switch kind { case utils.Write: - succ = w.CheckWriteAsync(task) + succ = w.CheckWriteAsync(collectRegionStatsTask) case utils.Read: - succ = w.CheckReadAsync(task) + succ = w.CheckReadAsync(collectRegionStatsTask) } if !succ { return nil } - return task.waitRet(w.ctx) + select { + case <-w.ctx.Done(): + return nil + case r := <-ret: + return r + } } // IsRegionHot checks if the region is hot. func (w *HotCache) IsRegionHot(region *core.RegionInfo, minHotDegree int) bool { - checkRegionHotWriteTask := newCheckRegionHotTask(region, minHotDegree) - checkRegionHotReadTask := newCheckRegionHotTask(region, minHotDegree) + retWrite := make(chan bool, 1) + retRead := make(chan bool, 1) + checkRegionHotWriteTask := func(cache *HotPeerCache) { + retWrite <- cache.isRegionHotWithAnyPeers(region, minHotDegree) + } + checkRegionHotReadTask := func(cache *HotPeerCache) { + retRead <- cache.isRegionHotWithAnyPeers(region, minHotDegree) + } succ1 := w.CheckWriteAsync(checkRegionHotWriteTask) succ2 := w.CheckReadAsync(checkRegionHotReadTask) if succ1 && succ2 { - return checkRegionHotWriteTask.waitRet(w.ctx) || checkRegionHotReadTask.waitRet(w.ctx) + select { + case <-w.ctx.Done(): + return false + case r := <-retWrite: + return r + case r := <-retRead: + return r + } } return false } // GetHotPeerStat returns hot peer stat with specified regionID and storeID. func (w *HotCache) GetHotPeerStat(kind utils.RWType, regionID, storeID uint64) *HotPeerStat { - task := newGetHotPeerStatTask(regionID, storeID) + ret := make(chan *HotPeerStat, 1) + getHotPeerStatTask := func(cache *HotPeerCache) { + ret <- cache.getHotPeerStat(regionID, storeID) + } + var succ bool switch kind { case utils.Read: - succ = w.CheckReadAsync(task) + succ = w.CheckReadAsync(getHotPeerStatTask) case utils.Write: - succ = w.CheckWriteAsync(task) + succ = w.CheckWriteAsync(getHotPeerStatTask) } if !succ { return nil } - return task.waitRet(w.ctx) + select { + case <-w.ctx.Done(): + return nil + case r := <-ret: + return r + } } // CollectMetrics collects the hot cache metrics. func (w *HotCache) CollectMetrics() { - w.CheckWriteAsync(newCollectMetricsTask()) - w.CheckReadAsync(newCollectMetricsTask()) + w.CheckWriteAsync(func(cache *HotPeerCache) { + cache.collectMetrics() + }) + w.CheckReadAsync(func(cache *HotPeerCache) { + cache.collectMetrics() + }) } // ResetHotCacheStatusMetrics resets the hot cache metrics. @@ -131,7 +165,7 @@ func ResetHotCacheStatusMetrics() { hotCacheStatusGauge.Reset() } -func (w *HotCache) updateItems(queue *chanx.UnboundedChan[FlowItemTask], runTask func(task FlowItemTask)) { +func (w *HotCache) updateItems(queue *chanx.UnboundedChan[func(*HotPeerCache)], runTask func(task func(*HotPeerCache))) { defer logutil.LogPanic() for { @@ -144,18 +178,18 @@ func (w *HotCache) updateItems(queue *chanx.UnboundedChan[FlowItemTask], runTask } } -func (w *HotCache) runReadTask(task FlowItemTask) { +func (w *HotCache) runReadTask(task func(cache *HotPeerCache)) { if task != nil { // TODO: do we need a run-task timeout to protect the queue won't be stuck by a task? - task.runTask(w.readCache) + task(w.readCache) readTaskMetrics.Set(float64(w.readCache.taskQueue.Len())) } } -func (w *HotCache) runWriteTask(task FlowItemTask) { +func (w *HotCache) runWriteTask(task func(cache *HotPeerCache)) { if task != nil { // TODO: do we need a run-task timeout to protect the queue won't be stuck by a task? - task.runTask(w.writeCache) + task(w.writeCache) writeTaskMetrics.Set(float64(w.writeCache.taskQueue.Len())) } } @@ -165,34 +199,34 @@ func (w *HotCache) runWriteTask(task FlowItemTask) { func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) { switch kind { case utils.Write: - w.writeCache.updateStat(item) + w.writeCache.UpdateStat(item) case utils.Read: - w.readCache.updateStat(item) + w.readCache.UpdateStat(item) } } // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster, for test purpose. func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat { - return w.writeCache.checkPeerFlow(region, peers, loads, interval) + return w.writeCache.CheckPeerFlow(region, peers, loads, interval) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster, for test purpose. func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat { - return w.readCache.checkPeerFlow(region, peers, loads, interval) + return w.readCache.CheckPeerFlow(region, peers, loads, interval) } // ExpiredReadItems returns the read items which are already expired. // This is used for mockcluster, for test purpose. func (w *HotCache) ExpiredReadItems(region *core.RegionInfo) []*HotPeerStat { - return w.readCache.collectExpiredItems(region) + return w.readCache.CollectExpiredItems(region) } // ExpiredWriteItems returns the write items which are already expired. // This is used for mockcluster, for test purpose. func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat { - return w.writeCache.collectExpiredItems(region) + return w.writeCache.CollectExpiredItems(region) } // GetThresholds returns thresholds. diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go deleted file mode 100644 index 01731f3fe4d5..000000000000 --- a/pkg/statistics/hot_cache_task.go +++ /dev/null @@ -1,204 +0,0 @@ -// Copyright 2021 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package statistics - -import ( - "context" - - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/tikv/pd/pkg/core" -) - -// FlowItemTask indicates the task in flowItem queue -type FlowItemTask interface { - runTask(cache *hotPeerCache) -} - -type checkReadPeerTask struct { - regionInfo *core.RegionInfo - peers []*metapb.Peer - loads []float64 - interval uint64 -} - -// NewCheckReadPeerTask creates task to update peerInfo -func NewCheckReadPeerTask(regionInfo *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) FlowItemTask { - return &checkReadPeerTask{ - regionInfo: regionInfo, - peers: peers, - loads: loads, - interval: interval, - } -} - -func (t *checkReadPeerTask) runTask(cache *hotPeerCache) { - stats := cache.checkPeerFlow(t.regionInfo, t.peers, t.loads, t.interval) - for _, stat := range stats { - cache.updateStat(stat) - } -} - -type checkWritePeerTask struct { - region *core.RegionInfo -} - -// NewCheckWritePeerTask creates task to update peerInfo -func NewCheckWritePeerTask(region *core.RegionInfo) FlowItemTask { - return &checkWritePeerTask{ - region: region, - } -} - -func (t *checkWritePeerTask) runTask(cache *hotPeerCache) { - reportInterval := t.region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - stats := cache.checkPeerFlow(t.region, t.region.GetPeers(), t.region.GetWriteLoads(), interval) - for _, stat := range stats { - cache.updateStat(stat) - } -} - -type checkExpiredTask struct { - region *core.RegionInfo -} - -// NewCheckExpiredItemTask creates task to collect expired items -func NewCheckExpiredItemTask(region *core.RegionInfo) FlowItemTask { - return &checkExpiredTask{ - region: region, - } -} - -func (t *checkExpiredTask) runTask(cache *hotPeerCache) { - expiredStats := cache.collectExpiredItems(t.region) - for _, stat := range expiredStats { - cache.updateStat(stat) - } -} - -type collectUnReportedPeerTask struct { - storeID uint64 - regions map[uint64]*core.RegionInfo - interval uint64 -} - -// NewCollectUnReportedPeerTask creates task to collect unreported peers -func NewCollectUnReportedPeerTask(storeID uint64, regions map[uint64]*core.RegionInfo, interval uint64) FlowItemTask { - return &collectUnReportedPeerTask{ - storeID: storeID, - regions: regions, - interval: interval, - } -} - -func (t *collectUnReportedPeerTask) runTask(cache *hotPeerCache) { - stats := cache.checkColdPeer(t.storeID, t.regions, t.interval) - for _, stat := range stats { - cache.updateStat(stat) - } -} - -type collectRegionStatsTask struct { - minDegree int - ret chan map[uint64][]*HotPeerStat -} - -func newCollectRegionStatsTask(minDegree int) *collectRegionStatsTask { - return &collectRegionStatsTask{ - minDegree: minDegree, - ret: make(chan map[uint64][]*HotPeerStat, 1), - } -} - -func (t *collectRegionStatsTask) runTask(cache *hotPeerCache) { - t.ret <- cache.RegionStats(t.minDegree) -} - -// TODO: do we need a wait-return timeout? -func (t *collectRegionStatsTask) waitRet(ctx context.Context) map[uint64][]*HotPeerStat { - select { - case <-ctx.Done(): - return nil - case ret := <-t.ret: - return ret - } -} - -type checkRegionHotTask struct { - region *core.RegionInfo - minHotDegree int - ret chan bool -} - -func newCheckRegionHotTask(region *core.RegionInfo, minDegree int) *checkRegionHotTask { - return &checkRegionHotTask{ - region: region, - minHotDegree: minDegree, - ret: make(chan bool, 1), - } -} - -func (t *checkRegionHotTask) runTask(cache *hotPeerCache) { - t.ret <- cache.isRegionHotWithAnyPeers(t.region, t.minHotDegree) -} - -// TODO: do we need a wait-return timeout? -func (t *checkRegionHotTask) waitRet(ctx context.Context) bool { - select { - case <-ctx.Done(): - return false - case r := <-t.ret: - return r - } -} - -type collectMetricsTask struct { -} - -func newCollectMetricsTask() *collectMetricsTask { - return &collectMetricsTask{} -} - -func (*collectMetricsTask) runTask(cache *hotPeerCache) { - cache.collectMetrics() -} - -type getHotPeerStatTask struct { - regionID uint64 - storeID uint64 - ret chan *HotPeerStat -} - -func newGetHotPeerStatTask(regionID, storeID uint64) *getHotPeerStatTask { - return &getHotPeerStatTask{ - regionID: regionID, - storeID: storeID, - ret: make(chan *HotPeerStat, 1), - } -} - -func (t *getHotPeerStatTask) runTask(cache *hotPeerCache) { - t.ret <- cache.getHotPeerStat(t.regionID, t.storeID) -} - -// TODO: do we need a wait-return timeout? -func (t *getHotPeerStatTask) waitRet(ctx context.Context) *HotPeerStat { - select { - case <-ctx.Done(): - return nil - case r := <-t.ret: - return r - } -} diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 3a3d3519bd96..4db0c304bb95 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -57,27 +57,27 @@ type thresholds struct { metrics [utils.DimLen + 1]prometheus.Gauge // 0 is for byte, 1 is for key, 2 is for query, 3 is for total length. } -// hotPeerCache saves the hot peer's statistics. -type hotPeerCache struct { +// HotPeerCache saves the hot peer's statistics. +type HotPeerCache struct { kind utils.RWType peersOfStore map[uint64]*utils.TopN // storeID -> hot peers storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs topNTTL time.Duration - taskQueue *chanx.UnboundedChan[FlowItemTask] + taskQueue *chanx.UnboundedChan[func(*HotPeerCache)] thresholdsOfStore map[uint64]*thresholds // storeID -> thresholds metrics map[uint64][utils.ActionTypeLen]prometheus.Gauge // storeID -> metrics // TODO: consider to remove store info when store is offline. } -// NewHotPeerCache creates a hotPeerCache -func NewHotPeerCache(ctx context.Context, kind utils.RWType) *hotPeerCache { - return &hotPeerCache{ +// NewHotPeerCache creates a HotPeerCache +func NewHotPeerCache(ctx context.Context, kind utils.RWType) *HotPeerCache { + return &HotPeerCache{ kind: kind, peersOfStore: make(map[uint64]*utils.TopN), storesOfRegion: make(map[uint64]map[uint64]struct{}), regionsOfStore: make(map[uint64]map[uint64]struct{}), - taskQueue: chanx.NewUnboundedChan[FlowItemTask](ctx, queueCap), + taskQueue: chanx.NewUnboundedChan[func(*HotPeerCache)](ctx, queueCap), thresholdsOfStore: make(map[uint64]*thresholds), topNTTL: time.Duration(3*kind.ReportInterval()) * time.Second, metrics: make(map[uint64][utils.ActionTypeLen]prometheus.Gauge), @@ -86,7 +86,7 @@ func NewHotPeerCache(ctx context.Context, kind utils.RWType) *hotPeerCache { // TODO: rename RegionStats as PeerStats // RegionStats returns hot items -func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { +func (f *HotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { res := make(map[uint64][]*HotPeerStat) defaultAntiCount := f.kind.DefaultAntiCount() for storeID, peers := range f.peersOfStore { @@ -102,7 +102,7 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { return res } -func (f *hotPeerCache) updateStat(item *HotPeerStat) { +func (f *HotPeerCache) UpdateStat(item *HotPeerStat) { switch item.actionType { case utils.Remove: f.removeItem(item) @@ -116,7 +116,7 @@ func (f *hotPeerCache) updateStat(item *HotPeerStat) { f.incMetrics(item.actionType, item.StoreID) } -func (f *hotPeerCache) incMetrics(action utils.ActionType, storeID uint64) { +func (f *HotPeerCache) incMetrics(action utils.ActionType, storeID uint64) { if _, ok := f.metrics[storeID]; !ok { store := storeTag(storeID) kind := f.kind.String() @@ -129,7 +129,7 @@ func (f *hotPeerCache) incMetrics(action utils.ActionType, storeID uint64) { f.metrics[storeID][action].Inc() } -func (f *hotPeerCache) collectPeerMetrics(loads []float64, interval uint64) { +func (f *HotPeerCache) collectPeerMetrics(loads []float64, interval uint64) { regionHeartbeatIntervalHist.Observe(float64(interval)) if interval == 0 { return @@ -153,8 +153,8 @@ func (f *hotPeerCache) collectPeerMetrics(loads []float64, interval uint64) { } } -// collectExpiredItems collects expired items, mark them as needDelete and puts them into inherit items -func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerStat { +// CollectExpiredItems collects expired items, mark them as needDelete and puts them into inherit items +func (f *HotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerStat { regionID := region.GetID() items := make([]*HotPeerStat, 0) if ids, ok := f.storesOfRegion[regionID]; ok { @@ -171,10 +171,10 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt return items } -// checkPeerFlow checks the flow information of a peer. -// Notice: checkPeerFlow couldn't be used concurrently. -// checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. -func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64, interval uint64) []*HotPeerStat { +// CheckPeerFlow checks the flow information of a peer. +// Notice: CheckPeerFlow couldn't be used concurrently. +// CheckPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. +func (f *HotPeerCache) CheckPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64, interval uint64) []*HotPeerStat { if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose return nil } @@ -231,8 +231,8 @@ func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Pe return stats } -// checkColdPeer checks the collect the un-heartbeat peer and maintain it. -func (f *hotPeerCache) checkColdPeer(storeID uint64, reportRegions map[uint64]*core.RegionInfo, interval uint64) (ret []*HotPeerStat) { +// CheckColdPeer checks the collect the un-heartbeat peer and maintain it. +func (f *HotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]*core.RegionInfo, interval uint64) (ret []*HotPeerStat) { // for test or simulator purpose if Denoising && interval < HotRegionReportMinInterval { return @@ -278,7 +278,7 @@ func (f *hotPeerCache) checkColdPeer(storeID uint64, reportRegions map[uint64]*c return } -func (f *hotPeerCache) collectMetrics() { +func (f *HotPeerCache) collectMetrics() { for _, thresholds := range f.thresholdsOfStore { thresholds.metrics[utils.ByteDim].Set(thresholds.rates[utils.ByteDim]) thresholds.metrics[utils.KeyDim].Set(thresholds.rates[utils.KeyDim]) @@ -287,7 +287,7 @@ func (f *hotPeerCache) collectMetrics() { } } -func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat { +func (f *HotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat { if hotPeers, ok := f.peersOfStore[storeID]; ok { if v := hotPeers.Get(regionID); v != nil { return v.(*HotPeerStat) @@ -296,7 +296,7 @@ func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat return nil } -func (f *hotPeerCache) calcHotThresholds(storeID uint64) []float64 { +func (f *HotPeerCache) calcHotThresholds(storeID uint64) []float64 { // check whether the thresholds is updated recently t, ok := f.thresholdsOfStore[storeID] if ok && time.Since(t.updatedTime) <= ThresholdsUpdateInterval { @@ -336,7 +336,7 @@ func (f *hotPeerCache) calcHotThresholds(storeID uint64) []float64 { } // gets the storeIDs, including old region and new region -func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 { +func (f *HotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 { regionPeers := region.GetPeers() ret := make([]uint64, 0, len(regionPeers)) isInSlice := func(id uint64) bool { @@ -364,7 +364,7 @@ func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 { return ret } -func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool { +func (f *HotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool { isOldPeer := func() bool { for _, id := range oldItem.stores { if id == storeID { @@ -384,7 +384,7 @@ func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool return isOldPeer() && !isInHotCache() } -func (f *hotPeerCache) justTransferLeader(region *core.RegionInfo, oldItem *HotPeerStat) bool { +func (f *HotPeerCache) justTransferLeader(region *core.RegionInfo, oldItem *HotPeerStat) bool { if region == nil { return false } @@ -406,7 +406,7 @@ func (f *hotPeerCache) justTransferLeader(region *core.RegionInfo, oldItem *HotP return false } -func (f *hotPeerCache) isRegionHotWithAnyPeers(region *core.RegionInfo, hotDegree int) bool { +func (f *HotPeerCache) isRegionHotWithAnyPeers(region *core.RegionInfo, hotDegree int) bool { for _, peer := range region.GetPeers() { if f.isRegionHotWithPeer(region, peer, hotDegree) { return true @@ -415,7 +415,7 @@ func (f *hotPeerCache) isRegionHotWithAnyPeers(region *core.RegionInfo, hotDegre return false } -func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb.Peer, hotDegree int) bool { +func (f *HotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb.Peer, hotDegree int) bool { if peer == nil { return false } @@ -425,7 +425,7 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb return false } -func (f *hotPeerCache) getHotPeerStat(regionID, storeID uint64) *HotPeerStat { +func (f *HotPeerCache) getHotPeerStat(regionID, storeID uint64) *HotPeerStat { if peers, ok := f.peersOfStore[storeID]; ok { if stat := peers.Get(regionID); stat != nil { return stat.(*HotPeerStat) @@ -434,7 +434,7 @@ func (f *hotPeerCache) getHotPeerStat(regionID, storeID uint64) *HotPeerStat { return nil } -func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration, source utils.SourceKind) *HotPeerStat { +func (f *HotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration, source utils.SourceKind) *HotPeerStat { regionStats := f.kind.RegionStats() if source == utils.Inherit { @@ -495,7 +495,7 @@ func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldIt return newItem } -func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { +func (f *HotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { regionStats := f.kind.RegionStats() // interval is not 0 which is guaranteed by the caller. if interval.Seconds() >= float64(f.kind.ReportInterval()) { @@ -514,7 +514,7 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f return newItem } -func (f *hotPeerCache) putItem(item *HotPeerStat) { +func (f *HotPeerCache) putItem(item *HotPeerStat) { peers, ok := f.peersOfStore[item.StoreID] if !ok { peers = utils.NewTopN(utils.DimLen, TopNN, f.topNTTL) @@ -535,7 +535,7 @@ func (f *hotPeerCache) putItem(item *HotPeerStat) { regions[item.RegionID] = struct{}{} } -func (f *hotPeerCache) removeItem(item *HotPeerStat) { +func (f *HotPeerCache) removeItem(item *HotPeerStat) { if peers, ok := f.peersOfStore[item.StoreID]; ok { peers.Remove(item.RegionID) } @@ -549,12 +549,12 @@ func (f *hotPeerCache) removeItem(item *HotPeerStat) { // removeAllItem removes all items of the cache. // It is used for test. -func (f *hotPeerCache) removeAllItem() { +func (f *HotPeerCache) removeAllItem() { for _, peers := range f.peersOfStore { for _, peer := range peers.GetAll() { item := peer.(*HotPeerStat) item.actionType = utils.Remove - f.updateStat(item) + f.UpdateStat(item) } } } @@ -590,7 +590,7 @@ func inheritItem(newItem, oldItem *HotPeerStat) { newItem.AntiCount = oldItem.AntiCount } -func (f *hotPeerCache) interval() time.Duration { +func (f *HotPeerCache) interval() time.Duration { return time.Duration(f.kind.ReportInterval()) * time.Second } diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index c116e020f544..ef524b76390a 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -93,7 +93,7 @@ func TestCache(t *testing.T) { } } -func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer { +func orderingPeers(cache *HotPeerCache, region *core.RegionInfo) []*metapb.Peer { var peers []*metapb.Peer for _, peer := range region.GetPeers() { if cache.getOldHotPeerStat(region.GetID(), peer.StoreId) != nil { @@ -105,23 +105,23 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer return peers } -func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { +func checkFlow(cache *HotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - res = append(res, cache.collectExpiredItems(region)...) - return append(res, cache.checkPeerFlow(region, peers, region.GetLoads(), interval)...) + res = append(res, cache.CollectExpiredItems(region)...) + return append(res, cache.CheckPeerFlow(region, peers, region.GetLoads(), interval)...) } -func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { +func updateFlow(cache *HotPeerCache, res []*HotPeerStat) []*HotPeerStat { for _, p := range res { - cache.updateStat(p) + cache.UpdateStat(p) } return res } -type check func(re *require.Assertions, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) +type check func(re *require.Assertions, cache *HotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) -func checkAndUpdate(re *require.Assertions, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { +func checkAndUpdate(re *require.Assertions, cache *HotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { res = checkFlow(cache, region, region.GetPeers()) if len(expect) != 0 { re.Len(res, expect[0]) @@ -131,7 +131,7 @@ func checkAndUpdate(re *require.Assertions, cache *hotPeerCache, region *core.Re // Check and update peers in the specified order that old item that he items that have not expired come first, and the items that have expired come second. // This order is also similar to the previous version. By the way the order in now version is random. -func checkAndUpdateWithOrdering(re *require.Assertions, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { +func checkAndUpdateWithOrdering(re *require.Assertions, cache *HotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { res = checkFlow(cache, region, orderingPeers(cache, region)) if len(expect) != 0 { re.Len(res, expect[0]) @@ -139,7 +139,7 @@ func checkAndUpdateWithOrdering(re *require.Assertions, cache *hotPeerCache, reg return updateFlow(cache, res) } -func checkAndUpdateSkipOne(re *require.Assertions, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { +func checkAndUpdateSkipOne(re *require.Assertions, cache *HotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { res = checkFlow(cache, region, region.GetPeers()[1:]) if len(expect) != 0 { re.Len(res, expect[0]) @@ -147,7 +147,7 @@ func checkAndUpdateSkipOne(re *require.Assertions, cache *hotPeerCache, region * return updateFlow(cache, res) } -func checkHit(re *require.Assertions, cache *hotPeerCache, region *core.RegionInfo, kind utils.RWType, actionType utils.ActionType) { +func checkHit(re *require.Assertions, cache *HotPeerCache, region *core.RegionInfo, kind utils.RWType, actionType utils.ActionType) { var peers []*metapb.Peer if kind == utils.Read { peers = []*metapb.Peer{region.GetLeader()} @@ -171,7 +171,7 @@ func checkOp(re *require.Assertions, ret []*HotPeerStat, storeID uint64, actionT } // checkIntervalSum checks whether the interval sum of the peers are different. -func checkIntervalSum(cache *hotPeerCache, region *core.RegionInfo) bool { +func checkIntervalSum(cache *HotPeerCache, region *core.RegionInfo) bool { var intervalSums []int for _, peer := range region.GetPeers() { oldItem := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) @@ -317,7 +317,7 @@ func TestUpdateHotPeerStat(t *testing.T) { utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) + newItem := cache.CheckPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Nil(newItem) // new peer, interval is larger than report interval, but no hot @@ -326,7 +326,7 @@ func TestUpdateHotPeerStat(t *testing.T) { utils.MinHotThresholds[utils.RegionReadBytes] = 1.0 utils.MinHotThresholds[utils.RegionReadKeys] = 1.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0 - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) + newItem = cache.CheckPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Empty(newItem) // new peer, interval is less than report interval @@ -335,45 +335,45 @@ func TestUpdateHotPeerStat(t *testing.T) { utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) + newItem = cache.CheckPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.NotNil(newItem) re.Equal(0, newItem[0].HotDegree) re.Equal(0, newItem[0].AntiCount) // sum of interval is less than report interval deltaLoads = []float64{60.0, 60.0, 60.0} - cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) + cache.UpdateStat(newItem[0]) + newItem = cache.CheckPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(0, newItem[0].HotDegree) re.Equal(0, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot newItem[0].AntiCount = utils.Read.DefaultAntiCount() - cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) + cache.UpdateStat(newItem[0]) + newItem = cache.CheckPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is less than report interval - cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) + cache.UpdateStat(newItem[0]) + newItem = cache.CheckPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot interval = 10 - cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) + cache.UpdateStat(newItem[0]) + newItem = cache.CheckPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(2, newItem[0].HotDegree) re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold utils.MinHotThresholds[utils.RegionReadBytes] = 10.0 utils.MinHotThresholds[utils.RegionReadKeys] = 10.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0 - cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) + cache.UpdateStat(newItem[0]) + newItem = cache.CheckPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) re.Equal(1, newItem[0].HotDegree) re.Equal(2*m-1, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { - cache.updateStat(newItem[0]) - newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) + cache.UpdateStat(newItem[0]) + newItem = cache.CheckPeerFlow(region, []*metapb.Peer{peer}, deltaLoads, interval) } re.Less(newItem[0].HotDegree, 0) re.Equal(0, newItem[0].AntiCount) @@ -422,7 +422,7 @@ func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold flo } else { item = cache.updateHotPeerStat(nil, newItem, oldItem, loads, time.Duration(interval)*time.Second, utils.Direct) } - cache.updateStat(item) + cache.UpdateStat(item) } thresholds := cache.calcHotThresholds(storeID) if i < TopNN { @@ -521,7 +521,7 @@ func TestRemoveFromCacheRandom(t *testing.T) { } } -func checkCoolDown(re *require.Assertions, cache *hotPeerCache, region *core.RegionInfo, expect bool) { +func checkCoolDown(re *require.Assertions, cache *HotPeerCache, region *core.RegionInfo, expect bool) { item := cache.getOldHotPeerStat(region.GetID(), region.GetLeader().GetStoreId()) re.Equal(expect, item.IsNeedCoolDownTransferLeader(3, cache.kind)) } @@ -680,9 +680,9 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { StartTimestamp: start, EndTimestamp: end, })) - stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), newRegion.GetLoads(), end-start) + stats := cache.CheckPeerFlow(newRegion, newRegion.GetPeers(), newRegion.GetLoads(), end-start) for _, stat := range stats { - cache.updateStat(stat) + cache.UpdateStat(stat) } } if ThresholdsUpdateInterval == 0 { @@ -710,9 +710,9 @@ func BenchmarkCheckRegionFlow(b *testing.B) { region := buildRegion(utils.Read, 3, 10) b.ResetTimer() for i := 0; i < b.N; i++ { - stats := cache.checkPeerFlow(region, region.GetPeers(), region.GetLoads(), 10) + stats := cache.CheckPeerFlow(region, region.GetPeers(), region.GetLoads(), 10) for _, stat := range stats { - cache.updateStat(stat) + cache.UpdateStat(stat) } } } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 057814b718bc..9f316073e1af 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -959,7 +959,13 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - c.hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{peer}, loads, interval)) + checkReadPeerTask := func(cache *statistics.HotPeerCache) { + stats := cache.CheckPeerFlow(region, []*metapb.Peer{peer}, loads, interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + c.hotStat.CheckReadAsync(checkReadPeerTask) } } for _, stat := range stats.GetSnapshotStats() { @@ -982,7 +988,13 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest } if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. - c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) + collectUnReportedPeerTask := func(cache *statistics.HotPeerCache) { + stats := cache.CheckColdPeer(storeID, regions, interval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + c.hotStat.CheckReadAsync(collectUnReportedPeerTask) } return nil } @@ -1082,7 +1094,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. - if overlaps, err = c.core.CheckAndPutRootTree(ctx, region); err != nil { + if _, err = c.core.CheckAndPutRootTree(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 0f08153c8ae7..dcd430a54c95 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -32,8 +32,8 @@ import ( "github.com/pingcap/kvproto/pkg/eraftpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" @@ -55,6 +55,7 @@ import ( "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/operatorutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -3732,7 +3733,7 @@ func waitNoResponse(re *require.Assertions, stream mockhbstream.HeartbeatStream) }) } -func BenchmarkHandleStatsAsync(b *testing.B) { +func BenchmarkHandleRegionHeartbeat(b *testing.B) { // Setup: create a new instance of Cluster ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -3741,24 +3742,51 @@ func BenchmarkHandleStatsAsync(b *testing.B) { c := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) c.coordinator = schedule.NewCoordinator(ctx, c, nil) c.SetPrepared() - region := core.NewRegionInfo(&metapb.Region{ - Id: 1, - RegionEpoch: &metapb.RegionEpoch{ - ConfVer: 1, - Version: 1, - }, - StartKey: []byte{byte(2)}, - EndKey: []byte{byte(3)}, - Peers: []*metapb.Peer{{Id: 11, StoreId: uint64(1)}}, - }, nil, - core.SetApproximateSize(10), - core.SetReportInterval(0, 10), - ) + log.SetLevel(logutil.StringToZapLogLevel("fatal")) + peers := []*metapb.Peer{ + {Id: 11, StoreId: 1}, + {Id: 22, StoreId: 2}, + {Id: 33, StoreId: 3}, + } + queryStats := &pdpb.QueryStats{ + Get: 5, + Coprocessor: 6, + Scan: 7, + Put: 8, + Delete: 9, + DeleteRange: 10, + AcquirePessimisticLock: 11, + Rollback: 12, + Prewrite: 13, + Commit: 14, + } + interval := &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10} + downPeers := []*pdpb.PeerStats{{Peer: peers[1], DownSeconds: 100}, {Peer: peers[2], DownSeconds: 100}} + pendingPeers := []*metapb.Peer{peers[1], peers[2]} + request := &pdpb.RegionHeartbeatRequest{ + Region: &metapb.Region{Id: 10, Peers: peers, StartKey: []byte("a"), EndKey: []byte("b")}, + Leader: peers[0], + DownPeers: downPeers, + PendingPeers: pendingPeers, + BytesWritten: 10, + BytesRead: 20, + KeysWritten: 100, + KeysRead: 200, + ApproximateSize: 30 * units.MiB, + ApproximateKeys: 300, + Interval: interval, + QueryStats: queryStats, + Term: 1, + CpuUsage: 100, + } + + flowRound := opt.GetPDServerConfig().FlowRoundByDigit // Reset timer after setup b.ResetTimer() // Run HandleStatsAsync b.N times for i := 0; i < b.N; i++ { - cluster.HandleStatsAsync(c, region) + region := core.RegionFromHeartbeat(request, flowRound) + c.HandleRegionHeartbeat(region) } } diff --git a/server/grpc_service.go b/server/grpc_service.go index 2b3ee232686b..ba4235829ca5 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1169,7 +1169,7 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { var ( server = &heartbeatServer{stream: stream} - flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) + flowRound = s.persistOptions.GetPDServerConfig().FlowRoundByDigit cancel context.CancelFunc lastBind time.Time errCh chan error @@ -1264,11 +1264,11 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc() s.hbStreams.BindStream(storeID, server) // refresh FlowRoundByDigit - flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) + flowRound = s.persistOptions.GetPDServerConfig().FlowRoundByDigit lastBind = time.Now() } - region := core.RegionFromHeartbeat(request, flowRoundOption) + region := core.RegionFromHeartbeat(request, flowRound) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc() diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 61a4561c55a7..5b3f55c099e9 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -1333,22 +1333,22 @@ func TestStaleTermHeartbeat(t *testing.T) { Term: 5, ApproximateSize: 10, } - - region := core.RegionFromHeartbeat(regionReq) + flowRound := leaderServer.GetConfig().PDServerCfg.FlowRoundByDigit + region := core.RegionFromHeartbeat(regionReq, flowRound) err = rc.HandleRegionHeartbeat(region) re.NoError(err) // Transfer leader regionReq.Term = 6 regionReq.Leader = peers[1] - region = core.RegionFromHeartbeat(regionReq) + region = core.RegionFromHeartbeat(regionReq, flowRound) err = rc.HandleRegionHeartbeat(region) re.NoError(err) // issue #3379 regionReq.KeysWritten = uint64(18446744073709551615) // -1 regionReq.BytesWritten = uint64(18446744073709550602) // -1024 - region = core.RegionFromHeartbeat(regionReq) + region = core.RegionFromHeartbeat(regionReq, flowRound) re.Equal(uint64(0), region.GetKeysWritten()) re.Equal(uint64(0), region.GetBytesWritten()) err = rc.HandleRegionHeartbeat(region) @@ -1357,14 +1357,14 @@ func TestStaleTermHeartbeat(t *testing.T) { // Stale heartbeat, update check should fail regionReq.Term = 5 regionReq.Leader = peers[0] - region = core.RegionFromHeartbeat(regionReq) + region = core.RegionFromHeartbeat(regionReq, flowRound) err = rc.HandleRegionHeartbeat(region) re.Error(err) // Allow regions that are created by unsafe recover to send a heartbeat, even though they // are considered "stale" because their conf ver and version are both equal to 1. regionReq.Region.RegionEpoch.ConfVer = 1 - region = core.RegionFromHeartbeat(regionReq) + region = core.RegionFromHeartbeat(regionReq, flowRound) err = rc.HandleRegionHeartbeat(region) re.NoError(err) } diff --git a/tools/pd-ctl/tests/hot/hot_test.go b/tools/pd-ctl/tests/hot/hot_test.go index f65b811b36a8..dea49a1ffdd0 100644 --- a/tools/pd-ctl/tests/hot/hot_test.go +++ b/tools/pd-ctl/tests/hot/hot_test.go @@ -191,7 +191,13 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) { region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{leader}, loads, reportInterval)) + checkReadPeerTask := func(cache *statistics.HotPeerCache) { + stats := cache.CheckPeerFlow(region, []*metapb.Peer{leader}, loads, reportInterval) + for _, stat := range stats { + cache.UpdateStat(stat) + } + } + hotStat.CheckReadAsync(checkReadPeerTask) testutil.Eventually(re, func() bool { hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil