Skip to content

Commit

Permalink
load remote only with peer
Browse files Browse the repository at this point in the history
  • Loading branch information
cheggaaa committed Nov 13, 2023
1 parent c69946f commit 9a37dc8
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 22 deletions.
18 changes: 9 additions & 9 deletions commonspace/object/tree/synctree/treeremotegetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"errors"

"github.com/gogo/protobuf/proto"
"go.uber.org/zap"

"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/net/peer"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
)

var (
Expand All @@ -22,18 +23,17 @@ type treeRemoteGetter struct {
treeId string
}

func newRemoteGetter(treeId string, deps BuildDeps) treeRemoteGetter {
return treeRemoteGetter{treeId: treeId, deps: deps}
}

func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err error) {
peerId, err := peer.CtxPeerId(ctx)
if err == nil {
if err != nil {
return nil, err
}
if peerId != peer.CtxResponsiblePeers {
peerIds = []string{peerId}
return
}
err = nil
log.WarnCtx(ctx, "peer not found in context, use responsible")

log.InfoCtx(ctx, "use responsible peers")
respPeers, err := t.deps.PeerGetter.GetResponsiblePeers(ctx)
if err != nil {
return
Expand Down
23 changes: 13 additions & 10 deletions commonspace/object/tree/synctree/treeremotegetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"fmt"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/peermanager/mock_peermanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/peer/mock_peer"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

type treeRemoteGetterFixture struct {
Expand Down Expand Up @@ -59,29 +60,31 @@ func TestTreeRemoteGetter(t *testing.T) {
Payload: marshalled,
}

t.Run("request works", func(t *testing.T) {
t.Run("responsible peers", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)

tCtx := peer.CtxWithPeerId(ctx, "*")
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(tCtx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx)
fx.syncClientMock.EXPECT().SendRequest(tCtx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(tCtx)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})

t.Run("request fails", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
tCtx := peer.CtxWithPeerId(ctx, peerId)
treeRequest := &treechangeproto.TreeSyncMessage{}
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some"))
_, err := fx.treeGetter.treeRequestLoop(ctx)
fx.syncClientMock.EXPECT().SendRequest(tCtx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some"))
_, err := fx.treeGetter.treeRequestLoop(tCtx)
require.Error(t, err)
})
}
4 changes: 2 additions & 2 deletions commonspace/spaceservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,13 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string)
return
}

sm, err := s.peerManagerProvider.NewPeerManager(ctx, id)
pm, err := s.peerManagerProvider.NewPeerManager(ctx, id)
if err != nil {
return nil, err
}
var peers []peer.Peer
for {
peers, err = sm.GetResponsiblePeers(ctx)
peers, err = pm.GetResponsiblePeers(ctx)
if err != nil && !errors.Is(err, net.ErrUnableToConnect) {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion net/peer/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package peer
import (
"context"
"errors"
"github.com/anyproto/any-sync/util/crypto"

"github.com/libp2p/go-libp2p/core/sec"
"storj.io/drpc/drpcctx"

"github.com/anyproto/any-sync/util/crypto"
)

type contextKey uint
Expand All @@ -22,6 +24,8 @@ var (
ErrIdentityNotFoundInContext = errors.New("identity not found in context")
)

const CtxResponsiblePeers = "*"

// CtxPeerId first tries to get peer id under our own key, but if it is not found tries to get through DRPC key
func CtxPeerId(ctx context.Context) (string, error) {
if peerId, ok := ctx.Value(contextKeyPeerId).(string); ok {
Expand Down

0 comments on commit 9a37dc8

Please sign in to comment.