diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 3a0b17100fd..647706b0afd 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -75,7 +75,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
         ruleManager:    ruleManager,
         labelerManager: labelerManager,
         persistConfig:  persistConfig,
-        hotStat:        statistics.NewHotStat(ctx),
+        hotStat:        statistics.NewHotStat(ctx, basicCluster),
         labelStats:     statistics.NewLabelStatistics(),
         regionStats:    statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
         storage:        storage,
diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 6cf7ae143df..1ca5b5cc665 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -64,14 +64,15 @@ type Cluster struct {
 // NewCluster creates a new Cluster
 func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
+    bc := core.NewBasicCluster()
     c := &Cluster{
         ctx:            ctx,
-        BasicCluster:   core.NewBasicCluster(),
+        BasicCluster:   bc,
         IDAllocator:    mockid.NewIDAllocator(),
-        HotStat:        statistics.NewHotStat(ctx),
+        HotStat:        statistics.NewHotStat(ctx, bc),
         HotBucketCache: buckets.NewBucketsCache(ctx),
         PersistOptions: opts,
         suspectRegions: map[uint64]struct{}{},
         Storage:        storage.NewStorageWithMemoryBackend(),
     }
     if c.PersistOptions.GetReplicationConfig().EnablePlacementRules {
         c.initRuleManager()
diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go
index c9b138e3b32..090f430b2ab 100644
--- a/pkg/schedule/schedulers/hot_region_test.go
+++ b/pkg/schedule/schedulers/hot_region_test.go
@@ -1657,7 +1657,10 @@ func TestHotCacheUpdateCache(t *testing.T) {
     re := require.New(t)
     cancel, _, tc, _ := prepareSchedulersTest()
     defer cancel()
     tc.SetHotRegionCacheHitsThreshold(0)
+    for i := range 3 {
+        tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+    }
 
     // For read flow
     addRegionInfo(tc, utils.Read, []testRegionInfo{
@@ -1724,7 +1730,10 @@ func TestHotCacheKeyThresholds(t *testing.T) {
     { // only a few regions
         cancel, _, tc, _ := prepareSchedulersTest()
         defer cancel()
         tc.SetHotRegionCacheHitsThreshold(0)
+        for i := range 6 {
+            tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+        }
         addRegionInfo(tc, utils.Read, []testRegionInfo{
             {1, []uint64{1, 2, 3}, 0, 1, 0},
             {2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0},
@@ -1743,6 +1755,9 @@ func TestHotCacheKeyThresholds(t *testing.T) {
     { // many regions
         cancel, _, tc, _ := prepareSchedulersTest()
         defer cancel()
+        for i := range 3 {
+            tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+        }
         regions := []testRegionInfo{}
         for i := 1; i <= 1000; i += 2 {
             regions = append(regions,
@@ -1796,7 +1811,10 @@ func TestHotCacheByteAndKey(t *testing.T) {
     re := require.New(t)
     cancel, _, tc, _ := prepareSchedulersTest()
     defer cancel()
     tc.SetHotRegionCacheHitsThreshold(0)
+    for i := range 3 {
+        tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+    }
     statistics.ThresholdsUpdateInterval = 0
     defer func() {
         statistics.ThresholdsUpdateInterval = 8 * time.Second
     }()
@@ -1923,6 +1944,9 @@ func TestHotCacheCheckRegionFlow(t *testing.T) {
 func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheCheckRegionFlowCase, enablePlacementRules bool) {
     cancel, _, tc, oc := prepareSchedulersTest()
     defer cancel()
+    for i := range 3 {
+        tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+    }
     tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
     tc.SetEnablePlacementRules(enablePlacementRules)
     labels := []string{"zone", "host"}
@@ -1998,6 +2022,9 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) {
 func checkHotCacheCheckRegionFlowWithDifferentThreshold(re *require.Assertions, enablePlacementRules bool) {
     cancel, _, tc, _ := prepareSchedulersTest()
     defer cancel()
+    for i := range 3 {
+        tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
+    }
     tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
     tc.SetEnablePlacementRules(enablePlacementRules)
     labels := []string{"zone", "host"}
diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go
index 799fb240d10..68b496c26fa 100644
--- a/pkg/statistics/hot_cache.go
+++ b/pkg/statistics/hot_cache.go
@@ -38,11 +38,11 @@ type HotCache struct {
 }
 
 // NewHotCache creates a new hot spot cache.
-func NewHotCache(ctx context.Context) *HotCache {
+func NewHotCache(ctx context.Context, cluster *core.BasicCluster) *HotCache {
     w := &HotCache{
         ctx:        ctx,
-        writeCache: NewHotPeerCache(ctx, utils.Write),
-        readCache:  NewHotPeerCache(ctx, utils.Read),
+        writeCache: NewHotPeerCache(ctx, cluster, utils.Write),
+        readCache:  NewHotPeerCache(ctx, cluster, utils.Read),
     }
     go w.updateItems(w.readCache.taskQueue, w.runReadTask)
     go w.updateItems(w.writeCache.taskQueue, w.runWriteTask)
diff --git a/pkg/statistics/hot_cache_test.go b/pkg/statistics/hot_cache_test.go
new file mode 100644
index 00000000000..b0232b658d4
--- /dev/null
+++ b/pkg/statistics/hot_cache_test.go
@@ -0,0 +1,40 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+import (
+    "context"
+    "testing"
+
+    "github.com/stretchr/testify/require"
+    "github.com/tikv/pd/pkg/core"
+    "github.com/tikv/pd/pkg/statistics/utils"
+)
+
+func TestIsHot(t *testing.T) {
+    re := require.New(t)
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    for i := utils.RWType(0); i < utils.RWTypeLen; i++ {
+        cluster := core.NewBasicCluster()
+        cache := NewHotCache(ctx, cluster)
+        region := buildRegion(cluster, i, 3, 60)
+        stats := cache.CheckReadPeerSync(region, region.GetPeers(), []float64{100000000, 1000, 1000}, 60)
+        cache.Update(stats[0], i)
+        for range 100 {
+            re.True(cache.IsRegionHot(region, 1))
+        }
+    }
+}
diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go
index 0e35e0e23be..d64c0d58319 100644
--- a/pkg/statistics/hot_peer_cache.go
+++ b/pkg/statistics/hot_peer_cache.go
@@ -60,6 +60,7 @@ type thresholds struct {
 // hotPeerCache saves the hot peer's statistics.
 type hotPeerCache struct {
     kind              utils.RWType
+    cluster           *core.BasicCluster
     peersOfStore      map[uint64]*utils.TopN         // storeID -> hot peers
     storesOfRegion    map[uint64]map[uint64]struct{} // regionID -> storeIDs
     regionsOfStore    map[uint64]map[uint64]struct{} // storeID -> regionIDs
@@ -67,13 +68,14 @@ type hotPeerCache struct {
     taskQueue         *chanx.UnboundedChan[FlowItemTask]
     thresholdsOfStore map[uint64]*thresholds                           // storeID -> thresholds
     metrics           map[uint64][utils.ActionTypeLen]prometheus.Gauge // storeID -> metrics
-    // TODO: consider to remove store info when store is offline.
+    lastGCTime        time.Time
 }
 
 // NewHotPeerCache creates a hotPeerCache
-func NewHotPeerCache(ctx context.Context, kind utils.RWType) *hotPeerCache {
+func NewHotPeerCache(ctx context.Context, cluster *core.BasicCluster, kind utils.RWType) *hotPeerCache {
     return &hotPeerCache{
         kind:              kind,
+        cluster:           cluster,
         peersOfStore:      make(map[uint64]*utils.TopN),
         storesOfRegion:    make(map[uint64]map[uint64]struct{}),
         regionsOfStore:    make(map[uint64]map[uint64]struct{}),
@@ -114,6 +122,7 @@ func (f *hotPeerCache) updateStat(item *HotPeerStat) {
         return
     }
     f.incMetrics(item.actionType, item.StoreID)
+    f.gc()
 }
 
 func (f *hotPeerCache) incMetrics(action utils.ActionType, storeID uint64) {
@@ -544,6 +553,37 @@ func (f *hotPeerCache) removeItem(item *HotPeerStat) {
     }
 }
 
+// gc removes the stats of removed stores and the expired items.
+func (f *hotPeerCache) gc() {
+    if time.Since(f.lastGCTime) < f.topNTTL {
+        return
+    }
+    f.lastGCTime = time.Now()
+    // remove tombstone stores
+    stores := make(map[uint64]struct{})
+    for _, store := range f.cluster.GetStores() {
+        stores[store.GetID()] = struct{}{}
+    }
+    for storeID := range f.peersOfStore {
+        if _, ok := stores[storeID]; !ok {
+            delete(f.peersOfStore, storeID)
+            delete(f.regionsOfStore, storeID)
+            delete(f.thresholdsOfStore, storeID)
+            delete(f.metrics, storeID)
+        }
+    }
+    // remove expired items
+    for _, peers := range f.peersOfStore {
+        regions := peers.RemoveExpired()
+        for _, regionID := range regions {
+            delete(f.storesOfRegion, regionID)
+            for storeID := range f.regionsOfStore {
+                delete(f.regionsOfStore[storeID], regionID)
+            }
+        }
+    }
+}
+
 // removeAllItem removes all items of the cache.
 // It is used for test.
 func (f *hotPeerCache) removeAllItem() {
diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go
index 36f922d3830..93484089dc6 100644
--- a/pkg/statistics/hot_peer_cache_test.go
+++ b/pkg/statistics/hot_peer_cache_test.go
@@ -18,6 +18,7 @@ import (
     "context"
     "math/rand"
     "sort"
+    "sync"
     "testing"
     "time"
 
@@ -26,11 +27,9 @@ import (
     "github.com/pingcap/kvproto/pkg/pdpb"
     "github.com/stretchr/testify/require"
     "github.com/tikv/pd/pkg/core"
+    "github.com/tikv/pd/pkg/mock/mockid"
     "github.com/tikv/pd/pkg/movingaverage"
     "github.com/tikv/pd/pkg/statistics/utils"
     "github.com/tikv/pd/pkg/utils/typeutil"
 )
 
-func TestStoreTimeUnsync(t *testing.T) {
-    re := require.New(t)
-    cache := NewHotPeerCache(context.Background(), utils.Write)
@@ -48,6 +51,3 @@ func TestStoreTimeUnsync(t *testing.T) {
-    }
-}
-
 type operator int
 
 const (
@@ -79,8 +84,9 @@ func TestCache(t *testing.T) {
         utils.Read:  3, // all peers
         utils.Write: 3, // all peers
     }
-    cache := NewHotPeerCache(context.Background(), test.kind)
-    region := buildRegion(test.kind, 3, 60)
+    cluster := core.NewBasicCluster()
+    cache := NewHotPeerCache(context.Background(), cluster, test.kind)
+    region := buildRegion(cluster, test.kind, 3, 60)
     checkAndUpdate(re, cache, region, defaultSize[test.kind])
     checkHit(re, cache, region, test.kind, utils.Add) // all peers are new
@@ -252,12 +258,31 @@ func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) {
     return dst, meta.Peers[dst]
 }
 
-func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.RegionInfo {
-    peers := newPeers(peerCount,
-        func(i int) uint64 { return uint64(10000 + i) },
-        func(i int) uint64 { return uint64(i) })
+var (
+    idAllocator *mockid.IDAllocator
+    once        sync.Once
+)
+
+func getIDAllocator() *mockid.IDAllocator {
+    once.Do(func() {
+        idAllocator = mockid.NewIDAllocator()
+    })
+    return idAllocator
+}
+
+func buildRegion(cluster *core.BasicCluster, kind utils.RWType, peerCount int, interval uint64) (region *core.RegionInfo) {
+    peers := make([]*metapb.Peer, 0, peerCount)
+    for range peerCount {
+        id, _ := getIDAllocator().Alloc()
+        storeID, _ := getIDAllocator().Alloc()
+        peers = append(peers, &metapb.Peer{
+            Id:      id,
+            StoreId: storeID,
+        })
+    }
+    id, _ := getIDAllocator().Alloc()
     meta := &metapb.Region{
-        Id:    1000,
+        Id:    id,
         Peers: peers,
         StartKey: []byte(""),
         EndKey:   []byte(""),
@@ -268,7 +293,7 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region
     switch kind {
     case utils.Read:
-        return core.NewRegionInfo(
+        region = core.NewRegionInfo(
             meta,
             leader,
             core.SetReportInterval(0, interval),
@@ -277,7 +302,7 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region
             core.SetReadQuery(1024*interval),
         )
     case utils.Write:
-        return core.NewRegionInfo(
+        region = core.NewRegionInfo(
             meta,
             leader,
             core.SetReportInterval(0, interval),
@@ -285,31 +310,21 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region
             core.SetWrittenKeys(10*units.MiB*interval),
             core.SetWrittenQuery(1024*interval),
         )
-    default:
-        return nil
     }
-}
-
-type genID func(i int) uint64
-
-func newPeers(n int, pid genID, sid genID) []*metapb.Peer {
-    peers := make([]*metapb.Peer, 0, n)
-    for i := 1; i <= n; i++ {
-        peer := &metapb.Peer{
-            Id: pid(i),
-        }
-        peer.StoreId = sid(i)
-        peers = append(peers, peer)
+    for _, peer := range region.GetPeers() {
+        cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: peer.GetStoreId()}, core.SetLastHeartbeatTS(time.Now())))
     }
-    return peers
+    return region
 }
 
 func TestUpdateHotPeerStat(t *testing.T) {
     re := require.New(t)
-    cache := NewHotPeerCache(context.Background(), utils.Read)
+    cluster := core.NewBasicCluster()
+    cache := NewHotPeerCache(context.Background(), cluster, utils.Read)
     storeID, regionID := uint64(1), uint64(2)
     peer := &metapb.Peer{StoreId: storeID}
     region := core.NewRegionInfo(&metapb.Region{Id: regionID, Peers: []*metapb.Peer{peer}}, peer)
+    cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now())))
     // we statistic read peer info from store heartbeat rather than region heartbeat
     m := utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval
     ThresholdsUpdateInterval = 0
@@ -400,8 +415,10 @@ func TestThresholdWithUpdateHotPeerStat(t *testing.T) {
 }
 
 func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold float64) {
-    cache := NewHotPeerCache(context.Background(), utils.Read)
+    cluster := core.NewBasicCluster()
+    cache := NewHotPeerCache(context.Background(), cluster, utils.Read)
     storeID := uint64(1)
+    cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now())))
     re.GreaterOrEqual(byteRate, utils.MinHotThresholds[utils.RegionReadBytes])
     ThresholdsUpdateInterval = 0
     defer func() {
@@ -447,8 +464,9 @@ func TestRemoveFromCache(t *testing.T) {
     interval := uint64(5)
     checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering}
     for _, checker := range checkers {
-        cache := NewHotPeerCache(context.Background(), utils.Write)
-        region := buildRegion(utils.Write, peerCount, interval)
+        cluster := core.NewBasicCluster()
+        cache := NewHotPeerCache(context.Background(), cluster, utils.Write)
+        region := buildRegion(cluster, utils.Write, peerCount, interval)
         // prepare
         intervalSums := make(map[uint64]int)
         for i := 1; i <= 200; i++ {
@@ -482,8 +500,9 @@ func TestRemoveFromCacheRandom(t *testing.T) {
     for _, peerCount := range peerCounts {
         for _, interval := range intervals {
             for _, checker := range checkers {
-                cache := NewHotPeerCache(context.Background(), utils.Write)
-                region := buildRegion(utils.Write, peerCount, interval)
+                cluster := core.NewBasicCluster()
+                cache := NewHotPeerCache(context.Background(), cluster, utils.Write)
+                region := buildRegion(cluster, utils.Write, peerCount, interval)
                 target := uint64(10)
 
                 intervalSums := make(map[uint64]int)
@@ -536,8 +555,9 @@ func checkCoolDown(re *require.Assertions, cache *hotPeerCache, region *core.Reg
 
 func TestCoolDownTransferLeader(t *testing.T) {
     re := require.New(t)
-    cache := NewHotPeerCache(context.Background(), utils.Read)
-    region := buildRegion(utils.Read, 3, 60)
+    cluster := core.NewBasicCluster()
+    cache := NewHotPeerCache(context.Background(), cluster, utils.Read)
+    region := buildRegion(cluster, utils.Read, 3, 60)
 
     moveLeader := func() {
         _, region = schedule(re, movePeer, region, 10)
@@ -569,8 +589,9 @@ func TestCoolDownTransferLeader(t *testing.T) {
     }
     testCases := []func(){moveLeader, transferLeader, movePeer, addReplica, removeReplica}
     for _, testCase := range testCases {
-        cache = NewHotPeerCache(context.Background(), utils.Read)
-        region = buildRegion(utils.Read, 3, 60)
+        cluster = core.NewBasicCluster()
+        cache = NewHotPeerCache(context.Background(), cluster, utils.Read)
+        region = buildRegion(cluster, utils.Read, 3, 60)
         for i := 1; i <= 200; i++ {
             checkAndUpdate(re, cache, region)
         }
@@ -582,8 +603,9 @@ func TestCoolDownTransferLeader(t *testing.T) {
 // See issue #4510
 func TestCacheInherit(t *testing.T) {
     re := require.New(t)
-    cache := NewHotPeerCache(context.Background(), utils.Read)
-    region := buildRegion(utils.Read, 3, 10)
+    cluster := core.NewBasicCluster()
+    cache := NewHotPeerCache(context.Background(), cluster, utils.Read)
+    region := buildRegion(cluster, utils.Read, 3, 10)
     // prepare
     for i := 1; i <= 200; i++ {
         checkAndUpdate(re, cache, region)
@@ -673,13 +695,16 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) {
     re := require.New(t)
     testWithUpdateInterval := func(interval time.Duration) {
         ThresholdsUpdateInterval = interval
-        cache := NewHotPeerCache(context.Background(), utils.Write)
+        cluster := core.NewBasicCluster()
+        cache := NewHotPeerCache(context.Background(), cluster, utils.Write)
         now := time.Now()
+        storeID := uint64(1)
         for id := uint64(0); id < 100; id++ {
             meta := &metapb.Region{
                 Id:    id,
-                Peers: []*metapb.Peer{{Id: id, StoreId: 1}},
+                Peers: []*metapb.Peer{{Id: id, StoreId: storeID}},
             }
+            cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now())))
             region := core.NewRegionInfo(meta, meta.Peers[0], core.SetWrittenBytes(id*6000), core.SetWrittenKeys(id*6000), core.SetWrittenQuery(id*6000))
             for i := 0; i < 10; i++ {
                 start := uint64(now.Add(time.Minute * time.Duration(i)).Unix())
@@ -714,14 +743,58 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) {
     testWithUpdateInterval(0)
 }
 
+func TestRemoveExpireItems(t *testing.T) {
+    re := require.New(t)
+    cluster := core.NewBasicCluster()
+    cache := NewHotPeerCache(context.Background(), cluster, utils.Write)
+    cache.topNTTL = 100 * time.Millisecond
+    // case1: remove expired items
+    region1 := buildRegion(cluster, utils.Write, 3, 10)
+    checkAndUpdate(re, cache, region1)
+    re.NotEmpty(cache.storesOfRegion[region1.GetID()])
+    time.Sleep(cache.topNTTL)
+    region2 := buildRegion(cluster, utils.Write, 3, 10)
+    checkAndUpdate(re, cache, region2)
+    re.Empty(cache.storesOfRegion[region1.GetID()])
+    re.NotEmpty(cache.storesOfRegion[region2.GetID()])
+    time.Sleep(cache.topNTTL)
+    // case2: remove items when the store does not exist
+    re.NotNil(cache.peersOfStore[region1.GetLeader().GetStoreId()])
+    re.NotNil(cache.peersOfStore[region2.GetLeader().GetStoreId()])
+    cluster.ResetStores()
+    re.Empty(cluster.GetStores())
+    region3 := buildRegion(cluster, utils.Write, 3, 10)
+    checkAndUpdate(re, cache, region3)
+    re.Nil(cache.peersOfStore[region1.GetLeader().GetStoreId()])
+    re.Nil(cache.peersOfStore[region2.GetLeader().GetStoreId()])
+    re.NotEmpty(cache.regionsOfStore[region3.GetLeader().GetStoreId()])
+}
+
+func TestDifferentReportInterval(t *testing.T) {
+    re := require.New(t)
+    cluster := core.NewBasicCluster()
+    cache := NewHotPeerCache(context.Background(), cluster, utils.Write)
+    region := buildRegion(cluster, utils.Write, 3, 5)
+    for _, interval := range []uint64{120, 60, 30} {
+        region = region.Clone(core.SetReportInterval(0, interval))
+        checkAndUpdate(re, cache, region, 3)
+        stats := cache.GetHotPeerStats(0)
+        re.Len(stats, 3)
+        for _, s := range stats {
+            re.Len(s, 1)
+        }
+    }
+}
+
 func BenchmarkCheckRegionFlow(b *testing.B) {
-    cache := NewHotPeerCache(context.Background(), utils.Read)
-    region := buildRegion(utils.Read, 3, 10)
+    cluster := core.NewBasicCluster()
+    cache := NewHotPeerCache(context.Background(), cluster, utils.Read)
+    region := buildRegion(cluster, utils.Read, 3, 10)
     peerInfos := make([]*core.PeerInfo, 0)
     for _, peer := range region.GetPeers() {
         peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10)
         peerInfos = append(peerInfos, peerInfo)
     }
     b.ResetTimer()
     for i := 0; i < b.N; i++ {
         items := make([]*HotPeerStat, 0)
diff --git a/pkg/statistics/hot_stat.go b/pkg/statistics/hot_stat.go
index 3cda52aba89..c9276f11b20 100644
--- a/pkg/statistics/hot_stat.go
+++ b/pkg/statistics/hot_stat.go
@@ -17,6 +17,7 @@ package statistics
 import (
     "context"
 
+    "github.com/tikv/pd/pkg/core"
     "github.com/tikv/pd/pkg/statistics/buckets"
 )
 
@@ -28,9 +29,9 @@ type HotStat struct {
 }
 
 // NewHotStat creates the container to hold cluster's hotspot statistics.
-func NewHotStat(ctx context.Context) *HotStat {
+func NewHotStat(ctx context.Context, cluster *core.BasicCluster) *HotStat {
     return &HotStat{
-        HotCache:       NewHotCache(ctx),
+        HotCache:       NewHotCache(ctx, cluster),
         StoresStats:    NewStoresStats(),
         HotBucketCache: buckets.NewBucketsCache(ctx),
     }
diff --git a/pkg/statistics/utils/topn.go b/pkg/statistics/utils/topn.go
index 7ab6c6eaf3e..bb3029f1ace 100644
--- a/pkg/statistics/utils/topn.go
+++ b/pkg/statistics/utils/topn.go
@@ -96,16 +96,15 @@ func (tn *TopN) Put(item TopNItem) (isUpdate bool) {
     for _, stn := range tn.topns {
         isUpdate = stn.Put(item)
     }
     tn.ttlLst.Put(item.ID())
-    tn.maintain()
     return
 }
 
 // RemoveExpired deletes all expired items.
-func (tn *TopN) RemoveExpired() {
+func (tn *TopN) RemoveExpired() []uint64 {
     tn.rw.Lock()
     defer tn.rw.Unlock()
-    tn.maintain()
+    return tn.maintain()
 }
 
 // Remove deletes the item by given ID and returns it.
@@ -115,17 +119,19 @@ func (tn *TopN) Remove(id uint64) (item TopNItem) {
     for _, stn := range tn.topns {
         item = stn.Remove(id)
     }
     _ = tn.ttlLst.Remove(id)
-    tn.maintain()
     return
 }
 
-func (tn *TopN) maintain() {
+func (tn *TopN) maintain() []uint64 {
+    ids := make([]uint64, 0)
     for _, id := range tn.ttlLst.TakeExpired() {
         for _, stn := range tn.topns {
             stn.Remove(id)
+            ids = append(ids, id)
         }
     }
+    return ids
 }
 
 type singleTopN struct {
diff --git a/pkg/statistics/utils/topn_test.go b/pkg/statistics/utils/topn_test.go
index f92d5a61f34..4f2bc5e4f3b 100644
--- a/pkg/statistics/utils/topn_test.go
+++ b/pkg/statistics/utils/topn_test.go
@@ -208,6 +208,7 @@ func TestTTL(t *testing.T) {
     putPerm(re, tn, Total, func(x int) float64 {
         return float64(-x)
     }, false /*insert*/)
+    re.Len(tn.GetAll(), Total)
 
     time.Sleep(900 * time.Millisecond)
     {
@@ -217,6 +218,8 @@ func TestTTL(t *testing.T) {
         }
         re.True(tn.Put(item))
     }
+    re.Len(tn.RemoveExpired(), (Total-1)*DimLen)
+
     for i := 3; i < Total; i += 3 {
         item := &item{id: uint64(i), values: []float64{float64(-i) + 100}}
         for k := 1; k < DimLen; k++ {
@@ -224,7 +227,6 @@ func TestTTL(t *testing.T) {
         }
         re.False(tn.Put(item))
     }
-    tn.RemoveExpired()
     re.Equal(Total/3+1, tn.Len())
     items := tn.GetAllTopN(0)
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index dc0f7966761..fb381a4f63f 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -602,7 +602,10 @@ func TestRegionHeartbeatHotStat(t *testing.T) {
     re.NoError(err)
     cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
     cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
-    newTestStores(4, "2.0.0")
+    stores := newTestStores(4, "2.0.0")
+    for _, store := range stores {
+        cluster.PutStore(store)
+    }
     peers := []*metapb.Peer{
         {
             Id: 1,
diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go
index 322ccc94d0e..6f73fbf2f6b 100644
--- a/server/cluster/scheduling_controller.go
+++ b/server/cluster/scheduling_controller.go
@@ -67,8 +67,8 @@ func newSchedulingController(parentCtx context.Context, basicCluster *core.Basic
         BasicCluster: basicCluster,
         opt:          opt,
         labelStats:   statistics.NewLabelStatistics(),
-        hotStat:      statistics.NewHotStat(parentCtx),
+        hotStat:      statistics.NewHotStat(parentCtx, basicCluster),
         slowStat:     statistics.NewSlowStat(parentCtx),
         regionStats:  statistics.NewRegionStatistics(basicCluster, opt, ruleManager),
     }
 }