Skip to content

Commit

Permalink
fix(gsoc): improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Nov 11, 2024
1 parent 8c416af commit 0aae083
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 120 deletions.
7 changes: 4 additions & 3 deletions pkg/api/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("gsoc_subscribe").Build()

paths := struct {
Address []byte `map:"address" validate:"required"`
Address swarm.Address `map:"address,resolve" validate:"required"`
}{}

if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
Expand All @@ -43,7 +44,7 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
go s.gsocListeningWs(conn, paths.Address)
}

func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) {
defer s.wsWg.Done()

var (
Expand All @@ -56,7 +57,7 @@ func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
ticker.Stop()
_ = conn.Close()
}()
cleanup := s.gsoc.Subscribe([32]byte(socAddress), func(m []byte) {
cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) {
select {
case dataC <- m:
case <-gone:
Expand Down
34 changes: 16 additions & 18 deletions pkg/gsoc/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ import (
"github.com/ethersphere/bee/v2/pkg/swarm"
)

// Handler defines code to be executed upon reception of a GSOC sub message.
// it is used as a parameter definition.
type Handler func([]byte)

type Listener interface {
Subscribe(address [32]byte, handler Handler) (cleanup func())
Subscribe(address swarm.Address, handler Handler) (cleanup func())
Handle(c *soc.SOC)
Close() error
}

type listener struct {
handlers map[[32]byte][]*Handler
handlers map[string][]*Handler
handlersMu sync.Mutex
quit chan struct{}
logger log.Logger
Expand All @@ -29,26 +33,26 @@ type listener struct {
func New(logger log.Logger) Listener {
return &listener{
logger: logger,
handlers: make(map[[32]byte][]*Handler),
handlers: make(map[string][]*Handler),
quit: make(chan struct{}),
}
}

// Subscribe allows the definition of a Handler func on a specific GSOC address.
func (l *listener) Subscribe(address [32]byte, handler Handler) (cleanup func()) {
func (l *listener) Subscribe(address swarm.Address, handler Handler) (cleanup func()) {
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

l.handlers[address] = append(l.handlers[address], &handler)
l.handlers[address.ByteString()] = append(l.handlers[address.ByteString()], &handler)

return func() {
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

h := l.handlers[address]
h := l.handlers[address.ByteString()]
for i := 0; i < len(h); i++ {
if h[i] == &handler {
l.handlers[address] = append(h[:i], h[i+1:]...)
l.handlers[address.ByteString()] = append(h[:i], h[i+1:]...)
return
}
}
Expand All @@ -61,13 +65,11 @@ func (l *listener) Handle(c *soc.SOC) {
if err != nil {
return // no handler
}
h := l.getHandlers([32]byte(addr.Bytes()))
h := l.getHandlers(addr)
if h == nil {
return // no handler
}
l.logger.Info("new incoming GSOC message",
"GSOC Address", addr,
"wrapped chunk address", c.WrappedChunk().Address())
l.logger.Debug("new incoming GSOC message", "GSOC Address", addr, "wrapped chunk address", c.WrappedChunk().Address())

for _, hh := range h {
go func(hh Handler) {
Expand All @@ -76,23 +78,19 @@ func (l *listener) Handle(c *soc.SOC) {
}
}

func (p *listener) getHandlers(address [32]byte) []*Handler {
func (p *listener) getHandlers(address swarm.Address) []*Handler {
p.handlersMu.Lock()
defer p.handlersMu.Unlock()

return p.handlers[address]
return p.handlers[address.ByteString()]
}

func (l *listener) Close() error {
close(l.quit)
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

l.handlers = make(map[[32]byte][]*Handler) //unset handlers on shutdown
l.handlers = make(map[string][]*Handler) //unset handlers on shutdown

return nil
}

// Handler defines code to be executed upon reception of a GSOC sub message.
// it is used as a parameter definition.
type Handler func([]byte)
8 changes: 4 additions & 4 deletions pkg/gsoc/gsoc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRegister(t *testing.T) {
t.Parallel()

var (
g = gsoc.New(log.NewLogger("test"))
g = gsoc.New(log.Noop)
h1Calls = 0
h2Calls = 0
h3Calls = 0
Expand Down Expand Up @@ -52,8 +52,8 @@ func TestRegister(t *testing.T) {
msgChan <- struct{}{}
}
)
_ = g.Subscribe([32]byte(address1.Bytes()), h1)
_ = g.Subscribe([32]byte(address2.Bytes()), h2)
_ = g.Subscribe(address1, h1)
_ = g.Subscribe(address2, h2)

ch1, _ := cac.New(payload1)
socCh1 := soc.New(socId1, ch1)
Expand All @@ -74,7 +74,7 @@ func TestRegister(t *testing.T) {
ensureCalls(t, &h2Calls, 0)

// register another handler on the first address
cleanup := g.Subscribe([32]byte(address1.Bytes()), h3)
cleanup := g.Subscribe(address1, h3)

g.Handle(socCh1)

Expand Down
27 changes: 3 additions & 24 deletions pkg/storer/internal/chunkstore/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ func Has(_ context.Context, r storage.Reader, addr swarm.Address) (bool, error)

func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error {
var (
rIdx = &RetrievalIndexItem{Address: ch.Address()}
loc sharky.Location
inserted bool
rIdx = &RetrievalIndexItem{Address: ch.Address()}
loc sharky.Location
)
err := s.Get(rIdx)
switch {
Expand All @@ -86,31 +85,11 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.
}
rIdx.Location = loc
rIdx.Timestamp = uint64(time.Now().Unix())
inserted = true
case err != nil:
return fmt.Errorf("chunk store: failed to read: %w", err)
}

// SOC will be replaced in the chunk store if it is already stored with the newer payload.
// Pull sync should sync the new SOC payload with the new stamp.
// TODO: remove this condition when postage stamping is refactored for GSOC.
chunkType := storage.ChunkType(ch)
if !inserted && chunkType == swarm.ChunkTypeSingleOwner {
// replace old payload
err = sh.Release(ctx, rIdx.Location)
if err != nil {
return fmt.Errorf("chunkstore: failed to release sharky location: %w", err)
}

loc, err := sh.Write(ctx, ch.Data())
if err != nil {
return fmt.Errorf("chunk store: write to sharky failed: %w", err)
}
rIdx.Location = loc
rIdx.Timestamp = uint64(time.Now().Unix())
} else {
rIdx.RefCnt++
}
rIdx.RefCnt++

return s.Put(rIdx)
}
Expand Down
61 changes: 0 additions & 61 deletions pkg/storer/internal/chunkstore/chunkstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package chunkstore_test

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -14,9 +13,7 @@ import (
"os"
"testing"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"

"github.com/ethersphere/bee/v2/pkg/storage"
Expand Down Expand Up @@ -339,64 +336,6 @@ func TestChunkStore(t *testing.T) {
}
})

// TODO: remove this when postage stamping is refactored for GSOC.
t.Run("put two SOCs with different payloads", func(t *testing.T) {
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)

// chunk data to upload
chunk1 := chunktest.FixtureChunk("7000")
chunk2 := chunktest.FixtureChunk("0033")
id := make([]byte, swarm.HashSize)
s1 := soc.New(id, chunk1)
s2 := soc.New(id, chunk2)
sch1, err := s1.Sign(signer)
if err != nil {
t.Fatal(err)
}
sch1 = sch1.WithStamp(chunk1.Stamp())
sch2, err := s2.Sign(signer)
if err != nil {
t.Fatal(err)
}
sch2 = sch2.WithStamp(chunk2.Stamp())

// Put the first SOC into the chunk store
err = st.Run(context.Background(), func(s transaction.Store) error {
return s.ChunkStore().Put(context.TODO(), sch1)
})
if err != nil {
t.Fatalf("failed putting first single owner chunk: %v", err)
}

// Put the second SOC into the chunk store
err = st.Run(context.Background(), func(s transaction.Store) error {
return s.ChunkStore().Put(context.TODO(), sch2)
})
if err != nil {
t.Fatalf("failed putting second single owner chunk: %v", err)
}

// Retrieve the chunk from the chunk store
var retrievedChunk swarm.Chunk
err = st.Run(context.Background(), func(s transaction.Store) error {
retrievedChunk, err = s.ChunkStore().Get(context.TODO(), sch1.Address())
return err
})
if err != nil {
t.Fatalf("failed retrieving chunk: %v", err)
}
schRetrieved, err := soc.FromChunk(retrievedChunk)
if err != nil {
t.Fatalf("failed converting chunk to SOC: %v", err)
}

// Verify that the retrieved chunk contains the latest payload
if !bytes.Equal(chunk2.Data(), schRetrieved.WrappedChunk().Data()) {
t.Fatalf("expected payload %s, got %s", chunk2.Data(), schRetrieved.WrappedChunk().Data())
}
})

t.Run("close store", func(t *testing.T) {
err := st.Close()
if err != nil {
Expand Down
36 changes: 26 additions & 10 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,25 +129,27 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
var shouldIncReserveSize bool

err = r.st.Run(ctx, func(s transaction.Store) error {
oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk)
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

sameAddressOldStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveScope, chunk.Address(), chunk.Stamp().BatchID())
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}

var sameAddressSoc = false

// same chunk address, same batch
if sameAddressOldStamp != nil {
sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp)
if err != nil {
return err

if chunkType == swarm.ChunkTypeSingleOwner {
sameAddressSoc = true
}

// same index
if bytes.Equal(sameAddressOldStamp.Index(), chunk.Stamp().Index()) {
// index collision
if bytes.Equal(chunk.Stamp().Index(), sameAddressOldStamp.Index()) {
sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp)
if err != nil {
return err
}
prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
Expand Down Expand Up @@ -214,6 +216,11 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
}
}

oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk)
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

// different address, same batch, index collision
if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) {
prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp)
Expand Down Expand Up @@ -269,12 +276,21 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
ChunkType: chunkType,
StampHash: stampHash,
}),
s.ChunkStore().Put(ctx, chunk),
)
if err != nil {
return err
}

if sameAddressSoc {
if err := s.ChunkStore().Replace(ctx, chunk); err != nil {
return err
}
} else {
if err := s.ChunkStore().Put(ctx, chunk); err != nil {
return err
}
}

if !loadedStampIndex {
shouldIncReserveSize = true
}
Expand Down

0 comments on commit 0aae083

Please sign in to comment.