From 0aae08323882bff12e4d3b73fbd62452b41b279d Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 11 Nov 2024 16:54:38 +0300 Subject: [PATCH] fix(gsoc): improvements --- pkg/api/gsoc.go | 7 ++- pkg/gsoc/gsoc.go | 34 +++++------ pkg/gsoc/gsoc_test.go | 8 +-- pkg/storer/internal/chunkstore/chunkstore.go | 27 +------- .../internal/chunkstore/chunkstore_test.go | 61 ------------------- pkg/storer/internal/reserve/reserve.go | 36 ++++++++--- 6 files changed, 53 insertions(+), 120 deletions(-) diff --git a/pkg/api/gsoc.go b/pkg/api/gsoc.go index ea9aad5271e..60d048ffdc0 100644 --- a/pkg/api/gsoc.go +++ b/pkg/api/gsoc.go @@ -18,8 +18,9 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("gsoc_subscribe").Build() paths := struct { - Address []byte `map:"address" validate:"required"` + Address swarm.Address `map:"address,resolve" validate:"required"` }{} + if response := s.mapStructure(mux.Vars(r), &paths); response != nil { response("invalid path params", logger, w) return @@ -43,7 +44,7 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) { go s.gsocListeningWs(conn, paths.Address) } -func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) { +func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) { defer s.wsWg.Done() var ( @@ -56,7 +57,7 @@ func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) { ticker.Stop() _ = conn.Close() }() - cleanup := s.gsoc.Subscribe([32]byte(socAddress), func(m []byte) { + cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) { select { case dataC <- m: case <-gone: diff --git a/pkg/gsoc/gsoc.go b/pkg/gsoc/gsoc.go index 91d5dcf925d..6497c9b0c63 100644 --- a/pkg/gsoc/gsoc.go +++ b/pkg/gsoc/gsoc.go @@ -12,14 +12,18 @@ import ( "github.com/ethersphere/bee/v2/pkg/swarm" ) +// Handler defines code to be executed upon reception of a GSOC sub message. +// it is used as a parameter definition. +type Handler func([]byte) + type Listener interface { - Subscribe(address [32]byte, handler Handler) (cleanup func()) + Subscribe(address swarm.Address, handler Handler) (cleanup func()) Handle(c *soc.SOC) Close() error } type listener struct { - handlers map[[32]byte][]*Handler + handlers map[string][]*Handler handlersMu sync.Mutex quit chan struct{} logger log.Logger @@ -29,26 +33,26 @@ type listener struct { func New(logger log.Logger) Listener { return &listener{ logger: logger, - handlers: make(map[[32]byte][]*Handler), + handlers: make(map[string][]*Handler), quit: make(chan struct{}), } } // Subscribe allows the definition of a Handler func on a specific GSOC address. -func (l *listener) Subscribe(address [32]byte, handler Handler) (cleanup func()) { +func (l *listener) Subscribe(address swarm.Address, handler Handler) (cleanup func()) { l.handlersMu.Lock() defer l.handlersMu.Unlock() - l.handlers[address] = append(l.handlers[address], &handler) + l.handlers[address.ByteString()] = append(l.handlers[address.ByteString()], &handler) return func() { l.handlersMu.Lock() defer l.handlersMu.Unlock() - h := l.handlers[address] + h := l.handlers[address.ByteString()] for i := 0; i < len(h); i++ { if h[i] == &handler { - l.handlers[address] = append(h[:i], h[i+1:]...) + l.handlers[address.ByteString()] = append(h[:i], h[i+1:]...) return } } @@ -61,13 +65,11 @@ func (l *listener) Handle(c *soc.SOC) { if err != nil { return // no handler } - h := l.getHandlers([32]byte(addr.Bytes())) + h := l.getHandlers(addr) if h == nil { return // no handler } - l.logger.Info("new incoming GSOC message", - "GSOC Address", addr, - "wrapped chunk address", c.WrappedChunk().Address()) + l.logger.Debug("new incoming GSOC message", "GSOC Address", addr, "wrapped chunk address", c.WrappedChunk().Address()) for _, hh := range h { go func(hh Handler) { @@ -76,11 +78,11 @@ func (l *listener) Handle(c *soc.SOC) { } } -func (p *listener) getHandlers(address [32]byte) []*Handler { +func (p *listener) getHandlers(address swarm.Address) []*Handler { p.handlersMu.Lock() defer p.handlersMu.Unlock() - return p.handlers[address] + return p.handlers[address.ByteString()] } func (l *listener) Close() error { @@ -88,11 +90,7 @@ func (l *listener) Close() error { l.handlersMu.Lock() defer l.handlersMu.Unlock() - l.handlers = make(map[[32]byte][]*Handler) //unset handlers on shutdown + l.handlers = make(map[string][]*Handler) //unset handlers on shutdown return nil } - -// Handler defines code to be executed upon reception of a GSOC sub message. -// it is used as a parameter definition. -type Handler func([]byte) diff --git a/pkg/gsoc/gsoc_test.go b/pkg/gsoc/gsoc_test.go index 989225df068..dfbe8e03a5c 100644 --- a/pkg/gsoc/gsoc_test.go +++ b/pkg/gsoc/gsoc_test.go @@ -21,7 +21,7 @@ func TestRegister(t *testing.T) { t.Parallel() var ( - g = gsoc.New(log.NewLogger("test")) + g = gsoc.New(log.Noop) h1Calls = 0 h2Calls = 0 h3Calls = 0 @@ -52,8 +52,8 @@ func TestRegister(t *testing.T) { msgChan <- struct{}{} } ) - _ = g.Subscribe([32]byte(address1.Bytes()), h1) - _ = g.Subscribe([32]byte(address2.Bytes()), h2) + _ = g.Subscribe(address1, h1) + _ = g.Subscribe(address2, h2) ch1, _ := cac.New(payload1) socCh1 := soc.New(socId1, ch1) @@ -74,7 +74,7 @@ func TestRegister(t *testing.T) { ensureCalls(t, &h2Calls, 0) // register another handler on the first address - cleanup := g.Subscribe([32]byte(address1.Bytes()), h3) + cleanup := g.Subscribe(address1, h3) g.Handle(socCh1) diff --git a/pkg/storer/internal/chunkstore/chunkstore.go b/pkg/storer/internal/chunkstore/chunkstore.go index f17eed79af3..67fee1e6d77 100644 --- a/pkg/storer/internal/chunkstore/chunkstore.go +++ b/pkg/storer/internal/chunkstore/chunkstore.go @@ -71,9 +71,8 @@ func Has(_ context.Context, r storage.Reader, addr swarm.Address) (bool, error) func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error { var ( - rIdx = &RetrievalIndexItem{Address: ch.Address()} - loc sharky.Location - inserted bool + rIdx = &RetrievalIndexItem{Address: ch.Address()} + loc sharky.Location ) err := s.Get(rIdx) switch { @@ -86,31 +85,11 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm. } rIdx.Location = loc rIdx.Timestamp = uint64(time.Now().Unix()) - inserted = true case err != nil: return fmt.Errorf("chunk store: failed to read: %w", err) } - // SOC will be replaced in the chunk store if it is already stored with the newer payload. - // Pull sync should sync the new SOC payload with the new stamp. - // TODO: remove this condition when postage stamping is refactored for GSOC. - chunkType := storage.ChunkType(ch) - if !inserted && chunkType == swarm.ChunkTypeSingleOwner { - // replace old payload - err = sh.Release(ctx, rIdx.Location) - if err != nil { - return fmt.Errorf("chunkstore: failed to release sharky location: %w", err) - } - - loc, err := sh.Write(ctx, ch.Data()) - if err != nil { - return fmt.Errorf("chunk store: write to sharky failed: %w", err) - } - rIdx.Location = loc - rIdx.Timestamp = uint64(time.Now().Unix()) - } else { - rIdx.RefCnt++ - } + rIdx.RefCnt++ return s.Put(rIdx) } diff --git a/pkg/storer/internal/chunkstore/chunkstore_test.go b/pkg/storer/internal/chunkstore/chunkstore_test.go index c86fbbb6b91..9e30c1af876 100644 --- a/pkg/storer/internal/chunkstore/chunkstore_test.go +++ b/pkg/storer/internal/chunkstore/chunkstore_test.go @@ -5,7 +5,6 @@ package chunkstore_test import ( - "bytes" "context" "errors" "fmt" @@ -14,9 +13,7 @@ import ( "os" "testing" - "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/sharky" - "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/storage" @@ -339,64 +336,6 @@ func TestChunkStore(t *testing.T) { } }) - // TODO: remove this when postage stamping is refactored for GSOC. - t.Run("put two SOCs with different payloads", func(t *testing.T) { - key, _ := crypto.GenerateSecp256k1Key() - signer := crypto.NewDefaultSigner(key) - - // chunk data to upload - chunk1 := chunktest.FixtureChunk("7000") - chunk2 := chunktest.FixtureChunk("0033") - id := make([]byte, swarm.HashSize) - s1 := soc.New(id, chunk1) - s2 := soc.New(id, chunk2) - sch1, err := s1.Sign(signer) - if err != nil { - t.Fatal(err) - } - sch1 = sch1.WithStamp(chunk1.Stamp()) - sch2, err := s2.Sign(signer) - if err != nil { - t.Fatal(err) - } - sch2 = sch2.WithStamp(chunk2.Stamp()) - - // Put the first SOC into the chunk store - err = st.Run(context.Background(), func(s transaction.Store) error { - return s.ChunkStore().Put(context.TODO(), sch1) - }) - if err != nil { - t.Fatalf("failed putting first single owner chunk: %v", err) - } - - // Put the second SOC into the chunk store - err = st.Run(context.Background(), func(s transaction.Store) error { - return s.ChunkStore().Put(context.TODO(), sch2) - }) - if err != nil { - t.Fatalf("failed putting second single owner chunk: %v", err) - } - - // Retrieve the chunk from the chunk store - var retrievedChunk swarm.Chunk - err = st.Run(context.Background(), func(s transaction.Store) error { - retrievedChunk, err = s.ChunkStore().Get(context.TODO(), sch1.Address()) - return err - }) - if err != nil { - t.Fatalf("failed retrieving chunk: %v", err) - } - schRetrieved, err := soc.FromChunk(retrievedChunk) - if err != nil { - t.Fatalf("failed converting chunk to SOC: %v", err) - } - - // Verify that the retrieved chunk contains the latest payload - if !bytes.Equal(chunk2.Data(), schRetrieved.WrappedChunk().Data()) { - t.Fatalf("expected payload %s, got %s", chunk2.Data(), schRetrieved.WrappedChunk().Data()) - } - }) - t.Run("close store", func(t *testing.T) { err := st.Close() if err != nil { diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 853454034d8..fc97615558e 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -129,25 +129,27 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { var shouldIncReserveSize bool err = r.st.Run(ctx, func(s transaction.Store) error { - oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk) - if err != nil { - return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) - } sameAddressOldStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveScope, chunk.Address(), chunk.Stamp().BatchID()) if err != nil && !errors.Is(err, storage.ErrNotFound) { return err } + var sameAddressSoc = false + // same chunk address, same batch if sameAddressOldStamp != nil { - sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp) - if err != nil { - return err + + if chunkType == swarm.ChunkTypeSingleOwner { + sameAddressSoc = true } - // same index - if bytes.Equal(sameAddressOldStamp.Index(), chunk.Stamp().Index()) { + // index collision + if bytes.Equal(chunk.Stamp().Index(), sameAddressOldStamp.Index()) { + sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp) + if err != nil { + return err + } prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { @@ -214,6 +216,11 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { } } + oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk) + if err != nil { + return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) + } + // different address, same batch, index collision if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) { prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp) @@ -269,12 +276,21 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { ChunkType: chunkType, StampHash: stampHash, }), - s.ChunkStore().Put(ctx, chunk), ) if err != nil { return err } + if sameAddressSoc { + if err := s.ChunkStore().Replace(ctx, chunk); err != nil { + return err + } + } else { + if err := s.ChunkStore().Put(ctx, chunk); err != nil { + return err + } + } + if !loadedStampIndex { shouldIncReserveSize = true }