Skip to content

Commit

Permalink
fix: chunkstamp load with hash, replace to increase refCnt
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Nov 11, 2024
1 parent a21c988 commit f2056a4
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 66 deletions.
2 changes: 1 addition & 1 deletion pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error {
return nil
}

func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error {
c.mu.Lock()
defer c.mu.Unlock()
c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp())
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Hasser interface {
// Replacer is the interface that wraps the basic Replace method.
type Replacer interface {
// Replace a chunk in the store.
Replace(context.Context, swarm.Chunk) error
Replace(context.Context, swarm.Chunk, bool) error
}

// PutterFunc type is an adapter to allow the use of
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/inmemchunkstore/inmemchunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,17 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error {
return nil
}

func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error {
c.mu.Lock()
defer c.mu.Unlock()

chunkCount := c.chunks[ch.Address().ByteString()]
chunkCount.chunk = ch
if emplace {
chunkCount.count++
}
c.chunks[ch.Address().ByteString()] = chunkCount

return nil
}

Expand Down
38 changes: 38 additions & 0 deletions pkg/storer/internal/chunkstamp/chunkstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,44 @@ func LoadWithBatchID(s storage.Reader, scope string, addr swarm.Address, batchID
return stamp, nil
}

// LoadWithBatchID returns swarm.Stamp related to the given address and batchID.
func LoadWithStampHash(s storage.Reader, scope string, addr swarm.Address, hash []byte) (swarm.Stamp, error) {
var stamp swarm.Stamp

found := false
err := s.Iterate(
storage.Query{
Factory: func() storage.Item {
return &Item{
scope: []byte(scope),
address: addr,
}
},
},
func(res storage.Result) (bool, error) {
item := res.Entry.(*Item)
h, err := item.stamp.Hash()
if err != nil {
return false, err
}
if bytes.Equal(hash, h) {
stamp = item.stamp
found = true
return true, nil
}
return false, nil
},
)
if err != nil {
return nil, err
}
if !found {
return nil, fmt.Errorf("stamp not found for hash %x: %w", hash, storage.ErrNotFound)
}

return stamp, nil
}

// Store creates new or updated an existing stamp index
// record related to the given scope and chunk.
func Store(s storage.IndexStore, scope string, chunk swarm.Chunk) error {
Expand Down
16 changes: 15 additions & 1 deletion pkg/storer/internal/chunkstamp/chunkstamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestStoreLoadDelete(t *testing.T) {
}
})

t.Run("load stored chunk stamp with batch id", func(t *testing.T) {
t.Run("load stored chunk stamp with batch id and hash", func(t *testing.T) {
want := chunk.Stamp()

have, err := chunkstamp.LoadWithBatchID(ts.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID())
Expand All @@ -196,6 +196,20 @@ func TestStoreLoadDelete(t *testing.T) {
if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" {
t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff)
}

h, err := want.Hash()
if err != nil {
t.Fatal(err)
}

have, err = chunkstamp.LoadWithStampHash(ts.IndexStore(), ns, chunk.Address(), h)
if err != nil {
t.Fatalf("LoadWithBatchID(...): unexpected error: %v", err)
}

if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" {
t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff)
}
})

