Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: propagate stream error to origin node for pushsync and retrieval protocols #4321

Merged
merged 10 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package p2p
import (
"context"
"errors"
"fmt"
"io"
"time"

Expand Down Expand Up @@ -222,3 +223,17 @@ const (
func NewSwarmStreamName(protocol, version, stream string) string {
return "/swarm/" + protocol + "/" + version + "/" + stream
}

type ChunkDeliveryError struct {
msg string
}

// Error implements the error interface.
func (e *ChunkDeliveryError) Error() string {
return fmt.Sprintf("delivery of chunk failed: %s", e.msg)
}

// NewChunkDeliveryError is a convenience constructor for ChunkDeliveryError.
func NewChunkDeliveryError(msg string) error {
return &ChunkDeliveryError{msg: msg}
}
64 changes: 58 additions & 6 deletions pkg/pushsync/pb/pushsync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/pushsync/pb/pushsync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ message Receipt {
bytes Address = 1;
bytes Signature = 2;
bytes Nonce = 3;
string Err = 4;
}
12 changes: 11 additions & 1 deletion pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const loggerName = "pushsync"

const (
protocolName = "pushsync"
protocolVersion = "1.2.0"
protocolVersion = "1.3.0"
streamName = "pushsync"
)

Expand Down Expand Up @@ -157,6 +157,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
now := time.Now()

w, r := protobuf.NewWriterAndReader(stream)
var attemptedWrite bool

ctx, cancel := context.WithTimeout(ctx, defaultTTL)
defer cancel()
Expand All @@ -165,6 +166,9 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
if err != nil {
ps.metrics.TotalHandlerTime.WithLabelValues("failure").Observe(time.Since(now).Seconds())
ps.metrics.TotalHandlerErrors.Inc()
if !attemptedWrite {
_ = w.WriteMsgWithContext(ctx, &pb.Receipt{Err: err.Error()})
}
_ = stream.Reset()
} else {
ps.metrics.TotalHandlerTime.WithLabelValues("success").Observe(time.Since(now).Seconds())
Expand Down Expand Up @@ -227,6 +231,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)

receipt := pb.Receipt{Address: chunkToPut.Address().Bytes(), Signature: signature, Nonce: ps.nonce}
if err := w.WriteMsgWithContext(ctx, &receipt); err != nil {
attemptedWrite = true
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}

Expand Down Expand Up @@ -255,6 +260,8 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
defer debit.Cleanup()

attemptedWrite = true

// pass back the receipt
if err := w.WriteMsgWithContext(ctx, receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
Expand Down Expand Up @@ -486,6 +493,9 @@ func (ps *PushSync) pushChunkToPeer(ctx context.Context, peer swarm.Address, ch
if err = r.ReadMsgWithContext(ctx, &rec); err != nil {
return nil, err
}
if rec.Err != "" {
return nil, p2p.NewChunkDeliveryError(rec.Err)
}

if !ch.Address().Equal(swarm.NewAddress(rec.Address)) {
return nil, fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address(), peer)
Expand Down
46 changes: 43 additions & 3 deletions pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"errors"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -430,7 +431,7 @@ func TestPushChunkToClosestErrorAttemptRetry(t *testing.T) {
}),
)

psPivot, pivotStorer := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, pivotAccounting, mock.WithPeers(peer1, peer2, peer3, peer4))
psPivot, pivotStorer := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, pivotAccounting, log.Noop, mock.WithPeers(peer1, peer2, peer3, peer4))

// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
Expand Down Expand Up @@ -589,6 +590,45 @@ func TestHandler(t *testing.T) {
}
}

func TestPropagateErrMsg(t *testing.T) {
t.Parallel()
// chunk data to upload
chunk := testingc.FixtureChunk("7000")

// create a pivot node and a mocked closest node
triggerPeer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
pivotPeer := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")

faultySigner := cryptomock.New(cryptomock.WithSignFunc(func([]byte) ([]byte, error) {
return nil, errors.New("simulated error")
}))

buf := new(bytes.Buffer)
captureLogger := log.NewLogger("test", log.WithSink(buf))

// Create the closest peer
psClosestPeer, _ := createPushSyncNodeWithAccounting(t, closestPeer, defaultPrices, nil, nil, faultySigner, accountingmock.NewAccounting(), log.Noop, mock.WithClosestPeerErr(topology.ErrWantSelf))

// creating the pivot peer
psPivot, _ := createPushSyncNodeWithAccounting(t, pivotPeer, defaultPrices, nil, nil, defaultSigner, accountingmock.NewAccounting(), log.Noop, mock.WithPeers(closestPeer))

combinedRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol(), psClosestPeer.Protocol()), streamtest.WithBaseAddr(triggerPeer))

// Creating the trigger peer
psTriggerPeer, _ := createPushSyncNodeWithAccounting(t, triggerPeer, defaultPrices, combinedRecorder, nil, defaultSigner, accountingmock.NewAccounting(), captureLogger, mock.WithPeers(pivotPeer))

_, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
if err == nil {
t.Fatal("should received error")
}

want := p2p.NewChunkDeliveryError("receipt signature: simulated error")
if got := buf.String(); !strings.Contains(got, want.Error()) {
t.Fatalf("got log %s, want %s", got, want)
}
}

func TestSignsReceipt(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -768,7 +808,7 @@ func createPushSyncNode(
) (*pushsync.PushSync, *testStorer, accounting.Interface) {
t.Helper()
mockAccounting := accountingmock.NewAccounting()
ps, mstorer := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, mockOpts...)
ps, mstorer := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, log.Noop, mockOpts...)
return ps, mstorer, mockAccounting
}

Expand All @@ -780,10 +820,10 @@ func createPushSyncNodeWithAccounting(
unwrap func(swarm.Chunk),
signer crypto.Signer,
acct accounting.Interface,
logger log.Logger,
mockOpts ...mock.Option,
) (*pushsync.PushSync, *testStorer) {
t.Helper()
logger := log.Noop
storer := &testStorer{
chunksPut: make(map[string]swarm.Chunk),
chunksReported: make(map[string]int),
Expand Down
66 changes: 59 additions & 7 deletions pkg/retrieval/pb/retrieval.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading