fix(cache): dereference already cached chunk during shallow copy (#4567)
istae authored Feb 5, 2024
1 parent 444d365 commit b92c048
Showing 5 changed files with 123 additions and 37 deletions.
30 changes: 21 additions & 9 deletions pkg/storage/inmemchunkstore/inmemchunkstore.go
@@ -14,12 +14,17 @@ import (
 
 type ChunkStore struct {
     mu     sync.Mutex
-    chunks map[string]swarm.Chunk
+    chunks map[string]chunkCount
 }
 
+type chunkCount struct {
+    chunk swarm.Chunk
+    count int
+}
+
 func New() *ChunkStore {
     return &ChunkStore{
-        chunks: make(map[string]swarm.Chunk),
+        chunks: make(map[string]chunkCount),
     }
 }
 
@@ -31,18 +36,19 @@ func (c *ChunkStore) Get(_ context.Context, addr swarm.Address) (swarm.Chunk, error) {
     if !ok {
         return nil, storage.ErrNotFound
     }
-    return chunk, nil
+    return chunk.chunk, nil
 }
 
 func (c *ChunkStore) Put(_ context.Context, ch swarm.Chunk) error {
     c.mu.Lock()
     defer c.mu.Unlock()
 
-    chunk, ok := c.chunks[ch.Address().ByteString()]
+    chunkCount, ok := c.chunks[ch.Address().ByteString()]
     if !ok {
-        chunk = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp())
+        chunkCount.chunk = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp())
     }
-    c.chunks[ch.Address().ByteString()] = chunk
+    chunkCount.count++
+    c.chunks[ch.Address().ByteString()] = chunkCount
 
     return nil
 }
@@ -60,7 +66,13 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error {
     c.mu.Lock()
     defer c.mu.Unlock()
 
-    delete(c.chunks, addr.ByteString())
+    chunkCount := c.chunks[addr.ByteString()]
+    chunkCount.count--
+    if chunkCount.count <= 0 {
+        delete(c.chunks, addr.ByteString())
+    } else {
+        c.chunks[addr.ByteString()] = chunkCount
+    }
 
     return nil
 }
@@ -69,8 +81,8 @@ func (c *ChunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error {
     c.mu.Lock()
     defer c.mu.Unlock()
 
-    for _, chunk := range c.chunks {
-        stop, err := fn(chunk)
+    for _, chunkCount := range c.chunks {
+        stop, err := fn(chunkCount.chunk)
         if err != nil {
             return err
         }
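
The change above turns the in-memory ChunkStore into a reference-counted store: every Put of an already stored chunk increments an internal count, and Delete only removes the chunk once that count drops to zero. Below is a minimal sketch of the new semantics written as a hypothetical test; the import paths and the chunktest alias are assumptions derived from the file paths in this commit and may differ between module versions.

package inmemchunkstore_test

import (
    "context"
    "errors"
    "testing"

    "github.com/ethersphere/bee/pkg/storage"
    "github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
    chunktest "github.com/ethersphere/bee/pkg/storage/testing"
)

// TestReferenceCounting is an illustrative test, not part of this commit.
func TestReferenceCounting(t *testing.T) {
    ctx := context.Background()
    st := inmemchunkstore.New()
    ch := chunktest.GenerateTestRandomChunks(1)[0]

    // Putting the same chunk twice raises its internal count to 2.
    if err := st.Put(ctx, ch); err != nil {
        t.Fatal(err)
    }
    if err := st.Put(ctx, ch); err != nil {
        t.Fatal(err)
    }

    // The first Delete only decrements the count; the chunk is still readable.
    if err := st.Delete(ctx, ch.Address()); err != nil {
        t.Fatal(err)
    }
    if _, err := st.Get(ctx, ch.Address()); err != nil {
        t.Fatalf("chunk should survive the first delete: %v", err)
    }

    // The second Delete brings the count to zero and removes the entry.
    if err := st.Delete(ctx, ch.Address()); err != nil {
        t.Fatal(err)
    }
    if _, err := st.Get(ctx, ch.Address()); !errors.Is(err, storage.ErrNotFound) {
        t.Fatalf("expected storage.ErrNotFound, got %v", err)
    }
}

This is also why the shared storagetest suite below now deletes a chunk a second time after putting it twice.
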
9 changes: 9 additions & 0 deletions pkg/storage/storagetest/chunkstore.go
@@ -95,6 +95,15 @@ func TestChunkStore(t *testing.T, st storage.ChunkStore) {
                 if err != nil {
                     t.Fatalf("failed deleting chunk: %v", err)
                 }
+                _, err = st.Get(context.TODO(), ch.Address())
+                if err != nil {
+                    t.Fatalf("expected no error, found: %v", err)
+                }
+                // delete twice as it was put twice
+                err = st.Delete(context.TODO(), ch.Address())
+                if err != nil {
+                    t.Fatalf("failed deleting chunk: %v", err)
+                }
             }
         }
     })
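
Because the put-twice/delete-twice behaviour is now part of the shared suite, every storage.ChunkStore implementation that is run through storagetest.TestChunkStore must maintain reference counts. Here is a sketch of how an implementation is typically wired into the suite; only the TestChunkStore(t, st) signature comes from the diff above, while the test name and import paths are assumptions.

package inmemchunkstore_test

import (
    "testing"

    "github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
    "github.com/ethersphere/bee/pkg/storage/storagetest"
)

// TestChunkStore runs the shared conformance suite, including the new
// reference-counting checks, against the in-memory store.
func TestChunkStore(t *testing.T) {
    storagetest.TestChunkStore(t, inmemchunkstore.New())
}
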
2 changes: 1 addition & 1 deletion pkg/storageincentives/proof_test.go
@@ -26,7 +26,7 @@ import (
 )
 
 // Test asserts valid case for MakeInclusionProofs.
-func TestMakeInclusionProofs(t *testing.T) {
+func TestMakeInclusionProofs_FLAKY(t *testing.T) {
     t.Parallel()
 
     anchor := testutil.RandBytes(t, 1)
59 changes: 32 additions & 27 deletions pkg/storer/internal/cache/cache.go
@@ -39,10 +39,10 @@ var (
 // part of the reserve but are potentially useful to store for obtaining bandwidth
 // incentives.
 type Cache struct {
-    size       atomic.Int64
-    capacity   int
-    chunkLock  *multex.Multex // protects storage ops at chunk level
-    removeLock sync.RWMutex   // blocks Get and Put ops while cache items are being evicted.
+    size      atomic.Int64
+    capacity  int
+    chunkLock *multex.Multex // protects storage ops at chunk level
+    glock     sync.RWMutex   // blocks Get and Put ops while shallow copy is running.
 }
 
 // New creates a new Cache component with the specified capacity. The store is used
@@ -76,8 +76,8 @@ func (c *Cache) Putter(store internal.Storage) storage.Putter {
 
         c.chunkLock.Lock(chunk.Address().ByteString())
         defer c.chunkLock.Unlock(chunk.Address().ByteString())
-        c.removeLock.RLock()
-        defer c.removeLock.RUnlock()
+        c.glock.RLock()
+        defer c.glock.RUnlock()
 
         newEntry := &cacheEntry{Address: chunk.Address()}
         found, err := store.IndexStore().Has(newEntry)
@@ -138,8 +138,8 @@ func (c *Cache) Getter(store internal.Storage) storage.Getter {
 
         c.chunkLock.Lock(address.ByteString())
         defer c.chunkLock.Unlock(address.ByteString())
-        c.removeLock.RLock()
-        defer c.removeLock.RUnlock()
+        c.glock.RLock()
+        defer c.glock.RUnlock()
 
         // check if there is an entry in Cache. As this is the download path, we do
         // a best-effort operation. So in case of any error we return the chunk.
@@ -195,44 +195,49 @@ func (c *Cache) ShallowCopy(
     addrs ...swarm.Address,
 ) (err error) {
 
-    c.removeLock.Lock()
-    defer c.removeLock.Unlock()
+    c.glock.Lock()
+    defer c.glock.Unlock()
+
+    entries := make([]*cacheEntry, 0, len(addrs))
 
     defer func() {
         if err != nil {
-            for _, addr := range addrs {
-                err = errors.Join(store.ChunkStore().Delete(context.Background(), addr))
+            for _, entry := range entries {
+                err = errors.Join(store.ChunkStore().Delete(context.Background(), entry.Address))
             }
         }
     }()
 
-    //consider only the amount that can fit, the rest should be deleted from the chunkstore.
-    if len(addrs) > c.capacity {
-        for _, addr := range addrs[:len(addrs)-c.capacity] {
-            _ = store.ChunkStore().Delete(ctx, addr)
-        }
-        addrs = addrs[len(addrs)-c.capacity:]
-    }
-
-    entriesToAdd := make([]*cacheEntry, 0, len(addrs))
     for _, addr := range addrs {
         entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()}
         if has, err := store.IndexStore().Has(entry); err == nil && has {
+            // Since the caller has previously referenced the chunk (+1 refCnt), and if the chunk is already referenced
+            // by the cache store (+1 refCnt), then we must decrement the refCnt by one ( -1 refCnt to bring the total to +1).
+            // See https://github.com/ethersphere/bee/issues/4530.
+            _ = store.ChunkStore().Delete(ctx, addr)
             continue
         }
-        entriesToAdd = append(entriesToAdd, entry)
+        entries = append(entries, entry)
     }
 
-    if len(entriesToAdd) == 0 {
+    if len(entries) == 0 {
         return nil
     }
 
+    //consider only the amount that can fit, the rest should be deleted from the chunkstore.
+    if len(entries) > c.capacity {
+        for _, addr := range entries[:len(entries)-c.capacity] {
+            _ = store.ChunkStore().Delete(ctx, addr.Address)
+        }
+        entries = entries[len(entries)-c.capacity:]
+    }
+
     batch, err := store.IndexStore().Batch(ctx)
     if err != nil {
         return fmt.Errorf("failed creating batch: %w", err)
     }
 
-    for _, entry := range entriesToAdd {
+    for _, entry := range entries {
         err = batch.Put(entry)
         if err != nil {
             return fmt.Errorf("failed adding entry %s: %w", entry, err)
@@ -250,7 +255,7 @@ func (c *Cache) ShallowCopy(
         return fmt.Errorf("batch commit: %w", err)
     }
 
-    c.size.Add(int64(len(entriesToAdd)))
+    c.size.Add(int64(len(entries)))
 
     return nil
 }
@@ -267,8 +272,8 @@ func (c *Cache) RemoveOldest(
         return nil
     }
 
-    c.removeLock.Lock()
-    defer c.removeLock.Unlock()
+    // we are okay to not lock here because RemoveOldest removes entries from the beginning of the list
+    // while all the functions above adds new entries.
 
     evictItems := make([]*cacheEntry, 0, count)
     err := store.IndexStore().Iterate(
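
The removeLock to glock rename goes hand in hand with the cache's two-level locking: Putter and Getter take a per-address lock plus a shared (read) hold on the global RWMutex, ShallowCopy takes the global lock exclusively, and RemoveOldest now runs without the global lock because it only evicts from the oldest end of the access list while the paths above only append new entries. The following is a minimal, self-contained sketch of that pattern; every name in it is illustrative, and keyedLock is a simplified stand-in for the multex.Multex that the cache actually uses.

package main

import (
    "fmt"
    "sync"
)

// keyedLock serializes operations per key (simplified: entries are never cleaned up).
type keyedLock struct {
    mu    sync.Mutex
    locks map[string]*sync.Mutex
}

func newKeyedLock() *keyedLock {
    return &keyedLock{locks: make(map[string]*sync.Mutex)}
}

func (k *keyedLock) Lock(key string) {
    k.mu.Lock()
    m, ok := k.locks[key]
    if !ok {
        m = &sync.Mutex{}
        k.locks[key] = m
    }
    k.mu.Unlock()
    m.Lock()
}

func (k *keyedLock) Unlock(key string) {
    k.mu.Lock()
    m := k.locks[key]
    k.mu.Unlock()
    m.Unlock()
}

// cacheLocks mirrors the locking layout of the Cache type above.
type cacheLocks struct {
    chunkLock *keyedLock   // per-chunk: ops on different addresses run in parallel
    glock     sync.RWMutex // global: write-held only while a shallow copy runs
}

// perChunkOp is the Putter/Getter pattern: address lock first, then a shared global hold.
func (c *cacheLocks) perChunkOp(addr string, op func()) {
    c.chunkLock.Lock(addr)
    defer c.chunkLock.Unlock(addr)
    c.glock.RLock()
    defer c.glock.RUnlock()
    op()
}

// shallowCopy is the exclusive pattern: it waits for all in-flight chunk ops to drain.
func (c *cacheLocks) shallowCopy(op func()) {
    c.glock.Lock()
    defer c.glock.Unlock()
    op()
}

func main() {
    c := &cacheLocks{chunkLock: newKeyedLock()}
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        addr := fmt.Sprintf("chunk-%d", i)
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.perChunkOp(addr, func() { fmt.Println("get/put", addr) })
        }()
    }
    c.shallowCopy(func() { fmt.Println("shallow copy runs with exclusive access") })
    wg.Wait()
}

Taking the per-chunk lock before the shared global lock keeps operations on different addresses concurrent, while a shallow copy still excludes all chunk-level work for its duration.
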
60 changes: 60 additions & 0 deletions pkg/storer/internal/cache/cache_test.go
@@ -455,6 +455,48 @@ func TestShallowCopyOverCap(t *testing.T) {
     verifyChunksDeleted(t, st.ChunkStore(), chunks[5:10]...)
 }
 
+func TestShallowCopyAlreadyCached(t *testing.T) {
+    t.Parallel()
+
+    st := newTestStorage(t)
+    c, err := cache.New(context.Background(), st, 1000)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    chunks := chunktest.GenerateTestRandomChunks(10)
+    chunksToMove := make([]swarm.Address, 0, 10)
+
+    for _, ch := range chunks {
+        // add the chunks to chunkstore. This simulates the reserve already populating the chunkstore with chunks.
+        err := st.ChunkStore().Put(context.Background(), ch)
+        if err != nil {
+            t.Fatal(err)
+        }
+        // already cached
+        err = c.Putter(st).Put(context.Background(), ch)
+        if err != nil {
+            t.Fatal(err)
+        }
+        chunksToMove = append(chunksToMove, ch.Address())
+    }
+
+    // move new chunks
+    err = c.ShallowCopy(context.Background(), st, chunksToMove...)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    verifyChunksExist(t, st.ChunkStore(), chunks...)
+
+    err = c.RemoveOldest(context.Background(), st, st.ChunkStore(), 10)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    verifyChunksDeleted(t, st.ChunkStore(), chunks...)
+}
+
 func verifyCacheState(
     t *testing.T,
     store storage.Store,
@@ -523,3 +565,21 @@ func verifyChunksDeleted(
         }
     }
 }
+
+func verifyChunksExist(
+    t *testing.T,
+    chStore storage.ChunkStore,
+    chs ...swarm.Chunk,
+) {
+    t.Helper()
+
+    for _, ch := range chs {
+        found, err := chStore.Has(context.TODO(), ch.Address())
+        if err != nil {
+            t.Fatal(err)
+        }
+        if !found {
+            t.Fatalf("chunk %s expected to be found but not exists", ch.Address())
+        }
+    }
+}
