From 2153bef5d42cd1544aa72e84657a0241ac7179e4 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 22 Nov 2024 15:55:43 -0800 Subject: [PATCH] Fix data race on expanded postings Cache (#6369) * Creating Test Signed-off-by: alanprot * fix Signed-off-by: alanprot --------- Signed-off-by: alanprot --- pkg/ingester/ingester_test.go | 39 ++++++++++++--------- pkg/storage/tsdb/expanded_postings_cache.go | 4 +-- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 6f1f145a9e..fa8616d868 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5085,12 +5085,16 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) { cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} cfg.LifecyclerConfig.JoinAfter = 0 cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{ - SeedSize: 1, // lets make sure all metric names collide + SeedSize: 3, // lets make sure all metric names collide Head: cortex_tsdb.PostingsCacheConfig{ - Enabled: true, + Enabled: true, + Ttl: time.Hour, + MaxBytes: 1024 * 1024 * 1024, }, Blocks: cortex_tsdb.PostingsCacheConfig{ - Enabled: true, + Enabled: true, + Ttl: time.Hour, + MaxBytes: 1024 * 1024 * 1024, }, } @@ -5102,21 +5106,22 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) { numberOfTenants := 100 wg := sync.WaitGroup{} - wg.Add(numberOfTenants) - for j := 0; j < numberOfTenants; j++ { - go func() { - defer wg.Done() - userId := fmt.Sprintf("user%v", j) - ctx := user.InjectOrgID(context.Background(), userId) - _, err = i.Push(ctx, cortexpb.ToWriteRequest( - []labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userId)}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API)) - require.NoError(t, err) - }() + for k := 0; k < 10; k++ { + wg.Add(numberOfTenants) + for j := 0; j < numberOfTenants; j++ { + go func() { + defer wg.Done() + userId := fmt.Sprintf("user%v", j) + ctx := user.InjectOrgID(context.Background(), userId) + _, err := i.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userId, "k", strconv.Itoa(k))}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API)) + require.NoError(t, err) + }() + } + wg.Wait() } - wg.Wait() - wg.Add(numberOfTenants) for j := 0; j < numberOfTenants; j++ { go func() { @@ -5131,8 +5136,8 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) { Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}}, }, s) require.NoError(t, err) - require.Len(t, s.series, 1) - require.Len(t, s.series[0].Labels, 2) + require.Len(t, s.series, 10) + require.Len(t, s.series[0].Labels, 3) require.Equal(t, userId, cortexpb.FromLabelAdaptersToLabels(s.series[0].Labels).Get("userId")) }() } diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index ae9cbea2ee..921881b5c5 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -302,7 +302,7 @@ func newSeedByHash(size int) *seedByHash { func (s *seedByHash) getSeed(userId string, v string) string { h := memHashString(userId, v) i := h % uint64(len(s.seedByHash)) - l := h % uint64(len(s.strippedLock)) + l := i % uint64(len(s.strippedLock)) s.strippedLock[l].RLock() defer s.strippedLock[l].RUnlock() return strconv.Itoa(s.seedByHash[i]) @@ -311,7 +311,7 @@ func (s *seedByHash) getSeed(userId string, v string) string { func (s *seedByHash) incrementSeed(userId string, v string) { h := memHashString(userId, v) i := h % uint64(len(s.seedByHash)) - l := h % uint64(len(s.strippedLock)) + l := i % uint64(len(s.strippedLock)) s.strippedLock[l].Lock() defer s.strippedLock[l].Unlock() s.seedByHash[i]++