From d8b8091ab3eaffa5b0074f5c6f1afa4f3786b792 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 14 Oct 2024 22:54:51 +0800 Subject: [PATCH 1/5] statistics: add gc in hot peer cache Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/mock/mockcluster/mockcluster.go | 5 +- pkg/statistics/hot_cache.go | 8 +- pkg/statistics/hot_cache_test.go | 6 +- pkg/statistics/hot_peer_cache.go | 42 +++++- pkg/statistics/hot_peer_cache_test.go | 163 +++++++++++++++--------- pkg/statistics/hot_stat.go | 5 +- pkg/statistics/utils/topn.go | 11 +- server/cluster/scheduling_controller.go | 2 +- 9 files changed, 165 insertions(+), 79 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 0dcb26a1a1f..4e5777665ac 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -93,7 +93,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, ruleManager: ruleManager, labelerManager: labelerManager, persistConfig: persistConfig, - hotStat: statistics.NewHotStat(ctx), + hotStat: statistics.NewHotStat(ctx, basicCluster), labelStats: statistics.NewLabelStatistics(), regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), storage: storage, diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index bbd4fbb6811..ebbec692191 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -63,11 +63,12 @@ type Cluster struct { // NewCluster creates a new Cluster func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { + bc := core.NewBasicCluster() c := &Cluster{ ctx: ctx, - BasicCluster: core.NewBasicCluster(), + BasicCluster: bc, IDAllocator: mockid.NewIDAllocator(), - HotStat: statistics.NewHotStat(ctx), + HotStat: statistics.NewHotStat(ctx, bc), HotBucketCache: buckets.NewBucketsCache(ctx), PersistOptions: opts, pendingProcessedRegions: map[uint64]struct{}{}, diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 86f7d7d6b08..046313b4d1d 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -39,11 +39,11 @@ type HotCache struct { } // NewHotCache creates a new hot spot cache. -func NewHotCache(ctx context.Context) *HotCache { +func NewHotCache(ctx context.Context, cluster *core.BasicCluster) *HotCache { w := &HotCache{ ctx: ctx, - writeCache: NewHotPeerCache(ctx, utils.Write), - readCache: NewHotPeerCache(ctx, utils.Read), + writeCache: NewHotPeerCache(ctx, cluster, utils.Write), + readCache: NewHotPeerCache(ctx, cluster, utils.Read), } go w.updateItems(w.readCache.taskQueue, w.runReadTask) go w.updateItems(w.writeCache.taskQueue, w.runWriteTask) @@ -80,7 +80,7 @@ func (w *HotCache) CheckReadAsync(task func(cache *HotPeerCache)) bool { func (w *HotCache) RegionStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat { ret := make(chan map[uint64][]*HotPeerStat, 1) collectRegionStatsTask := func(cache *HotPeerCache) { - ret <- cache.RegionStats(minHotDegree) + ret <- cache.PeerStats(minHotDegree) } var succ bool switch kind { diff --git a/pkg/statistics/hot_cache_test.go b/pkg/statistics/hot_cache_test.go index fbd28c94683..1e7ac0ecc68 100644 --- a/pkg/statistics/hot_cache_test.go +++ b/pkg/statistics/hot_cache_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/statistics/utils" ) @@ -27,8 +28,9 @@ func TestIsHot(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() for i := utils.RWType(0); i < utils.RWTypeLen; i++ { - cache := NewHotCache(ctx) - region := buildRegion(i, 3, 60) + cluster := core.NewBasicCluster() + cache := NewHotCache(ctx, cluster) + region := buildRegion(cluster, i, 3, 60) stats := cache.CheckReadPeerSync(region, region.GetPeers(), []float64{100000000, 1000, 1000}, 60) cache.Update(stats[0], i) for i := 0; i < 100; i++ { diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 8d1f64ca540..90b106394c1 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -60,6 +60,7 @@ type thresholds struct { // HotPeerCache saves the hot peer's statistics. type HotPeerCache struct { kind utils.RWType + cluster *core.BasicCluster peersOfStore map[uint64]*utils.TopN // storeID -> hot peers storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs @@ -67,13 +68,14 @@ type HotPeerCache struct { taskQueue *chanx.UnboundedChan[func(*HotPeerCache)] thresholdsOfStore map[uint64]*thresholds // storeID -> thresholds metrics map[uint64][utils.ActionTypeLen]prometheus.Gauge // storeID -> metrics - // TODO: consider to remove store info when store is offline. + lastGCTime time.Time } // NewHotPeerCache creates a HotPeerCache -func NewHotPeerCache(ctx context.Context, kind utils.RWType) *HotPeerCache { +func NewHotPeerCache(ctx context.Context, cluster *core.BasicCluster, kind utils.RWType) *HotPeerCache { return &HotPeerCache{ kind: kind, + cluster: cluster, peersOfStore: make(map[uint64]*utils.TopN), storesOfRegion: make(map[uint64]map[uint64]struct{}), regionsOfStore: make(map[uint64]map[uint64]struct{}), @@ -84,9 +86,8 @@ func NewHotPeerCache(ctx context.Context, kind utils.RWType) *HotPeerCache { } } -// TODO: rename RegionStats as PeerStats -// RegionStats returns hot items -func (f *HotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { +// PeerStats returns hot items +func (f *HotPeerCache) PeerStats(minHotDegree int) map[uint64][]*HotPeerStat { res := make(map[uint64][]*HotPeerStat) defaultAntiCount := f.kind.DefaultAntiCount() for storeID, peers := range f.peersOfStore { @@ -115,6 +116,7 @@ func (f *HotPeerCache) UpdateStat(item *HotPeerStat) { return } f.incMetrics(item.actionType, item.StoreID) + f.removeExpiredItems() } func (f *HotPeerCache) incMetrics(action utils.ActionType, storeID uint64) { @@ -548,6 +550,36 @@ func (f *HotPeerCache) removeItem(item *HotPeerStat) { } } +func (f *HotPeerCache) removeExpiredItems() { + if time.Since(f.lastGCTime) < f.topNTTL { + return + } + f.lastGCTime = time.Now() + // remove tombstone stores + stores := make(map[uint64]struct{}) + for _, storeID := range f.cluster.GetStores() { + stores[storeID.GetID()] = struct{}{} + } + for storeID := range f.peersOfStore { + if _, ok := stores[storeID]; !ok { + delete(f.peersOfStore, storeID) + delete(f.regionsOfStore, storeID) + delete(f.thresholdsOfStore, storeID) + delete(f.metrics, storeID) + } + } + // remove expired items + for _, peers := range f.peersOfStore { + regions := peers.RemoveExpired() + for _, regionID := range regions { + delete(f.storesOfRegion, regionID) + for storeID := range f.regionsOfStore { + delete(f.regionsOfStore[storeID], regionID) + } + } + } +} + // removeAllItem removes all items of the cache. // It is used for test. func (f *HotPeerCache) removeAllItem() { diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index ce4e352bc3d..9de0ec1160f 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -18,6 +18,7 @@ import ( "context" "math/rand" "sort" + "sync" "testing" "time" @@ -26,28 +27,12 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/movingaverage" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/typeutil" ) -func TestStoreTimeUnsync(t *testing.T) { - re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Write) - intervals := []uint64{120, 60} - for _, interval := range intervals { - region := buildRegion(utils.Write, 3, interval) - checkAndUpdate(re, cache, region, 3) - { - stats := cache.RegionStats(0) - re.Len(stats, 3) - for _, s := range stats { - re.Len(s, 1) - } - } - } -} - type operator int const ( @@ -79,8 +64,9 @@ func TestCache(t *testing.T) { utils.Read: 3, // all peers utils.Write: 3, // all peers } - cache := NewHotPeerCache(context.Background(), test.kind) - region := buildRegion(test.kind, 3, 60) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, test.kind) + region := buildRegion(cluster, test.kind, 3, 60) checkAndUpdate(re, cache, region, defaultSize[test.kind]) checkHit(re, cache, region, test.kind, utils.Add) // all peers are new @@ -245,12 +231,31 @@ func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { return dst, meta.Peers[dst] } -func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.RegionInfo { - peers := newPeers(peerCount, - func(i int) uint64 { return uint64(10000 + i) }, - func(i int) uint64 { return uint64(i) }) +var ( + idAllocator *mockid.IDAllocator + once sync.Once +) + +func getIDAllocator() *mockid.IDAllocator { + once.Do(func() { + idAllocator = mockid.NewIDAllocator() + }) + return idAllocator +} + +func buildRegion(cluster *core.BasicCluster, kind utils.RWType, peerCount int, interval uint64) (region *core.RegionInfo) { + peers := make([]*metapb.Peer, 0, peerCount) + for i := 0; i < peerCount; i++ { + id, _ := getIDAllocator().Alloc() + storeID, _ := getIDAllocator().Alloc() + peers = append(peers, &metapb.Peer{ + Id: id, + StoreId: storeID, + }) + } + id, _ := getIDAllocator().Alloc() meta := &metapb.Region{ - Id: 1000, + Id: id, Peers: peers, StartKey: []byte(""), EndKey: []byte(""), @@ -261,7 +266,7 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region switch kind { case utils.Read: - return core.NewRegionInfo( + region = core.NewRegionInfo( meta, leader, core.SetReportInterval(0, interval), @@ -270,7 +275,7 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region core.SetReadQuery(1024*interval), ) case utils.Write: - return core.NewRegionInfo( + region = core.NewRegionInfo( meta, leader, core.SetReportInterval(0, interval), @@ -278,31 +283,21 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region core.SetWrittenKeys(10*units.MiB*interval), core.SetWrittenQuery(1024*interval), ) - default: - return nil } -} - -type genID func(i int) uint64 - -func newPeers(n int, pid genID, sid genID) []*metapb.Peer { - peers := make([]*metapb.Peer, 0, n) - for i := 1; i <= n; i++ { - peer := &metapb.Peer{ - Id: pid(i), - } - peer.StoreId = sid(i) - peers = append(peers, peer) + for _, peer := range region.GetPeers() { + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: peer.GetStoreId()}, core.SetLastHeartbeatTS(time.Now()))) } - return peers + return region } func TestUpdateHotPeerStat(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) storeID, regionID := uint64(1), uint64(2) peer := &metapb.Peer{StoreId: storeID} region := core.NewRegionInfo(&metapb.Region{Id: regionID, Peers: []*metapb.Peer{peer}}, peer) + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) // we statistic read peer info from store heartbeat rather than region heartbeat m := utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval ThresholdsUpdateInterval = 0 @@ -392,8 +387,10 @@ func TestThresholdWithUpdateHotPeerStat(t *testing.T) { } func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold float64) { - cache := NewHotPeerCache(context.Background(), utils.Read) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) storeID := uint64(1) + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) re.GreaterOrEqual(byteRate, utils.MinHotThresholds[utils.RegionReadBytes]) ThresholdsUpdateInterval = 0 defer func() { @@ -439,8 +436,9 @@ func TestRemoveFromCache(t *testing.T) { interval := uint64(5) checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering} for _, checker := range checkers { - cache := NewHotPeerCache(context.Background(), utils.Write) - region := buildRegion(utils.Write, peerCount, interval) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + region := buildRegion(cluster, utils.Write, peerCount, interval) // prepare intervalSums := make(map[uint64]int) for i := 1; i <= 200; i++ { @@ -474,8 +472,9 @@ func TestRemoveFromCacheRandom(t *testing.T) { for _, peerCount := range peerCounts { for _, interval := range intervals { for _, checker := range checkers { - cache := NewHotPeerCache(context.Background(), utils.Write) - region := buildRegion(utils.Write, peerCount, interval) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + region := buildRegion(cluster, utils.Write, peerCount, interval) target := uint64(10) intervalSums := make(map[uint64]int) @@ -528,8 +527,9 @@ func checkCoolDown(re *require.Assertions, cache *HotPeerCache, region *core.Reg func TestCoolDownTransferLeader(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) - region := buildRegion(utils.Read, 3, 60) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) + region := buildRegion(cluster, utils.Read, 3, 60) moveLeader := func() { _, region = schedule(re, movePeer, region, 10) @@ -561,8 +561,9 @@ func TestCoolDownTransferLeader(t *testing.T) { } testCases := []func(){moveLeader, transferLeader, movePeer, addReplica, removeReplica} for _, testCase := range testCases { - cache = NewHotPeerCache(context.Background(), utils.Read) - region = buildRegion(utils.Read, 3, 60) + cluster = core.NewBasicCluster() + cache = NewHotPeerCache(context.Background(), cluster, utils.Read) + region = buildRegion(cluster, utils.Read, 3, 60) for i := 1; i <= 200; i++ { checkAndUpdate(re, cache, region) } @@ -574,8 +575,9 @@ func TestCoolDownTransferLeader(t *testing.T) { // See issue #4510 func TestCacheInherit(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) - region := buildRegion(utils.Read, 3, 10) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) + region := buildRegion(cluster, utils.Read, 3, 10) // prepare for i := 1; i <= 200; i++ { checkAndUpdate(re, cache, region) @@ -665,13 +667,16 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { re := require.New(t) testWithUpdateInterval := func(interval time.Duration) { ThresholdsUpdateInterval = interval - cache := NewHotPeerCache(context.Background(), utils.Write) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) now := time.Now() + storeID := uint64(1) for id := uint64(0); id < 100; id++ { meta := &metapb.Region{ Id: id, - Peers: []*metapb.Peer{{Id: id, StoreId: 1}}, + Peers: []*metapb.Peer{{Id: id, StoreId: storeID}}, } + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) region := core.NewRegionInfo(meta, meta.Peers[0], core.SetWrittenBytes(id*6000), core.SetWrittenKeys(id*6000), core.SetWrittenQuery(id*6000)) for i := 0; i < 10; i++ { start := uint64(now.Add(time.Minute * time.Duration(i)).Unix()) @@ -705,9 +710,53 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { testWithUpdateInterval(0) } +func TestRemoveExpireItems(t *testing.T) { + re := require.New(t) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + cache.topNTTL = 100 * time.Millisecond + // case1: remove expired items + region1 := buildRegion(cluster, utils.Write, 3, 10) + checkAndUpdate(re, cache, region1) + re.NotEmpty(cache.storesOfRegion[region1.GetID()]) + time.Sleep(cache.topNTTL) + region2 := buildRegion(cluster, utils.Write, 3, 10) + checkAndUpdate(re, cache, region2) + re.Empty(cache.storesOfRegion[region1.GetID()]) + re.NotEmpty(cache.storesOfRegion[region2.GetID()]) + time.Sleep(cache.topNTTL) + // case2: remove items when the store is not exist + re.NotNil(cache.peersOfStore[region1.GetLeader().GetStoreId()]) + re.NotNil(cache.peersOfStore[region2.GetLeader().GetStoreId()]) + cluster.ResetStores() + re.Empty(cluster.GetStores()) + region3 := buildRegion(cluster, utils.Write, 3, 10) + checkAndUpdate(re, cache, region3) + re.Nil(cache.peersOfStore[region1.GetLeader().GetStoreId()]) + re.Nil(cache.peersOfStore[region2.GetLeader().GetStoreId()]) + re.NotEmpty(cache.regionsOfStore[region3.GetLeader().GetStoreId()]) +} + +func TestDifferentReportInterval(t *testing.T) { + re := require.New(t) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + region := buildRegion(cluster, utils.Write, 3, 5) + for _, interval := range []uint64{120, 60, 30} { + region = region.Clone(core.SetReportInterval(0, interval)) + checkAndUpdate(re, cache, region, 3) + stats := cache.PeerStats(0) + re.Len(stats, 3) + for _, s := range stats { + re.Len(s, 1) + } + } +} + func BenchmarkCheckRegionFlow(b *testing.B) { - cache := NewHotPeerCache(context.Background(), utils.Read) - region := buildRegion(utils.Read, 3, 10) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) + region := buildRegion(cluster, utils.Read, 3, 10) b.ResetTimer() for i := 0; i < b.N; i++ { stats := cache.CheckPeerFlow(region, region.GetPeers(), region.GetLoads(), 10) diff --git a/pkg/statistics/hot_stat.go b/pkg/statistics/hot_stat.go index 3cda52aba89..c9276f11b20 100644 --- a/pkg/statistics/hot_stat.go +++ b/pkg/statistics/hot_stat.go @@ -17,6 +17,7 @@ package statistics import ( "context" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/statistics/buckets" ) @@ -28,9 +29,9 @@ type HotStat struct { } // NewHotStat creates the container to hold cluster's hotspot statistics. -func NewHotStat(ctx context.Context) *HotStat { +func NewHotStat(ctx context.Context, cluster *core.BasicCluster) *HotStat { return &HotStat{ - HotCache: NewHotCache(ctx), + HotCache: NewHotCache(ctx, cluster), StoresStats: NewStoresStats(), HotBucketCache: buckets.NewBucketsCache(ctx), } diff --git a/pkg/statistics/utils/topn.go b/pkg/statistics/utils/topn.go index cb97251edd9..4731f50d729 100644 --- a/pkg/statistics/utils/topn.go +++ b/pkg/statistics/utils/topn.go @@ -97,15 +97,14 @@ func (tn *TopN) Put(item TopNItem) (isUpdate bool) { isUpdate = stn.put(item) } tn.ttlLst.put(item.ID()) - tn.maintain() return } // RemoveExpired deletes all expired items. -func (tn *TopN) RemoveExpired() { +func (tn *TopN) RemoveExpired() []uint64 { tn.rw.Lock() defer tn.rw.Unlock() - tn.maintain() + return tn.maintain() } // Remove deletes the item by given ID and returns it. @@ -116,16 +115,18 @@ func (tn *TopN) Remove(id uint64) (item TopNItem) { item = stn.remove(id) } _ = tn.ttlLst.remove(id) - tn.maintain() return } -func (tn *TopN) maintain() { +func (tn *TopN) maintain() []uint64 { + ids := make([]uint64, 0) for _, id := range tn.ttlLst.takeExpired() { for _, stn := range tn.topns { stn.remove(id) + ids = append(ids, id) } } + return ids } type singleTopN struct { diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 5d617700804..794304999ab 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -68,7 +68,7 @@ func newSchedulingController(parentCtx context.Context, basicCluster *core.Basic BasicCluster: basicCluster, opt: opt, labelStats: statistics.NewLabelStatistics(), - hotStat: statistics.NewHotStat(parentCtx), + hotStat: statistics.NewHotStat(parentCtx, basicCluster), slowStat: statistics.NewSlowStat(), regionStats: statistics.NewRegionStatistics(basicCluster, opt, ruleManager), } From 9266c2467d68d7e1e65d57d026ce58311ba2f726 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 14 Oct 2024 23:28:02 +0800 Subject: [PATCH 2/5] fix test Signed-off-by: lhy1024 --- pkg/schedule/schedulers/hot_region_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index a4b3225312d..0f2be699154 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -1614,6 +1614,9 @@ func TestHotCacheUpdateCache(t *testing.T) { re := require.New(t) cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } // For read flow addRegionInfo(tc, utils.Read, []testRegionInfo{ @@ -1680,6 +1683,9 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // only a few regions cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 6; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } addRegionInfo(tc, utils.Read, []testRegionInfo{ {1, []uint64{1, 2, 3}, 0, 1, 0}, {2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0}, @@ -1698,6 +1704,9 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // many regions cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } regions := []testRegionInfo{} for i := 1; i <= 1000; i += 2 { regions = append(regions, @@ -1751,6 +1760,9 @@ func TestHotCacheByteAndKey(t *testing.T) { re := require.New(t) cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } statistics.ThresholdsUpdateInterval = 0 defer func() { statistics.ThresholdsUpdateInterval = 8 * time.Second @@ -1877,6 +1889,9 @@ func TestHotCacheCheckRegionFlow(t *testing.T) { func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheCheckRegionFlowCase, enablePlacementRules bool) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} @@ -1953,6 +1968,9 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) { func checkHotCacheCheckRegionFlowWithDifferentThreshold(re *require.Assertions, enablePlacementRules bool) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} From fe4ebc41cdbc11071ce17a2c925aeb0efe240d83 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 14 Oct 2024 23:37:39 +0800 Subject: [PATCH 3/5] fix test Signed-off-by: lhy1024 --- pkg/statistics/utils/topn_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/statistics/utils/topn_test.go b/pkg/statistics/utils/topn_test.go index f92d5a61f34..514e2bfd8e8 100644 --- a/pkg/statistics/utils/topn_test.go +++ b/pkg/statistics/utils/topn_test.go @@ -217,6 +217,7 @@ func TestTTL(t *testing.T) { } re.True(tn.Put(item)) } + tn.RemoveExpired() for i := 3; i < Total; i += 3 { item := &item{id: uint64(i), values: []float64{float64(-i) + 100}} for k := 1; k < DimLen; k++ { From f2d03a9d2f0441a4e9e67723da30384f085c72ec Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 15 Oct 2024 12:34:54 +0800 Subject: [PATCH 4/5] fix test Signed-off-by: lhy1024 --- server/cluster/cluster_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index fa1b14d107e..94456f236f6 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -609,7 +609,10 @@ func TestRegionHeartbeatHotStat(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - newTestStores(4, "2.0.0") + stores := newTestStores(4, "2.0.0") + for _, store := range stores { + cluster.PutStore(store) + } peers := []*metapb.Peer{ { Id: 1, From 9481fb7558f2c72f2c3cfa9cb2714783cea019b3 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 25 Oct 2024 17:41:35 +0800 Subject: [PATCH 5/5] address comments Signed-off-by: lhy1024 --- pkg/statistics/hot_peer_cache.go | 4 ++-- pkg/statistics/utils/topn_test.go | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 19fc586e6e6..b62248062bd 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -117,7 +117,7 @@ func (f *HotPeerCache) UpdateStat(item *HotPeerStat) { return } f.incMetrics(item.actionType, item.StoreID) - f.removeExpiredItems() + f.gc() } func (f *HotPeerCache) incMetrics(action utils.ActionType, storeID uint64) { @@ -551,7 +551,7 @@ func (f *HotPeerCache) removeItem(item *HotPeerStat) { } } -func (f *HotPeerCache) removeExpiredItems() { +func (f *HotPeerCache) gc() { if time.Since(f.lastGCTime) < f.topNTTL { return } diff --git a/pkg/statistics/utils/topn_test.go b/pkg/statistics/utils/topn_test.go index e3930b7f9a2..8b291360d53 100644 --- a/pkg/statistics/utils/topn_test.go +++ b/pkg/statistics/utils/topn_test.go @@ -208,6 +208,7 @@ func TestTTL(t *testing.T) { putPerm(re, tn, Total, func(x int) float64 { return float64(-x) }, false /*insert*/) + re.Len(tn.GetAll(), Total) time.Sleep(900 * time.Millisecond) { @@ -217,7 +218,8 @@ func TestTTL(t *testing.T) { } re.True(tn.Put(item)) } - tn.RemoveExpired() + re.Len(tn.RemoveExpired(), (Total-1)*DimLen) + for i := 3; i < Total; i += 3 { item := &item{id: uint64(i), values: []float64{float64(-i) + 100}} for k := 1; k < DimLen; k++ { @@ -225,7 +227,6 @@ func TestTTL(t *testing.T) { } re.False(tn.Put(item)) } - tn.RemoveExpired() re.Equal(Total/3+1, tn.Len()) items := tn.GetAllTopN(0)