Skip to content

Commit

Permalink
feat: propagate stream error to origin node for pushsync and retrieva…
Browse files Browse the repository at this point in the history
…l protocols (#4321)
  • Loading branch information
istae authored Sep 18, 2023
1 parent cf00cf5 commit a04109f
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 27 deletions.
15 changes: 15 additions & 0 deletions pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package p2p
import (
"context"
"errors"
"fmt"
"io"
"time"

Expand Down Expand Up @@ -222,3 +223,17 @@ const (
func NewSwarmStreamName(protocol, version, stream string) string {
return "/swarm/" + protocol + "/" + version + "/" + stream
}

type ChunkDeliveryError struct {
msg string
}

// Error implements the error interface.
func (e *ChunkDeliveryError) Error() string {
return fmt.Sprintf("delivery of chunk failed: %s", e.msg)
}

// NewChunkDeliveryError is a convenience constructor for ChunkDeliveryError.
func NewChunkDeliveryError(msg string) error {
return &ChunkDeliveryError{msg: msg}
}
64 changes: 58 additions & 6 deletions pkg/pushsync/pb/pushsync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/pushsync/pb/pushsync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ message Receipt {
bytes Address = 1;
bytes Signature = 2;
bytes Nonce = 3;
string Err = 4;
}
13 changes: 12 additions & 1 deletion pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const loggerName = "pushsync"

const (
protocolName = "pushsync"
protocolVersion = "1.2.0"
protocolVersion = "1.3.0"
streamName = "pushsync"
)

Expand Down Expand Up @@ -157,6 +157,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
now := time.Now()

w, r := protobuf.NewWriterAndReader(stream)
var attemptedWrite bool

ctx, cancel := context.WithTimeout(ctx, defaultTTL)
defer cancel()
Expand All @@ -165,6 +166,9 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
if err != nil {
ps.metrics.TotalHandlerTime.WithLabelValues("failure").Observe(time.Since(now).Seconds())
ps.metrics.TotalHandlerErrors.Inc()
if !attemptedWrite {
_ = w.WriteMsgWithContext(ctx, &pb.Receipt{Err: err.Error()})
}
_ = stream.Reset()
} else {
ps.metrics.TotalHandlerTime.WithLabelValues("success").Observe(time.Since(now).Seconds())
Expand Down Expand Up @@ -225,6 +229,8 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
defer debit.Cleanup()

attemptedWrite = true

receipt := pb.Receipt{Address: chunkToPut.Address().Bytes(), Signature: signature, Nonce: ps.nonce}
if err := w.WriteMsgWithContext(ctx, &receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
Expand Down Expand Up @@ -255,6 +261,8 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
defer debit.Cleanup()

attemptedWrite = true

// pass back the receipt
if err := w.WriteMsgWithContext(ctx, receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
Expand Down Expand Up @@ -486,6 +494,9 @@ func (ps *PushSync) pushChunkToPeer(ctx context.Context, peer swarm.Address, ch
if err = r.ReadMsgWithContext(ctx, &rec); err != nil {
return nil, err
}
if rec.Err != "" {
return nil, p2p.NewChunkDeliveryError(rec.Err)
}

if !ch.Address().Equal(swarm.NewAddress(rec.Address)) {
return nil, fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address(), peer)
Expand Down
46 changes: 43 additions & 3 deletions pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"errors"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -430,7 +431,7 @@ func TestPushChunkToClosestErrorAttemptRetry(t *testing.T) {
}),
)

psPivot, pivotStorer := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, pivotAccounting, mock.WithPeers(peer1, peer2, peer3, peer4))
psPivot, pivotStorer := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, pivotAccounting, log.Noop, mock.WithPeers(peer1, peer2, peer3, peer4))

// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
Expand Down Expand Up @@ -589,6 +590,45 @@ func TestHandler(t *testing.T) {
}
}

func TestPropagateErrMsg(t *testing.T) {
t.Parallel()
// chunk data to upload
chunk := testingc.FixtureChunk("7000")

// create a pivot node and a mocked closest node
triggerPeer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
pivotPeer := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")

faultySigner := cryptomock.New(cryptomock.WithSignFunc(func([]byte) ([]byte, error) {
return nil, errors.New("simulated error")
}))

buf := new(bytes.Buffer)
captureLogger := log.NewLogger("test", log.WithSink(buf))

// Create the closest peer
psClosestPeer, _ := createPushSyncNodeWithAccounting(t, closestPeer, defaultPrices, nil, nil, faultySigner, accountingmock.NewAccounting(), log.Noop, mock.WithClosestPeerErr(topology.ErrWantSelf))

// creating the pivot peer
psPivot, _ := createPushSyncNodeWithAccounting(t, pivotPeer, defaultPrices, nil, nil, defaultSigner, accountingmock.NewAccounting(), log.Noop, mock.WithPeers(closestPeer))

combinedRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol(), psClosestPeer.Protocol()), streamtest.WithBaseAddr(triggerPeer))

// Creating the trigger peer
psTriggerPeer, _ := createPushSyncNodeWithAccounting(t, triggerPeer, defaultPrices, combinedRecorder, nil, defaultSigner, accountingmock.NewAccounting(), captureLogger, mock.WithPeers(pivotPeer))

_, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
if err == nil {
t.Fatal("should received error")
}

want := p2p.NewChunkDeliveryError("receipt signature: simulated error")
if got := buf.String(); !strings.Contains(got, want.Error()) {
t.Fatalf("got log %s, want %s", got, want)
}
}

func TestSignsReceipt(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -768,7 +808,7 @@ func createPushSyncNode(
) (*pushsync.PushSync, *testStorer, accounting.Interface) {
t.Helper()
mockAccounting := accountingmock.NewAccounting()
ps, mstorer := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, mockOpts...)
ps, mstorer := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, log.Noop, mockOpts...)
return ps, mstorer, mockAccounting
}

Expand All @@ -780,10 +820,10 @@ func createPushSyncNodeWithAccounting(
unwrap func(swarm.Chunk),
signer crypto.Signer,
acct accounting.Interface,
logger log.Logger,
mockOpts ...mock.Option,
) (*pushsync.PushSync, *testStorer) {
t.Helper()
logger := log.Noop
storer := &testStorer{
chunksPut: make(map[string]swarm.Chunk),
chunksReported: make(map[string]int),
Expand Down
66 changes: 59 additions & 7 deletions pkg/retrieval/pb/retrieval.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a04109f

Please sign in to comment.