t.Run("delete stored stamp", func(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/storer/internal/chunkstore/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.
return s.Put(rIdx)
}

func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error {
func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk, emplace bool) error {
rIdx := &RetrievalIndexItem{Address: ch.Address()}
err := s.Get(rIdx)
if err != nil {
Expand All @@ -112,6 +112,9 @@ func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch sw
}
rIdx.Location = loc
rIdx.Timestamp = uint64(time.Now().Unix())
if emplace {
rIdx.RefCnt++
}
return s.Put(rIdx)
}

Expand Down
97 changes: 44 additions & 53 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package reserve

import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
Expand Down Expand Up @@ -130,37 +129,40 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {

err = r.st.Run(ctx, func(s transaction.Store) error {

sameAddressOldStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveScope, chunk.Address(), chunk.Stamp().BatchID())
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk)
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

var sameAddressSoc = false
// index collision
if loadedStampIndex {

// same chunk address, same batch
if sameAddressOldStamp != nil {

if chunkType == swarm.ChunkTypeSingleOwner {
sameAddressSoc = true
prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}

// index collision
if bytes.Equal(chunk.Stamp().Index(), sameAddressOldStamp.Index()) {
sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp)
r.logger.Debug(
"replacing chunk stamp index",
"old_chunk", oldStampIndex.ChunkAddress,
"new_chunk", chunk.Address(),
"batch_id", hex.EncodeToString(chunk.Stamp().BatchID()),
)

// same chunk address
if oldStampIndex.ChunkAddress.Equal(chunk.Address()) {

oldStamp, err := chunkstamp.LoadWithStampHash(s.IndexStore(), reserveScope, oldStampIndex.ChunkAddress, oldStampIndex.StampHash)
if err != nil {
return err
}
prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}

oldBatchRadiusItem := &BatchRadiusItem{
Bin: bin,
Address: chunk.Address(),
BatchID: sameAddressOldStampIndex.BatchID,
StampHash: sameAddressOldStampIndex.StampHash,
Address: oldStampIndex.ChunkAddress,
BatchID: oldStampIndex.BatchID,
StampHash: oldStampIndex.StampHash,
}
// load item to get the binID
err = s.IndexStore().Get(oldBatchRadiusItem)
Expand All @@ -172,8 +174,8 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
err = errors.Join(
s.IndexStore().Delete(oldBatchRadiusItem),
s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}),
stampindex.Delete(s.IndexStore(), reserveScope, sameAddressOldStamp),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, sameAddressOldStamp),
stampindex.Delete(s.IndexStore(), reserveScope, oldStamp),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, oldStamp),
)
if err != nil {
return err
Expand Down Expand Up @@ -207,28 +209,15 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
return err
}

if chunkType != swarm.ChunkTypeSingleOwner {
return nil
if chunkType == swarm.ChunkTypeSingleOwner {
r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk, false)
}

r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk)
return nil
}
}

oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk)
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

// different address, same batch, index collision
if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) {
prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}
// An older (same or different) chunk with the same batchID and stamp index has been previously
// An older and different chunk with the same batchID and stamp index has been previously
// saved to the reserve. We must do the below before saving the new chunk:
// 1. Delete the old chunk from the chunkstore.
// 2. Delete the old chunk's stamp data.
Expand All @@ -240,13 +229,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err)
}

r.logger.Warning(
"replacing chunk stamp index",
"old_chunk", oldStampIndex.ChunkAddress,
"new_chunk", chunk.Address(),
"batch_id", hex.EncodeToString(chunk.Stamp().BatchID()),
)

// replace old stamp index.
err = stampindex.Store(s.IndexStore(), reserveScope, chunk)
if err != nil {
Expand Down Expand Up @@ -281,8 +263,17 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
return err
}

if sameAddressSoc {
err = s.ChunkStore().Replace(ctx, chunk)
var has bool
if chunkType == swarm.ChunkTypeSingleOwner {
has, err = s.ChunkStore().Has(ctx, chunk.Address())
if err != nil {
return err
}
if has {
err = s.ChunkStore().Replace(ctx, chunk, true)
} else {
err = s.ChunkStore().Put(ctx, chunk)
}
} else {
err = s.ChunkStore().Put(ctx, chunk)
}
Expand Down Expand Up @@ -321,7 +312,7 @@ func (r *Reserve) Get(ctx context.Context, addr swarm.Address, batchID []byte, s
return nil, err
}

stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, addr, item.BatchID)
stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, addr, stampHash)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -423,7 +414,7 @@ func RemoveChunkWithItem(
) error {
var errs error

stamp, _ := chunkstamp.LoadWithBatchID(trx.IndexStore(), reserveScope, item.Address, item.BatchID)
stamp, _ := chunkstamp.LoadWithStampHash(trx.IndexStore(), reserveScope, item.Address, item.StampHash)
if stamp != nil {
errs = errors.Join(
stampindex.Delete(trx.IndexStore(), reserveScope, stamp),
Expand Down Expand Up @@ -473,7 +464,7 @@ func (r *Reserve) IterateChunks(startBin uint8, cb func(swarm.Chunk) (bool, erro
return false, err
}

stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, item.Address, item.BatchID)
stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, item.Address, item.StampHash)
if err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit f2056a4

Please sign in to comment.