Skip to content

Commit

Permalink
reduce memory usage when handling region heartbeats
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 28, 2024
1 parent 9d580d0 commit 89eb56d
Show file tree
Hide file tree
Showing 16 changed files with 293 additions and 404 deletions.
22 changes: 19 additions & 3 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,25 @@ type Cluster interface {

// HandleStatsAsync handles the flow asynchronously.
// It enqueues three tasks on the hot-stat workers: expiring stale items in
// both the write and read hot-peer caches, and checking the write flow of
// the region's peers. The tasks are plain closures over the region, which
// avoids allocating a dedicated task object per heartbeat.
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
	// checkWritePeerTask recomputes the write-flow stats of every peer of
	// the region over the heartbeat's report interval and folds the result
	// back into the cache.
	checkWritePeerTask := func(cache *statistics.HotPeerCache) {
		reportInterval := region.GetInterval()
		interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
		stats := cache.CheckPeerFlow(region, region.GetPeers(), region.GetWriteLoads(), interval)
		for _, stat := range stats {
			cache.UpdateStat(stat)
		}
	}

	// checkExpiredTask drops cache items that the region no longer reports
	// (e.g. peers that have been moved away).
	checkExpiredTask := func(cache *statistics.HotPeerCache) {
		expiredStats := cache.CollectExpiredItems(region)
		for _, stat := range expiredStats {
			cache.UpdateStat(stat)
		}
	}

	c.GetHotStat().CheckWriteAsync(checkExpiredTask)
	c.GetHotStat().CheckReadAsync(checkExpiredTask)
	c.GetHotStat().CheckWriteAsync(checkWritePeerTask)
	c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

Expand Down
96 changes: 40 additions & 56 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
// the properties are Read-Only once created except buckets.
// the `buckets` could be modified by the request `report buckets` with greater version.
type RegionInfo struct {
term uint64
meta *metapb.Region
learners []*metapb.Peer
witnesses []*metapb.Peer
voters []*metapb.Peer
leader *metapb.Peer
downPeers []*pdpb.PeerStats
pendingPeers []*metapb.Peer
term uint64
cpuUsage uint64
writtenBytes uint64
writtenKeys uint64
Expand Down Expand Up @@ -137,26 +137,23 @@ func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCre

// classifyVoterAndLearner sorts out voter and learner from peers into different slice.
// The region's learners/voters/witnesses slices are truncated and refilled
// in place, so repeated calls reuse the existing backing arrays instead of
// allocating fresh slices on every heartbeat.
func classifyVoterAndLearner(region *RegionInfo) {
	// Reset slices, keeping their capacity for reuse.
	region.learners = region.learners[:0]
	region.voters = region.voters[:0]
	region.witnesses = region.witnesses[:0]
	for _, p := range region.meta.Peers {
		if IsLearner(p) {
			region.learners = append(region.learners, p)
		} else {
			region.voters = append(region.voters, p)
		}
		// Whichever peer role can be a witness
		if IsWitness(p) {
			region.witnesses = append(region.witnesses, p)
		}
	}
	sort.Sort(peerSlice(region.learners))
	sort.Sort(peerSlice(region.voters))
	sort.Sort(peerSlice(region.witnesses))
}

// peersEqualTo returns true when the peers are not changed, which may caused by: the region leader not changed,
Expand Down Expand Up @@ -214,7 +211,7 @@ type RegionHeartbeatRequest interface {
}

// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, flowRound int) *RegionInfo {
// Convert unit to MB.
// If region isn't empty and less than 1MB, use 1MB instead.
// The size of empty region will be correct by the previous RegionInfo.
Expand All @@ -224,20 +221,21 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
}

region := &RegionInfo{
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
flowRoundDivisor: uint64(flowRound),
}

// scheduling service doesn't need the following fields.
Expand All @@ -247,10 +245,6 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
region.cpuUsage = h.GetCpuUsage()
}

for _, opt := range opts {
opt(region)
}

