diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 59af6d879f..4ca2ec6787 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -97,8 +97,8 @@ type ExpandedPostingsCache interface { type BlocksPostingsForMatchersCache struct { strippedLock []sync.RWMutex - headCache *fifoCache[[]storage.SeriesRef] - blocksCache *fifoCache[[]storage.SeriesRef] + headCache *lruCache[[]storage.SeriesRef] + blocksCache *lruCache[[]storage.SeriesRef] headSeedByMetricName []int postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) @@ -117,8 +117,8 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp } return &BlocksPostingsForMatchersCache{ - headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), - blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), + headCache: newLruCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), + blocksCache: newLruCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), headSeedByMetricName: make([]int, seedArraySize), strippedLock: make([]sync.RWMutex, numOfSeedsStripes), postingsForMatchersFunc: cfg.PostingsForMatchers, @@ -272,7 +272,7 @@ func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) { return "", false } -type fifoCache[V any] struct { +type lruCache[V any] struct { cfg PostingsCacheConfig cachedValues *sync.Map timeNow func() time.Time @@ -285,8 +285,8 @@ type fifoCache[V any] struct { cachedBytes int64 } -func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] { - return &fifoCache[V]{ +func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] { + return &lruCache[V]{ cachedValues: new(sync.Map), cached: list.New(), cfg: cfg, @@ -296,7 +296,7 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded } } -func (c *fifoCache[V]) expire() { +func (c *lruCache[V]) expire() { if c.cfg.Ttl <= 0 { return } @@ -314,7 +314,7 @@ func (c *fifoCache[V]) expire() { } } -func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { +func (c *lruCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { r := &cacheEntryPromise[V]{ done: make(chan struct{}), } @@ -331,13 +331,14 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error) r.v, r.sizeBytes, r.err = fetch() r.sizeBytes += int64(len(k)) r.ts = c.timeNow() - c.created(k, r.sizeBytes) + r.lElement = c.created(k, r.sizeBytes) c.expire() } if ok { // If the promise is already in the cache, lets wait it to fetch the data. <-loaded.(*cacheEntryPromise[V]).done + c.moveBack(loaded.(*cacheEntryPromise[V]).lElement) // If is cached but is expired, lets try to replace the cache value. if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) { @@ -354,12 +355,12 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error) return loaded.(*cacheEntryPromise[V]), ok } -func (c *fifoCache[V]) contains(k string) bool { +func (c *lruCache[V]) contains(k string) bool { _, ok := c.cachedValues.Load(k) return ok } -func (c *fifoCache[V]) shouldEvictHead() (string, bool) { +func (c *lruCache[V]) shouldEvictHead() (string, bool) { h := c.cached.Front() if h == nil { return "", false @@ -380,7 +381,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) { return "", false } -func (c *fifoCache[V]) evictHead() { +func (c *lruCache[V]) evictHead() { front := c.cached.Front() c.cached.Remove(front) oldestKey := front.Value.(string) @@ -389,18 +390,24 @@ func (c *fifoCache[V]) evictHead() { } } -func (c *fifoCache[V]) created(key string, sizeBytes int64) { +func (c *lruCache[V]) created(key string, sizeBytes int64) *list.Element { if c.cfg.Ttl <= 0 { c.cachedValues.Delete(key) - return + return nil } c.cachedMtx.Lock() defer c.cachedMtx.Unlock() - c.cached.PushBack(key) c.cachedBytes += sizeBytes + return c.cached.PushBack(key) +} + +func (c *lruCache[V]) moveBack(ele *list.Element) { + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + c.cached.MoveToBack(ele) } -func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { +func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) { if oldSize == newSizeBytes { return } @@ -417,6 +424,8 @@ type cacheEntryPromise[V any] struct { done chan struct{} v V err error + + lElement *list.Element } func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool { diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index db821736a3..0480b1e3c2 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -3,6 +3,7 @@ package tsdb import ( "bytes" "fmt" + "math/rand" "strings" "sync" "testing" @@ -21,7 +22,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { MaxBytes: 10 << 20, } m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) - cache := newFifoCache[int](cfg, "test", m, time.Now) + cache := newLruCache[int](cfg, "test", m, time.Now) calls := atomic.Int64{} concurrency := 100 wg := sync.WaitGroup{} @@ -45,12 +46,12 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { } -func TestFifoCacheDisabled(t *testing.T) { +func TestCacheDisabled(t *testing.T) { cfg := PostingsCacheConfig{} cfg.Enabled = false m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) timeNow := time.Now - cache := newFifoCache[int](cfg, "test", m, timeNow) + cache := newLruCache[int](cfg, "test", m, timeNow) old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) { return 1, 0, nil }) @@ -59,8 +60,66 @@ func TestFifoCacheDisabled(t *testing.T) { require.False(t, cache.contains("key1")) } -func TestFifoCacheExpire(t *testing.T) { +func TestLru(t *testing.T) { + maxNumberOfCachedItems := 5 + keySize := 20 + + cfg := PostingsCacheConfig{ + Enabled: true, + MaxBytes: int64(maxNumberOfCachedItems + keySize*maxNumberOfCachedItems), // for this test each element has size of 1, to it will be 'maxNumberOfCachedItems' elements + Ttl: time.Hour, + } + r := prometheus.NewPedanticRegistry() + m := NewPostingCacheMetrics(r) + cache := newLruCache[int](cfg, "test", m, time.Now) + + for i := 0; i < maxNumberOfCachedItems; i++ { + key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil }) + require.False(t, hit) + require.Equal(t, key, cache.cached.Back().Value) + assertCacheItemsCount(t, cache, i+1) + } + + for i := 0; i < maxNumberOfCachedItems; i++ { + key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil }) + require.True(t, hit) + require.Equal(t, key, cache.cached.Back().Value) + assertCacheItemsCount(t, cache, maxNumberOfCachedItems) + } + + // Lets now hit 2 random keys and make sure they will be the last to be expired + recentUsedKeys := make(map[string]struct{}) + for i := 0; i < 2; i++ { + key := RepeatStringIfNeeded(fmt.Sprintf("key%d", rand.Int()%maxNumberOfCachedItems), keySize) + _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil }) + require.True(t, hit) + assertCacheItemsCount(t, cache, maxNumberOfCachedItems) + recentUsedKeys[key] = struct{}{} + } + + // Create new keys and make sure the recentUsedKeys are still in the cache + for i := 0; i < maxNumberOfCachedItems-2; i++ { + key := RepeatStringIfNeeded(fmt.Sprintf("key_new%d", i), keySize) + _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil }) + require.False(t, hit) + require.Equal(t, maxNumberOfCachedItems, cache.cached.Len()) + } + + for i := 0; i < maxNumberOfCachedItems; i++ { + key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + if _, ok := recentUsedKeys[key]; ok { + _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil }) + require.True(t, hit) + } else { + require.False(t, cache.contains(key)) + } + } +} + +func TestCacheExpire(t *testing.T) { keySize := 20 numberOfKeys := 100 @@ -93,7 +152,7 @@ func TestFifoCacheExpire(t *testing.T) { r := prometheus.NewPedanticRegistry() m := NewPostingCacheMetrics(r) timeNow := time.Now - cache := newFifoCache[int](c.cfg, "test", m, timeNow) + cache := newLruCache[int](c.cfg, "test", m, timeNow) for i := 0; i < numberOfKeys; i++ { key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) @@ -183,3 +242,13 @@ func RepeatStringIfNeeded(seed string, length int) string { return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))] } + +func assertCacheItemsCount[T any](t *testing.T, cache *lruCache[T], size int) { + require.Equal(t, size, cache.cached.Len()) + count := 0 + cache.cachedValues.Range(func(k, v any) bool { + count++ + return true + }) + require.Equal(t, size, count) +}