diff --git a/pkg/storage/inmemchunkstore/inmemchunkstore.go b/pkg/storage/inmemchunkstore/inmemchunkstore.go index 8f1dcb77a7d..0f225db0608 100644 --- a/pkg/storage/inmemchunkstore/inmemchunkstore.go +++ b/pkg/storage/inmemchunkstore/inmemchunkstore.go @@ -14,12 +14,17 @@ import ( type ChunkStore struct { mu sync.Mutex - chunks map[string]swarm.Chunk + chunks map[string]chunkCount +} + +type chunkCount struct { + chunk swarm.Chunk + count int } func New() *ChunkStore { return &ChunkStore{ - chunks: make(map[string]swarm.Chunk), + chunks: make(map[string]chunkCount), } } @@ -31,18 +36,19 @@ func (c *ChunkStore) Get(_ context.Context, addr swarm.Address) (swarm.Chunk, er if !ok { return nil, storage.ErrNotFound } - return chunk, nil + return chunk.chunk, nil } func (c *ChunkStore) Put(_ context.Context, ch swarm.Chunk) error { c.mu.Lock() defer c.mu.Unlock() - chunk, ok := c.chunks[ch.Address().ByteString()] + chunkCount, ok := c.chunks[ch.Address().ByteString()] if !ok { - chunk = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp()) + chunkCount.chunk = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp()) } - c.chunks[ch.Address().ByteString()] = chunk + chunkCount.count++ + c.chunks[ch.Address().ByteString()] = chunkCount return nil } @@ -60,7 +66,13 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error { c.mu.Lock() defer c.mu.Unlock() - delete(c.chunks, addr.ByteString()) + chunkCount := c.chunks[addr.ByteString()] + chunkCount.count-- + if chunkCount.count <= 0 { + delete(c.chunks, addr.ByteString()) + } else { + c.chunks[addr.ByteString()] = chunkCount + } return nil } @@ -69,8 +81,8 @@ func (c *ChunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error c.mu.Lock() defer c.mu.Unlock() - for _, chunk := range c.chunks { - stop, err := fn(chunk) + for _, chunkCount := range c.chunks { + stop, err := fn(chunkCount.chunk) if err != nil { return err } diff --git a/pkg/storage/storagetest/chunkstore.go 
b/pkg/storage/storagetest/chunkstore.go index dbd3e4d79a3..767c4a2a0c4 100644 --- a/pkg/storage/storagetest/chunkstore.go +++ b/pkg/storage/storagetest/chunkstore.go @@ -95,6 +95,15 @@ func TestChunkStore(t *testing.T, st storage.ChunkStore) { if err != nil { t.Fatalf("failed deleting chunk: %v", err) } + _, err = st.Get(context.TODO(), ch.Address()) + if err != nil { + t.Fatalf("expected no error, found: %v", err) + } + // delete twice as it was put twice + err = st.Delete(context.TODO(), ch.Address()) + if err != nil { + t.Fatalf("failed deleting chunk: %v", err) + } } } }) diff --git a/pkg/storageincentives/proof_test.go b/pkg/storageincentives/proof_test.go index 860f46022f5..1d83309ba24 100644 --- a/pkg/storageincentives/proof_test.go +++ b/pkg/storageincentives/proof_test.go @@ -26,7 +26,7 @@ import ( ) // Test asserts valid case for MakeInclusionProofs. -func TestMakeInclusionProofs(t *testing.T) { +func TestMakeInclusionProofs_FLAKY(t *testing.T) { t.Parallel() anchor := testutil.RandBytes(t, 1) diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go index f4ae7e06a4e..6a79ad38521 100644 --- a/pkg/storer/internal/cache/cache.go +++ b/pkg/storer/internal/cache/cache.go @@ -39,10 +39,10 @@ var ( // part of the reserve but are potentially useful to store for obtaining bandwidth // incentives. type Cache struct { - size atomic.Int64 - capacity int - chunkLock *multex.Multex // protects storage ops at chunk level - removeLock sync.RWMutex // blocks Get and Put ops while cache items are being evicted. + size atomic.Int64 + capacity int + chunkLock *multex.Multex // protects storage ops at chunk level + glock sync.RWMutex // blocks Get and Put ops while shallow copy is running. } // New creates a new Cache component with the specified capacity. 
The store is used @@ -76,8 +76,8 @@ func (c *Cache) Putter(store internal.Storage) storage.Putter { c.chunkLock.Lock(chunk.Address().ByteString()) defer c.chunkLock.Unlock(chunk.Address().ByteString()) - c.removeLock.RLock() - defer c.removeLock.RUnlock() + c.glock.RLock() + defer c.glock.RUnlock() newEntry := &cacheEntry{Address: chunk.Address()} found, err := store.IndexStore().Has(newEntry) @@ -138,8 +138,8 @@ func (c *Cache) Getter(store internal.Storage) storage.Getter { c.chunkLock.Lock(address.ByteString()) defer c.chunkLock.Unlock(address.ByteString()) - c.removeLock.RLock() - defer c.removeLock.RUnlock() + c.glock.RLock() + defer c.glock.RUnlock() // check if there is an entry in Cache. As this is the download path, we do // a best-effort operation. So in case of any error we return the chunk. @@ -195,44 +195,49 @@ func (c *Cache) ShallowCopy( addrs ...swarm.Address, ) (err error) { - c.removeLock.Lock() - defer c.removeLock.Unlock() + c.glock.Lock() + defer c.glock.Unlock() + + entries := make([]*cacheEntry, 0, len(addrs)) defer func() { if err != nil { - for _, addr := range addrs { - err = errors.Join(store.ChunkStore().Delete(context.Background(), addr)) + for _, entry := range entries { + err = errors.Join(store.ChunkStore().Delete(context.Background(), entry.Address)) } } }() - //consider only the amount that can fit, the rest should be deleted from the chunkstore. 
- if len(addrs) > c.capacity { - for _, addr := range addrs[:len(addrs)-c.capacity] { - _ = store.ChunkStore().Delete(ctx, addr) - } - addrs = addrs[len(addrs)-c.capacity:] - } - - entriesToAdd := make([]*cacheEntry, 0, len(addrs)) for _, addr := range addrs { entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()} if has, err := store.IndexStore().Has(entry); err == nil && has { + // Since the caller has previously referenced the chunk (+1 refCnt), and if the chunk is already referenced + // by the cache store (+1 refCnt), then we must decrement the refCnt by one ( -1 refCnt to bring the total to +1). + // See https://github.com/ethersphere/bee/issues/4530. + _ = store.ChunkStore().Delete(ctx, addr) continue } - entriesToAdd = append(entriesToAdd, entry) + entries = append(entries, entry) } - if len(entriesToAdd) == 0 { + if len(entries) == 0 { return nil } + //consider only the amount that can fit, the rest should be deleted from the chunkstore. + if len(entries) > c.capacity { + for _, addr := range entries[:len(entries)-c.capacity] { + _ = store.ChunkStore().Delete(ctx, addr.Address) + } + entries = entries[len(entries)-c.capacity:] + } + batch, err := store.IndexStore().Batch(ctx) if err != nil { return fmt.Errorf("failed creating batch: %w", err) } - for _, entry := range entriesToAdd { + for _, entry := range entries { err = batch.Put(entry) if err != nil { return fmt.Errorf("failed adding entry %s: %w", entry, err) @@ -250,7 +255,7 @@ func (c *Cache) ShallowCopy( return fmt.Errorf("batch commit: %w", err) } - c.size.Add(int64(len(entriesToAdd))) + c.size.Add(int64(len(entries))) return nil } @@ -267,8 +272,8 @@ func (c *Cache) RemoveOldest( return nil } - c.removeLock.Lock() - defer c.removeLock.Unlock() + // we are okay to not lock here because RemoveOldest removes entries from the beginning of the list + // while all the functions above add new entries. 
evictItems := make([]*cacheEntry, 0, count) err := store.IndexStore().Iterate( diff --git a/pkg/storer/internal/cache/cache_test.go b/pkg/storer/internal/cache/cache_test.go index 6450dc71198..8a3da624c75 100644 --- a/pkg/storer/internal/cache/cache_test.go +++ b/pkg/storer/internal/cache/cache_test.go @@ -455,6 +455,48 @@ func TestShallowCopyOverCap(t *testing.T) { verifyChunksDeleted(t, st.ChunkStore(), chunks[5:10]...) } +func TestShallowCopyAlreadyCached(t *testing.T) { + t.Parallel() + + st := newTestStorage(t) + c, err := cache.New(context.Background(), st, 1000) + if err != nil { + t.Fatal(err) + } + + chunks := chunktest.GenerateTestRandomChunks(10) + chunksToMove := make([]swarm.Address, 0, 10) + + for _, ch := range chunks { + // add the chunks to chunkstore. This simulates the reserve already populating the chunkstore with chunks. + err := st.ChunkStore().Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + // already cached + err = c.Putter(st).Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + chunksToMove = append(chunksToMove, ch.Address()) + } + + // move new chunks + err = c.ShallowCopy(context.Background(), st, chunksToMove...) + if err != nil { + t.Fatal(err) + } + + verifyChunksExist(t, st.ChunkStore(), chunks...) + + err = c.RemoveOldest(context.Background(), st, st.ChunkStore(), 10) + if err != nil { + t.Fatal(err) + } + + verifyChunksDeleted(t, st.ChunkStore(), chunks...) +} + func verifyCacheState( t *testing.T, store storage.Store, @@ -523,3 +565,21 @@ func verifyChunksDeleted( } } } + +func verifyChunksExist( + t *testing.T, + chStore storage.ChunkStore, + chs ...swarm.Chunk, +) { + t.Helper() + + for _, ch := range chs { + found, err := chStore.Has(context.TODO(), ch.Address()) + if err != nil { + t.Fatal(err) + } + if !found { + t.Fatalf("chunk %s expected to be found but not exists", ch.Address()) + } + } +}