if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize {
region.writtenKeys = 0
region.writtenBytes = 0
Expand Down Expand Up @@ -957,11 +951,11 @@ func (r *RegionsInfo) getRegionLocked(regionID uint64) *RegionInfo {
func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
r.t.Lock()
origin := r.getRegionLocked(region.GetID())
var ols []*regionItem
var ols []*RegionInfo
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
err := check(region, origin, convertItemsToRegions(ols))
err := check(region, origin, ols)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
// return the state region to delete.
Expand All @@ -988,25 +982,17 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*Reg
return origin, overlaps, err
}

// convertItemsToRegions unwraps each regionItem into the RegionInfo it
// embeds, preserving order.
func convertItemsToRegions(items []*regionItem) []*RegionInfo {
	regions := make([]*RegionInfo, len(items))
	for i := range items {
		regions[i] = items[i].RegionInfo
	}
	return regions
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
var ols []*RegionInfo
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
err := check(region, origin, ols)
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
Expand All @@ -1026,13 +1012,13 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *R
func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
var ols []*RegionInfo
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
err := check(region, origin, ols)
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
Expand Down Expand Up @@ -1123,7 +1109,7 @@ func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionI
if len(overlaps) == 0 {
// If the range has changed but the overlapped regions are not provided, collect them from the overlap tree.
for _, item := range r.getOverlapRegionFromOverlapTreeLocked(region) {
r.removeRegionFromSubTreeLocked(item.RegionInfo)
r.removeRegionFromSubTreeLocked(item)
}
} else {
// Remove all provided overlapped regions from the subtrees.
Expand Down Expand Up @@ -1164,7 +1150,7 @@ func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionI
setPeers(r.pendingPeers, region.GetPendingPeers())
}

func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*regionItem {
func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*RegionInfo {
return r.overlapTree.overlaps(&regionItem{RegionInfo: region})
}

Expand All @@ -1174,9 +1160,7 @@ func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo
defer r.t.RUnlock()
origin = r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
for _, item := range r.tree.overlaps(&regionItem{RegionInfo: region}) {
overlaps = append(overlaps, item.RegionInfo)
}
return origin, r.tree.overlaps(&regionItem{RegionInfo: region})
}
return
}
Expand Down Expand Up @@ -1211,7 +1195,7 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo,
return r.setRegionLocked(region, false)
}

func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*regionItem) (*RegionInfo, []*RegionInfo, bool) {
func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*RegionInfo) (*RegionInfo, []*RegionInfo, bool) {
var (
item *regionItem // Pointer to the *RegionInfo of this ID.
origin *RegionInfo
Expand Down Expand Up @@ -1240,7 +1224,7 @@ func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol
item.RegionInfo = region
} else {
// If the range is not changed, only the statistical on the regionTree needs to be updated.
r.tree.updateStat(origin, region)
r.tree.UpdateStat(origin, region)
// Update the RegionInfo in the regionItem.
item.RegionInfo = region
return origin, nil, rangeChanged
Expand Down Expand Up @@ -1281,7 +1265,7 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
func (r *RegionsInfo) updateSubTreeStat(origin *RegionInfo, region *RegionInfo) {
updatePeerStat := func(peersMap map[uint64]*regionTree, storeID uint64) {
if tree, ok := peersMap[storeID]; ok {
tree.updateStat(origin, region)
tree.UpdateStat(origin, region)
}
}
for _, peer := range region.GetVoters() {
Expand Down Expand Up @@ -1311,7 +1295,7 @@ func (r *RegionsInfo) TreeLen() int {
}

// GetOverlaps returns the regions which are overlapped with the specified region range.
func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*regionItem {
func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.overlaps(&regionItem{RegionInfo: region})
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ func TestSortedEqual(t *testing.T) {
Region: &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsA)},
DownPeers: pickPeerStats(testCase.idsA),
PendingPeers: pickPeers(testCase.idsA),
})
}, 3)
regionB := RegionFromHeartbeat(&pdpb.RegionHeartbeatRequest{
Region: &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsB)},
DownPeers: pickPeerStats(testCase.idsB),
PendingPeers: pickPeers(testCase.idsB),
})
}, 3)
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetPendingPeers(), regionB.GetPendingPeers()))
Expand Down Expand Up @@ -952,7 +952,7 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
RegionFromHeartbeat(regionReq)
RegionFromHeartbeat(regionReq, 3)
}
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (t *regionTree) notFromStorageRegionsCount() int {
}

