Commit

Changing Postings Cache from Fifo to LRU
Signed-off-by: alanprot <[email protected]>
alanprot committed Nov 21, 2024
1 parent 71dccee commit e493ae8
Showing 2 changed files with 100 additions and 22 deletions.
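The gist of the change: the cache already tracked its keys in a container/list and evicted from the front when it went over budget, but entries were only pushed to the back when they were first created, so eviction order was strictly FIFO. This commit has created() return the *list.Element, stores that element on the cached promise, and adds a moveBack() helper that is invoked on every cache hit, turning the eviction order into least-recently-used. The standalone sketch below illustrates the same mechanism with a toy, single-threaded cache; the names (lru, newLRU, Get, Put) and the map-based bookkeeping are illustrative assumptions, not the Cortex implementation, which additionally layers promises, TTL expiry, byte-based limits, and metrics on top of this idea.

package main

import (
	"container/list"
	"fmt"
)

// lru is a minimal, non-concurrent LRU keyed by string.
type lru struct {
	maxEntries int
	order      *list.List               // front = least recently used, back = most recently used
	elements   map[string]*list.Element // key -> element in order; element.Value holds the key
	values     map[string]int
}

func newLRU(maxEntries int) *lru {
	return &lru{
		maxEntries: maxEntries,
		order:      list.New(),
		elements:   map[string]*list.Element{},
		values:     map[string]int{},
	}
}

func (c *lru) Get(k string) (int, bool) {
	e, ok := c.elements[k]
	if !ok {
		return 0, false
	}
	// The LRU part: a hit refreshes the entry's position. A FIFO cache would skip this.
	c.order.MoveToBack(e)
	return c.values[k], true
}

func (c *lru) Put(k string, v int) {
	if e, ok := c.elements[k]; ok {
		c.values[k] = v
		c.order.MoveToBack(e)
		return
	}
	c.elements[k] = c.order.PushBack(k)
	c.values[k] = v
	for c.order.Len() > c.maxEntries {
		// Front is the least recently used entry because Get and Put refresh positions via MoveToBack.
		front := c.order.Front()
		c.order.Remove(front)
		oldest := front.Value.(string)
		delete(c.elements, oldest)
		delete(c.values, oldest)
	}
}

func main() {
	c := newLRU(2)
	c.Put("a", 1)
	c.Put("b", 2)
	c.Get("a")    // "a" becomes most recently used
	c.Put("c", 3) // evicts "b", not "a"
	_, okA := c.Get("a")
	_, okB := c.Get("b")
	fmt.Println(okA, okB) // true false
}

container/list fits here because MoveToBack and Remove on a known *list.Element are O(1), which is why the diff threads the element through created() and the cached promise instead of scanning the list on each hit.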
43 changes: 26 additions & 17 deletions pkg/storage/tsdb/expanded_postings_cache.go
@@ -97,8 +97,8 @@ type ExpandedPostingsCache interface {
type BlocksPostingsForMatchersCache struct {
strippedLock []sync.RWMutex

-headCache *fifoCache[[]storage.SeriesRef]
-blocksCache *fifoCache[[]storage.SeriesRef]
+headCache *lruCache[[]storage.SeriesRef]
+blocksCache *lruCache[[]storage.SeriesRef]

headSeedByMetricName []int
postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
@@ -117,8 +117,8 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp
}

return &BlocksPostingsForMatchersCache{
-headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
-blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
+headCache: newLruCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
+blocksCache: newLruCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
headSeedByMetricName: make([]int, seedArraySize),
strippedLock: make([]sync.RWMutex, numOfSeedsStripes),
postingsForMatchersFunc: cfg.PostingsForMatchers,
@@ -272,7 +272,7 @@ func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) {
return "", false
}

-type fifoCache[V any] struct {
+type lruCache[V any] struct {
cfg PostingsCacheConfig
cachedValues *sync.Map
timeNow func() time.Time
@@ -285,8 +285,8 @@ type fifoCache[V any] struct {
cachedBytes int64
}

-func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] {
-return &fifoCache[V]{
+func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] {
+return &lruCache[V]{
cachedValues: new(sync.Map),
cached: list.New(),
cfg: cfg,
@@ -296,7 +296,7 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded
}
}

-func (c *fifoCache[V]) expire() {
+func (c *lruCache[V]) expire() {
if c.cfg.Ttl <= 0 {
return
}
@@ -314,7 +314,7 @@ func (c *fifoCache[V]) expire() {
}
}

-func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
+func (c *lruCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
r := &cacheEntryPromise[V]{
done: make(chan struct{}),
}
@@ -331,13 +331,14 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
r.ts = c.timeNow()
-c.created(k, r.sizeBytes)
+r.lElement = c.created(k, r.sizeBytes)
c.expire()
}

if ok {
// If the promise is already in the cache, let's wait for it to fetch the data.
<-loaded.(*cacheEntryPromise[V]).done
+c.moveBack(loaded.(*cacheEntryPromise[V]).lElement)

// If it is cached but expired, let's try to replace the cached value.
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
@@ -354,12 +355,12 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
return loaded.(*cacheEntryPromise[V]), ok
}

-func (c *fifoCache[V]) contains(k string) bool {
+func (c *lruCache[V]) contains(k string) bool {
_, ok := c.cachedValues.Load(k)
return ok
}

-func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
+func (c *lruCache[V]) shouldEvictHead() (string, bool) {
h := c.cached.Front()
if h == nil {
return "", false
@@ -380,7 +381,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
return "", false
}

-func (c *fifoCache[V]) evictHead() {
+func (c *lruCache[V]) evictHead() {
front := c.cached.Front()
c.cached.Remove(front)
oldestKey := front.Value.(string)
@@ -389,18 +390,24 @@ func (c *fifoCache[V]) evictHead() {
}
}

-func (c *fifoCache[V]) created(key string, sizeBytes int64) {
+func (c *lruCache[V]) created(key string, sizeBytes int64) *list.Element {
if c.cfg.Ttl <= 0 {
c.cachedValues.Delete(key)
-return
+return nil
}
c.cachedMtx.Lock()
defer c.cachedMtx.Unlock()
-c.cached.PushBack(key)
c.cachedBytes += sizeBytes
+return c.cached.PushBack(key)
}

+func (c *lruCache[V]) moveBack(ele *list.Element) {
+c.cachedMtx.Lock()
+defer c.cachedMtx.Unlock()
+c.cached.MoveToBack(ele)
+}

-func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
+func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) {
if oldSize == newSizeBytes {
return
}
@@ -417,6 +424,8 @@ type cacheEntryPromise[V any] struct {
done chan struct{}
v V
err error

+lElement *list.Element
}

func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool {
79 changes: 74 additions & 5 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
@@ -3,6 +3,7 @@ package tsdb
import (
"bytes"
"fmt"
"math/rand"
"strings"
"sync"
"testing"
@@ -21,7 +22,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
MaxBytes: 10 << 20,
}
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
-cache := newFifoCache[int](cfg, "test", m, time.Now)
+cache := newLruCache[int](cfg, "test", m, time.Now)
calls := atomic.Int64{}
concurrency := 100
wg := sync.WaitGroup{}
@@ -45,12 +46,12 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {

}

-func TestFifoCacheDisabled(t *testing.T) {
+func TestCacheDisabled(t *testing.T) {
cfg := PostingsCacheConfig{}
cfg.Enabled = false
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
timeNow := time.Now
-cache := newFifoCache[int](cfg, "test", m, timeNow)
+cache := newLruCache[int](cfg, "test", m, timeNow)
old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
return 1, 0, nil
})
@@ -59,8 +60,66 @@ func TestFifoCacheDisabled(t *testing.T) {
require.False(t, cache.contains("key1"))
}

-func TestFifoCacheExpire(t *testing.T) {
+func TestLru(t *testing.T) {
+maxNumberOfCachedItems := 5
+keySize := 20
+
+cfg := PostingsCacheConfig{
+Enabled: true,
+MaxBytes: int64(maxNumberOfCachedItems + keySize*maxNumberOfCachedItems), // for this test each element has a size of 1, so the cache holds 'maxNumberOfCachedItems' elements
+Ttl: time.Hour,
+}
+r := prometheus.NewPedanticRegistry()
+m := NewPostingCacheMetrics(r)
+cache := newLruCache[int](cfg, "test", m, time.Now)
+
+for i := 0; i < maxNumberOfCachedItems; i++ {
+key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
+_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+require.False(t, hit)
+require.Equal(t, key, cache.cached.Back().Value)
+assertCacheItemsCount(t, cache, i+1)
+}
+
+for i := 0; i < maxNumberOfCachedItems; i++ {
+key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
+_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+require.True(t, hit)
+require.Equal(t, key, cache.cached.Back().Value)
+assertCacheItemsCount(t, cache, maxNumberOfCachedItems)
+}
+
+// Let's now hit 2 random keys and make sure they will be the last to be expired
+recentUsedKeys := make(map[string]struct{})
+for i := 0; i < 2; i++ {
+key := RepeatStringIfNeeded(fmt.Sprintf("key%d", rand.Int()%maxNumberOfCachedItems), keySize)
+_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+require.True(t, hit)
+assertCacheItemsCount(t, cache, maxNumberOfCachedItems)
+recentUsedKeys[key] = struct{}{}
+}
+
+// Create new keys and make sure the recentUsedKeys are still in the cache
+for i := 0; i < maxNumberOfCachedItems-2; i++ {
+key := RepeatStringIfNeeded(fmt.Sprintf("key_new%d", i), keySize)
+_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+require.False(t, hit)
+require.Equal(t, maxNumberOfCachedItems, cache.cached.Len())
+}
+
+for i := 0; i < maxNumberOfCachedItems; i++ {
+key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
+if _, ok := recentUsedKeys[key]; ok {
+_, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+require.True(t, hit)
+} else {
+require.False(t, cache.contains(key))
+}
+}
+
+}
+
+func TestCacheExpire(t *testing.T) {
keySize := 20
numberOfKeys := 100

@@ -93,7 +152,7 @@ func TestFifoCacheExpire(t *testing.T) {
r := prometheus.NewPedanticRegistry()
m := NewPostingCacheMetrics(r)
timeNow := time.Now
-cache := newFifoCache[int](c.cfg, "test", m, timeNow)
+cache := newLruCache[int](c.cfg, "test", m, timeNow)

for i := 0; i < numberOfKeys; i++ {
key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
@@ -183,3 +242,13 @@ func RepeatStringIfNeeded(seed string, length int) string {

return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))]
}

+func assertCacheItemsCount[T any](t *testing.T, cache *lruCache[T], size int) {
+require.Equal(t, size, cache.cached.Len())
+count := 0
+cache.cachedValues.Range(func(k, v any) bool {
+count++
+return true
+})
+require.Equal(t, size, count)
+}
