Skip to content

Commit

Permalink
Fix data race on expanded postings Cache (#6369)
Browse files Browse the repository at this point in the history
* Creating Test

Signed-off-by: alanprot <[email protected]>

* fix

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Nov 22, 2024
1 parent c46aec6 commit 2153bef
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 19 deletions.
39 changes: 22 additions & 17 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5085,12 +5085,16 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
cfg.LifecyclerConfig.JoinAfter = 0
cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{
SeedSize: 1, // lets make sure all metric names collide
SeedSize: 3, // lets make sure all metric names collide
Head: cortex_tsdb.PostingsCacheConfig{
Enabled: true,
Enabled: true,
Ttl: time.Hour,
MaxBytes: 1024 * 1024 * 1024,
},
Blocks: cortex_tsdb.PostingsCacheConfig{
Enabled: true,
Enabled: true,
Ttl: time.Hour,
MaxBytes: 1024 * 1024 * 1024,
},
}

Expand All @@ -5102,21 +5106,22 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {

numberOfTenants := 100
wg := sync.WaitGroup{}
wg.Add(numberOfTenants)

for j := 0; j < numberOfTenants; j++ {
go func() {
defer wg.Done()
userId := fmt.Sprintf("user%v", j)
ctx := user.InjectOrgID(context.Background(), userId)
_, err = i.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userId)}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
for k := 0; k < 10; k++ {
wg.Add(numberOfTenants)
for j := 0; j < numberOfTenants; j++ {
go func() {
defer wg.Done()
userId := fmt.Sprintf("user%v", j)
ctx := user.InjectOrgID(context.Background(), userId)
_, err := i.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userId, "k", strconv.Itoa(k))}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
}
wg.Wait()
}

wg.Wait()

wg.Add(numberOfTenants)
for j := 0; j < numberOfTenants; j++ {
go func() {
Expand All @@ -5131,8 +5136,8 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
}, s)
require.NoError(t, err)
require.Len(t, s.series, 1)
require.Len(t, s.series[0].Labels, 2)
require.Len(t, s.series, 10)
require.Len(t, s.series[0].Labels, 3)
require.Equal(t, userId, cortexpb.FromLabelAdaptersToLabels(s.series[0].Labels).Get("userId"))
}()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/tsdb/expanded_postings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func newSeedByHash(size int) *seedByHash {
func (s *seedByHash) getSeed(userId string, v string) string {
h := memHashString(userId, v)
i := h % uint64(len(s.seedByHash))
l := h % uint64(len(s.strippedLock))
l := i % uint64(len(s.strippedLock))
s.strippedLock[l].RLock()
defer s.strippedLock[l].RUnlock()
return strconv.Itoa(s.seedByHash[i])
Expand All @@ -311,7 +311,7 @@ func (s *seedByHash) getSeed(userId string, v string) string {
func (s *seedByHash) incrementSeed(userId string, v string) {
h := memHashString(userId, v)
i := h % uint64(len(s.seedByHash))
l := h % uint64(len(s.strippedLock))
l := i % uint64(len(s.strippedLock))
s.strippedLock[l].Lock()
defer s.strippedLock[l].Unlock()
s.seedByHash[i]++
Expand Down

0 comments on commit 2153bef

Please sign in to comment.