// GetOverlaps returns the range items that has some intersections with the given items.
func (t *regionTree) overlaps(item *regionItem) []*regionItem {
func (t *regionTree) overlaps(item *regionItem) []*RegionInfo {
// note that Find() gets the last item that is less or equal than the item.
// in the case: |_______a_______|_____b_____|___c___|
// new item is |______d______|
Expand All @@ -116,12 +116,12 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem {
result = item
}
endKey := item.GetEndKey()
var overlaps []*regionItem
var overlaps []*RegionInfo
t.tree.AscendGreaterOrEqual(result, func(i *regionItem) bool {
if len(endKey) > 0 && bytes.Compare(endKey, i.GetStartKey()) <= 0 {
return false
}
overlaps = append(overlaps, i)
overlaps = append(overlaps, i.RegionInfo)
return true
})
return overlaps
Expand All @@ -130,7 +130,7 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem {
// update updates the tree with the region.
// It finds and deletes all the overlapped regions first, and then
// insert the region.
func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*regionItem) []*RegionInfo {
func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*RegionInfo) []*RegionInfo {
region := item.RegionInfo
t.totalSize += region.approximateSize
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
Expand All @@ -145,15 +145,15 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
}

for _, old := range overlaps {
t.tree.Delete(old)
t.tree.Delete(&regionItem{RegionInfo: old})
}
t.tree.ReplaceOrInsert(item)
if t.countRef {
item.RegionInfo.IncRef()
}
result := make([]*RegionInfo, len(overlaps))
for i, overlap := range overlaps {
old := overlap.RegionInfo
old := overlap
result[i] = old
log.Debug("overlapping region",
zap.Uint64("region-id", old.GetID()),
Expand All @@ -174,8 +174,8 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
return result
}

// updateStat is used to update statistics when regionItem.RegionInfo is directly replaced.
func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
// UpdateStat is used to update statistics when RegionInfo is directly replaced.
func (t *regionTree) UpdateStat(origin *RegionInfo, region *RegionInfo) {
t.totalSize += region.approximateSize
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ func TestRegionTree(t *testing.T) {
updateNewItem(tree, regionA)
updateNewItem(tree, regionC)
re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c"))))
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0].RegionInfo)
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo)
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0])
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1])
re.Nil(tree.search([]byte{}))
re.Equal(regionA, tree.search([]byte("a")))
re.Nil(tree.search([]byte("b")))
Expand Down
16 changes: 14 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,11 +443,23 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
c.hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{peer}, loads, interval))
checkReadPeerTask := func(cache *statistics.HotPeerCache) {
stats := cache.CheckPeerFlow(region, []*metapb.Peer{peer}, loads, interval)
for _, stat := range stats {
cache.UpdateStat(stat)
}
}
c.hotStat.CheckReadAsync(checkReadPeerTask)
}

// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
collectUnReportedPeerTask := func(cache *statistics.HotPeerCache) {
stats := cache.CheckColdPeer(storeID, regions, interval)
for _, stat := range stats {
cache.UpdateStat(stat)
}
}
c.hotStat.CheckReadAsync(collectUnReportedPeerTask)
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
region := core.RegionFromHeartbeat(request)
// scheduling service doesn't sync the pd server config, so we use 0 here
region := core.RegionFromHeartbeat(request, 0)
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.
Expand Down
Loading

0 comments on commit 89eb56d

Please sign in to comment.