Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lru cache #6354

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 36 additions & 23 deletions pkg/storage/tsdb/expanded_postings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type ExpandedPostingsCache interface {
type BlocksPostingsForMatchersCache struct {
strippedLock []sync.RWMutex

headCache *fifoCache[[]storage.SeriesRef]
blocksCache *fifoCache[[]storage.SeriesRef]
headCache *lruCache[[]storage.SeriesRef]
blocksCache *lruCache[[]storage.SeriesRef]

headSeedByMetricName []int
postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
Expand All @@ -117,8 +117,8 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp
}

return &BlocksPostingsForMatchersCache{
headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
headCache: newLruCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
blocksCache: newLruCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
headSeedByMetricName: make([]int, seedArraySize),
strippedLock: make([]sync.RWMutex, numOfSeedsStripes),
postingsForMatchersFunc: cfg.PostingsForMatchers,
Expand Down Expand Up @@ -272,31 +272,33 @@ func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) {
return "", false
}

type fifoCache[V any] struct {
type lruCache[V any] struct {
cfg PostingsCacheConfig
cachedValues *sync.Map
timeNow func() time.Time
name string
metrics ExpandedPostingsCacheMetrics

// Fields from here should be locked
cachedMtx sync.RWMutex
cached *list.List
cachedMtx sync.RWMutex
// Keeps track of the most recently used keys.
// The most recently used key is placed at the back of the list, while items are evicted from the front of the list.
lruList *list.List
cachedBytes int64
}

func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] {
return &fifoCache[V]{
func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] {
return &lruCache[V]{
cachedValues: new(sync.Map),
cached: list.New(),
lruList: list.New(),
cfg: cfg,
timeNow: timeNow,
name: name,
metrics: *metrics,
}
}

func (c *fifoCache[V]) expire() {
func (c *lruCache[V]) expire() {
if c.cfg.Ttl <= 0 {
return
}
Expand All @@ -314,7 +316,7 @@ func (c *fifoCache[V]) expire() {
}
}

func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
func (c *lruCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
r := &cacheEntryPromise[V]{
done: make(chan struct{}),
}
Expand All @@ -331,13 +333,14 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
r.ts = c.timeNow()
c.created(k, r.sizeBytes)
r.lElement = c.created(k, r.sizeBytes)
c.expire()
}

if ok {
// If the promise is already in the cache, let's wait for it to fetch the data.
<-loaded.(*cacheEntryPromise[V]).done
c.moveBack(loaded.(*cacheEntryPromise[V]).lElement)

// If it is cached but expired, let's try to replace the cached value.
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
Expand All @@ -354,13 +357,13 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
return loaded.(*cacheEntryPromise[V]), ok
}

func (c *fifoCache[V]) contains(k string) bool {
// contains reports whether key k currently has an entry (or an in-flight
// promise) stored in the cache.
func (c *lruCache[V]) contains(k string) bool {
	_, found := c.cachedValues.Load(k)
	return found
}

func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
h := c.cached.Front()
func (c *lruCache[V]) shouldEvictHead() (string, bool) {
h := c.lruList.Front()
if h == nil {
return "", false
}
Expand All @@ -380,27 +383,33 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
return "", false
}

func (c *fifoCache[V]) evictHead() {
front := c.cached.Front()
c.cached.Remove(front)
// evictHead removes the least recently used entry — the front of lruList —
// from both the LRU list and the value map, and subtracts its size from the
// cached byte count.
//
// NOTE(review): this method mutates lruList and cachedBytes without taking
// cachedMtx, so the caller must already hold the lock — confirm at the call
// sites (presumably expire()).
func (c *lruCache[V]) evictHead() {
	front := c.lruList.Front()
	if front == nil {
		// Guard against a nil dereference when the list is empty; callers
		// normally check shouldEvictHead() first, but this keeps the method safe.
		return
	}
	oldestKey := front.Value.(string)
	c.lruList.Remove(front)
	if oldest, loaded := c.cachedValues.LoadAndDelete(oldestKey); loaded {
		c.cachedBytes -= oldest.(*cacheEntryPromise[V]).sizeBytes
	}
}

func (c *fifoCache[V]) created(key string, sizeBytes int64) {
func (c *lruCache[V]) created(key string, sizeBytes int64) *list.Element {
if c.cfg.Ttl <= 0 {
c.cachedValues.Delete(key)
return
return nil
}
c.cachedMtx.Lock()
defer c.cachedMtx.Unlock()
c.cached.PushBack(key)
c.cachedBytes += sizeBytes
return c.lruList.PushBack(key)
}

// moveBack marks an entry as most recently used by moving its element to the
// back of the LRU list (items are evicted from the front).
//
// A nil element is possible: created() returns nil when the TTL is disabled,
// so a cache hit on such an entry would otherwise panic inside MoveToBack.
// Moving an element that was already removed from the list is a no-op
// (list.MoveToBack ignores elements whose list is not this one).
func (c *lruCache[V]) moveBack(ele *list.Element) {
	if ele == nil {
		return
	}
	c.cachedMtx.Lock()
	defer c.cachedMtx.Unlock()
	c.lruList.MoveToBack(ele)
}

func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) {
if oldSize == newSizeBytes {
return
}
Expand All @@ -417,6 +426,10 @@ type cacheEntryPromise[V any] struct {
done chan struct{}
v V
err error

// Reference to this entry's element in the LRU list.
// Used to move this cache entry to the back of the list as a result of a cache hit.
lElement *list.Element
}

func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool {
Expand Down
79 changes: 74 additions & 5 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsdb
import (
"bytes"
"fmt"
"math/rand"
"strings"
"sync"
"testing"
Expand All @@ -21,7 +22,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
MaxBytes: 10 << 20,
}
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
cache := newFifoCache[int](cfg, "test", m, time.Now)
cache := newLruCache[int](cfg, "test", m, time.Now)
calls := atomic.Int64{}
concurrency := 100
wg := sync.WaitGroup{}
Expand All @@ -45,12 +46,12 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {

}

func TestFifoCacheDisabled(t *testing.T) {
func TestCacheDisabled(t *testing.T) {
cfg := PostingsCacheConfig{}
cfg.Enabled = false
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
timeNow := time.Now
cache := newFifoCache[int](cfg, "test", m, timeNow)
cache := newLruCache[int](cfg, "test", m, timeNow)
old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
return 1, 0, nil
})
Expand All @@ -59,8 +60,66 @@ func TestFifoCacheDisabled(t *testing.T) {
require.False(t, cache.contains("key1"))
}

func TestFifoCacheExpire(t *testing.T) {
// TestLru verifies the LRU semantics of lruCache: new keys land at the back
// of the list, hits move keys to the back, and evictions happen from the
// front (least recently used first).
func TestLru(t *testing.T) {
	maxNumberOfCachedItems := 5
	keySize := 20

	cfg := PostingsCacheConfig{
		Enabled:  true,
		MaxBytes: int64(maxNumberOfCachedItems + keySize*maxNumberOfCachedItems), // each entry costs 1 byte of value plus keySize bytes of key, so exactly 'maxNumberOfCachedItems' entries fit
		Ttl:      time.Hour,
	}
	r := prometheus.NewPedanticRegistry()
	m := NewPostingCacheMetrics(r)
	cache := newLruCache[int](cfg, "test", m, time.Now)

	// Fill the cache: every insert is a miss and must land at the back of the LRU list.
	for i := 0; i < maxNumberOfCachedItems; i++ {
		key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
		_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
		require.False(t, hit)
		require.Equal(t, key, cache.lruList.Back().Value)
		assertCacheItemsCount(t, cache, i+1)
	}

	// Hit every key again: each hit must move the key to the back of the list.
	for i := 0; i < maxNumberOfCachedItems; i++ {
		key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
		_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
		require.True(t, hit)
		require.Equal(t, key, cache.lruList.Back().Value)
		assertCacheItemsCount(t, cache, maxNumberOfCachedItems)
	}

	// Now hit 2 distinct random keys and make sure they will be the last to be evicted.
	// Loop until two *different* keys were hit: the previous version picked two
	// independent random keys, and when they collided (~20% chance) only one key
	// was recorded while two old keys survived eviction, making the final
	// assertions flaky.
	recentUsedKeys := make(map[string]struct{})
	for len(recentUsedKeys) < 2 {
		key := RepeatStringIfNeeded(fmt.Sprintf("key%d", rand.Int()%maxNumberOfCachedItems), keySize)
		_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
		require.True(t, hit)
		assertCacheItemsCount(t, cache, maxNumberOfCachedItems)
		recentUsedKeys[key] = struct{}{}
	}

	// Create new keys and make sure the recently used keys are still in the cache.
	for i := 0; i < maxNumberOfCachedItems-2; i++ {
		key := RepeatStringIfNeeded(fmt.Sprintf("key_new%d", i), keySize)
		_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
		require.False(t, hit)
		require.Equal(t, maxNumberOfCachedItems, cache.lruList.Len())
	}

	// Only the two recently hit keys survive; every other original key was evicted.
	for i := 0; i < maxNumberOfCachedItems; i++ {
		key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
		if _, ok := recentUsedKeys[key]; ok {
			_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
			require.True(t, hit)
		} else {
			require.False(t, cache.contains(key))
		}
	}

}

func TestCacheExpire(t *testing.T) {
keySize := 20
numberOfKeys := 100

Expand Down Expand Up @@ -93,7 +152,7 @@ func TestFifoCacheExpire(t *testing.T) {
r := prometheus.NewPedanticRegistry()
m := NewPostingCacheMetrics(r)
timeNow := time.Now
cache := newFifoCache[int](c.cfg, "test", m, timeNow)
cache := newLruCache[int](c.cfg, "test", m, timeNow)

for i := 0; i < numberOfKeys; i++ {
key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
Expand Down Expand Up @@ -183,3 +242,13 @@ func RepeatStringIfNeeded(seed string, length int) string {

return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))]
}

// assertCacheItemsCount checks that the LRU list and the backing value map
// both hold exactly 'size' entries, i.e. the two structures stay in sync.
func assertCacheItemsCount[T any](t *testing.T, cache *lruCache[T], size int) {
	require.Equal(t, size, cache.lruList.Len())
	entries := 0
	cache.cachedValues.Range(func(_, _ any) bool {
		entries++
		return true
	})
	require.Equal(t, size, entries)
}
Loading