Skip to content

Commit

Permalink
fix: chunkstamp load with hash, replace to increase refCnt
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Nov 11, 2024
1 parent a21c988 commit 9e2d66f
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error {
return nil
}

func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error {
c.mu.Lock()
defer c.mu.Unlock()
c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp())
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Hasser interface {
// Replacer is the interface that wraps the basic Replace method.
type Replacer interface {
// Replace a chunk in the store.
Replace(context.Context, swarm.Chunk) error
Replace(context.Context, swarm.Chunk, bool) error
}

// PutterFunc type is an adapter to allow the use of
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/inmemchunkstore/inmemchunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,17 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error {
return nil
}

func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error {
c.mu.Lock()
defer c.mu.Unlock()

chunkCount := c.chunks[ch.Address().ByteString()]
chunkCount.chunk = ch
if emplace {
chunkCount.count++
}
c.chunks[ch.Address().ByteString()] = chunkCount

return nil
}

Expand Down
38 changes: 38 additions & 0 deletions pkg/storer/internal/chunkstamp/chunkstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,44 @@ func LoadWithBatchID(s storage.Reader, scope string, addr swarm.Address, batchID
return stamp, nil
}

// LoadWithBatchID returns swarm.Stamp related to the given address and batchID.
func LoadWithStampHash(s storage.Reader, scope string, addr swarm.Address, hash []byte) (swarm.Stamp, error) {
var stamp swarm.Stamp

found := false
err := s.Iterate(
storage.Query{
Factory: func() storage.Item {
return &Item{
scope: []byte(scope),
address: addr,
}
},
},
func(res storage.Result) (bool, error) {
item := res.Entry.(*Item)
h, err := item.stamp.Hash()
if err != nil {
return false, err
}
if bytes.Equal(hash, h) {
stamp = item.stamp
found = true
return true, nil
}
return false, nil
},
)
if err != nil {
return nil, err
}
if !found {
return nil, fmt.Errorf("stamp not found for hash %x: %w", hash, storage.ErrNotFound)
}

return stamp, nil
}

// Store creates new or updated an existing stamp index
// record related to the given scope and chunk.
func Store(s storage.IndexStore, scope string, chunk swarm.Chunk) error {
Expand Down
16 changes: 15 additions & 1 deletion pkg/storer/internal/chunkstamp/chunkstamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestStoreLoadDelete(t *testing.T) {
}
})

t.Run("load stored chunk stamp with batch id", func(t *testing.T) {
t.Run("load stored chunk stamp with batch id and hash", func(t *testing.T) {
want := chunk.Stamp()

have, err := chunkstamp.LoadWithBatchID(ts.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID())
Expand All @@ -196,6 +196,20 @@ func TestStoreLoadDelete(t *testing.T) {
if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" {
t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff)
}

h, err := want.Hash()
if err != nil {
t.Fatal(err)
}

have, err = chunkstamp.LoadWithStampHash(ts.IndexStore(), ns, chunk.Address(), h)
if err != nil {
t.Fatalf("LoadWithBatchID(...): unexpected error: %v", err)
}

if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" {
t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff)
}
})

t.Run("delete stored stamp", func(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/storer/internal/chunkstore/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.
return s.Put(rIdx)
}

func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error {
func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk, emplace bool) error {
rIdx := &RetrievalIndexItem{Address: ch.Address()}
err := s.Get(rIdx)
if err != nil {
Expand All @@ -112,6 +112,9 @@ func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch sw
}
rIdx.Location = loc
rIdx.Timestamp = uint64(time.Now().Unix())
if emplace {
rIdx.RefCnt++
}
return s.Put(rIdx)
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
}

r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk)
return s.ChunkStore().Replace(ctx, chunk, false)
}
}

Expand Down Expand Up @@ -282,7 +282,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
}

if sameAddressSoc {
err = s.ChunkStore().Replace(ctx, chunk)
err = s.ChunkStore().Replace(ctx, chunk, true)
} else {
err = s.ChunkStore().Put(ctx, chunk)
}
Expand Down Expand Up @@ -321,7 +321,7 @@ func (r *Reserve) Get(ctx context.Context, addr swarm.Address, batchID []byte, s
return nil, err
}

stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, addr, item.BatchID)
stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, addr, stampHash)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -423,7 +423,7 @@ func RemoveChunkWithItem(
) error {
var errs error

stamp, _ := chunkstamp.LoadWithBatchID(trx.IndexStore(), reserveScope, item.Address, item.BatchID)
stamp, _ := chunkstamp.LoadWithStampHash(trx.IndexStore(), reserveScope, item.Address, item.StampHash)
if stamp != nil {
errs = errors.Join(
stampindex.Delete(trx.IndexStore(), reserveScope, stamp),
Expand Down Expand Up @@ -473,7 +473,7 @@ func (r *Reserve) IterateChunks(startBin uint8, cb func(swarm.Chunk) (bool, erro
return false, err
}

stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, item.Address, item.BatchID)
stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, item.Address, item.StampHash)
if err != nil {
return false, err
}
Expand Down
67 changes: 67 additions & 0 deletions pkg/storer/internal/reserve/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,73 @@ func TestEvict(t *testing.T) {
}
}

func TestEvictSOC(t *testing.T) {
t.Parallel()

baseAddr := swarm.RandAddress(t)
ts := internal.NewInmemStorage()

r, err := reserve.New(
baseAddr,
ts,
0, kademlia.NewTopologyDriver(),
log.Noop,
)
if err != nil {
t.Fatal(err)
}

batch := postagetesting.MustNewBatch()
signer := getSigner(t)

var chunks []swarm.Chunk

for i := 0; i < 10; i++ {
ch := soctesting.GenerateMockSocWithSigner(t, []byte{byte(i)}, signer).Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, uint64(i), uint64(i)))
chunks = append(chunks, ch)
err := r.Put(context.Background(), ch)
if err != nil {
t.Fatal(err)
}
}

bin := swarm.Proximity(baseAddr.Bytes(), chunks[0].Address().Bytes())

for i, ch := range chunks {
stampHash, err := ch.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: uint64(i + 1), StampHash: stampHash}, false)
checkChunk(t, ts, ch, false)
}

_, err = r.EvictBatchBin(context.Background(), batch.ID, 1, swarm.MaxBins)
if err != nil {
t.Fatal(err)
}
checkChunk(t, ts, chunks[9], false) // chunk should still persist, eg refCnt > 0

evicted, err := r.EvictBatchBin(context.Background(), batch.ID, 10, swarm.MaxBins)
if err != nil {
t.Fatal(err)
}
if evicted != 9 {
t.Fatalf("wanted evicted count 10, got %d", evicted)
}

for i, ch := range chunks {
stampHash, err := ch.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: uint64(i + 1), StampHash: stampHash}, true)
checkChunk(t, ts, ch, true)
}
}

func TestEvictMaxCount(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions pkg/storer/internal/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,11 @@ func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn)
return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn)
}

func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk) (err error) {
func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk, emplace bool) (err error) {
defer handleMetric("chunkstore_replace", c.metrics)(&err)
unlock := c.lock(ch.Address())
defer unlock()
return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch)
return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch, emplace)
}

func (c *chunkStoreTrx) lock(addr swarm.Address) func() {
Expand Down

0 comments on commit 9e2d66f

Please sign in to comment.