diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index 4e491368e..19f3e0489 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -14,7 +14,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" - "github.com/waku-org/go-waku/waku/v2/utils" ) type StoreService struct { @@ -56,7 +55,7 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService { return s } -func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) { +func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) { query := &store.Query{} var options []store.HistoryRequestOption var err error @@ -65,15 +64,9 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if peerAddrStr != "" { m, err = multiaddr.NewMultiaddr(peerAddrStr) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - - peerID, err := utils.GetPeerID(m) - if err != nil { - return nil, nil, nil, err - } - - options = append(options, store.WithPeer(peerID)) + options = append(options, store.WithPeerAddr(m)) } query.PubsubTopic = r.URL.Query().Get("pubsubTopic") @@ -86,7 +79,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if startTimeStr != "" { startTime, err := strconv.ParseInt(startTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } query.StartTime = &startTime } @@ -95,7 +88,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if endTimeStr != "" { endTime, err := strconv.ParseInt(endTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } query.EndTime = &endTime } @@ -112,21 +105,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if senderTimeStr != "" { cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } if storeTimeStr != "" { cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } if digestStr != "" { cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } @@ -143,21 +136,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store if ascendingStr != "" { ascending, err = strconv.ParseBool(ascendingStr) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } if pageSizeStr != "" { pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } options = append(options, store.WithPaging(ascending, pageSize)) } - return m, query, options, nil + return query, options, nil } func writeStoreError(w http.ResponseWriter, code int, err error) { @@ -191,7 +184,7 @@ func toStoreResponse(result *store.Result) StoreResponse { } func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { - peerAddr, query, options, err := getStoreParams(r) + query, options, err := getStoreParams(r) if err != nil { writeStoreError(w, http.StatusBadRequest, err) return @@ -199,9 +192,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - if peerAddr != nil { - options = append(options, store.WithPeerAddr(peerAddr)) - } + result, err := d.node.Store().Query(ctx, *query, options...) if err != nil { writeStoreError(w, http.StatusInternalServerError, err) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 1a3d0515d..a9f226575 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -714,7 +714,11 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro // AddPeer is used to add a peer and the protocols it support to the node peerstore // TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics. func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { - return w.peermanager.AddPeer(address, origin, pubSubTopics, false, protocols...) + pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) + if err != nil { + return "", err + } + return pData.AddrInfo.ID, nil } // AddDiscoveredPeer to add a discovered peer to the node peerStore @@ -725,7 +729,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp ID: ID, Addrs: addrs, }, - PubSubTopics: pubsubTopics, + PubsubTopics: pubsubTopics, } w.peermanager.AddDiscoveredPeer(p, connectNow) } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 430a2760c..c11691297 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -321,11 +321,11 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } else { if shards != nil { - p.PubSubTopics = make([]string, 0) + p.PubsubTopics = make([]string, 0) topics := shards.Topics() for _, topic := range topics { topicStr := topic.String() - p.PubSubTopics = append(p.PubSubTopics, topicStr) + p.PubsubTopics = append(p.PubsubTopics, topicStr) } } else { pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) @@ -361,12 +361,12 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { return } supportedProtos := []protocol.ID{} - if len(p.PubSubTopics) == 0 && p.ENR != nil { + if len(p.PubsubTopics) == 0 && p.ENR != nil { // Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics. supportedProtos = pm.processPeerENR(&p) } - _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics, supportedProtos...) + _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...) if p.ENR != nil { err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) @@ -419,12 +419,29 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig return nil } +func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsubTopics ...string) *service.PeerData { + addrs := host.Peerstore().Addrs(peerID) + if len(addrs) == 0 { + //Addresses expired, remove peer from peerStore + host.Peerstore().RemovePeer(peerID) + return nil + } + return &service.PeerData{ + Origin: origin, + AddrInfo: peer.AddrInfo{ + ID: peerID, + Addrs: addrs, + }, + PubsubTopics: pubsubTopics, + } +} + // AddPeer adds peer to the peerStore and also to service slots -func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, connectNow bool, protocols ...protocol.ID) (peer.ID, error) { +func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) { //Assuming all addresses have peerId info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { - return "", err + return nil, err } //Add Service peers to serviceSlots. @@ -433,20 +450,26 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTo } //Add to the peer-store - err = pm.addPeer(info.ID, info.Addrs, origin, pubSubTopics, protocols...) + err = pm.addPeer(info.ID, info.Addrs, origin, pubsubTopics, protocols...) if err != nil { - return "", err + return nil, err } - if connectNow { - go pm.peerConnector.PushToChan(service.PeerData{ - Origin: origin, - AddrInfo: *info, - PubSubTopics: pubSubTopics, - }) + pData := &service.PeerData{ + Origin: origin, + AddrInfo: peer.AddrInfo{ + ID: info.ID, + Addrs: info.Addrs, + }, + PubsubTopics: pubsubTopics, } - return info.ID, nil + return pData, nil +} + +// Connect establishes a connection to a peer. +func (pm *PeerManager) Connect(pData *service.PeerData) { + go pm.peerConnector.PushToChan(*pData) } // RemovePeer deletes peer from the peerStore after disconnecting it. diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 4af828cef..5b2890930 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -71,7 +71,7 @@ func TestServiceSlots(t *testing.T) { // add h2 peer to peer manager t.Log(h2.ID()) - _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) require.NoError(t, err) /////////////// @@ -84,7 +84,7 @@ func TestServiceSlots(t *testing.T) { require.Equal(t, peerID, h2.ID()) // add h3 peer to peer manager - _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) require.NoError(t, err) // check that returned peer is h2 or h3 peer @@ -108,7 +108,7 @@ func TestServiceSlots(t *testing.T) { require.Error(t, err, ErrNoPeersAvailable) // add h4 peer for protocol1 - _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol1)) + _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) require.NoError(t, err) //Test peer selection for protocol1 @@ -134,10 +134,10 @@ func TestPeerSelection(t *testing.T) { defer h3.Close() protocol := libp2pProtocol.ID("test/protocol") - _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, false, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, false, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) @@ -176,7 +176,7 @@ func TestDefaultProtocol(t *testing.T) { defer h5.Close() //Test peer selection for relay protocol from peer store - _, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, false, relay.WakuRelayID_v200) + _, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200) require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. @@ -197,7 +197,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { require.NoError(t, err) defer h6.Close() - _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, false, protocol2) + _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) require.NoError(t, err) peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 4b83efd5d..fb2d2e95a 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -96,18 +96,31 @@ type HistoryRequestParameters struct { s *WakuStore } -type HistoryRequestOption func(*HistoryRequestParameters) +type HistoryRequestOption func(*HistoryRequestParameters) error -// WithPeer is an option used to specify the peerID to request the message history +// WithPeer is an option used to specify the peerID to request the message history. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. func WithPeer(p peer.ID) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.selectedPeer = p + if params.peerAddr != nil { + return errors.New("peerId and peerAddr options are mutually exclusive") + } + return nil } } +//WithPeerAddr is an option used to specify a peerAddress to request the message history. +// This new peer will be added to peerStore. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. + func WithPeerAddr(pAddr multiaddr.Multiaddr) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.peerAddr = pAddr + if params.selectedPeer != "" { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil } } @@ -117,9 +130,10 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) HistoryRequestOption { // from the node peerstore // Note: This option is avaiable only with peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers + return nil } } @@ -129,44 +143,50 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption // from the node peerstore // Note: This option is avaiable only with peerManager func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.peerSelectionType = peermanager.LowestRTT + return nil } } // WithRequestID is an option to set a specific request ID to be used when // creating a store request func WithRequestID(requestID []byte) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.requestID = requestID + return nil } } // WithAutomaticRequestID is an option to automatically generate a request ID // when creating a store request func WithAutomaticRequestID() HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.requestID = protocol.GenerateRequestID() + return nil } } func WithCursor(c *pb.Index) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.cursor = c + return nil } } // WithPaging is an option used to specify the order and maximum number of records to return func WithPaging(asc bool, pageSize uint64) HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.asc = asc params.pageSize = pageSize + return nil } } func WithLocalQuery() HistoryRequestOption { - return func(params *HistoryRequestParameters) { + return func(params *HistoryRequestParameters) error { params.localQuery = true + return nil } } @@ -262,7 +282,10 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR optList := DefaultOptions() optList = append(optList, opts...) for _, opt := range optList { - opt(params) + err := opt(params) + if err != nil { + return nil, err + } } if !params.localQuery { @@ -281,11 +304,12 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR //Add Peer to peerstore. if store.pm != nil && params.peerAddr != nil { - peerId, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, true, StoreID_v20beta4) + pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4) if err != nil { return nil, err } - params.selectedPeer = peerId + store.pm.Connect(pData) + params.selectedPeer = pData.AddrInfo.ID } if store.pm != nil && params.selectedPeer == "" { var err error diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index a53a74a4e..76c63ff5c 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -107,7 +107,7 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string peer := service.PeerData{ Origin: peerstore.Rendezvous, AddrInfo: p, - PubSubTopics: []string{namespace}, + PubsubTopics: []string{namespace}, } if !r.PushToChan(peer) { r.log.Error("could push to closed channel/context completed") diff --git a/waku/v2/service/common_discovery_service.go b/waku/v2/service/common_discovery_service.go index 72bf96f1d..c22f18f11 100644 --- a/waku/v2/service/common_discovery_service.go +++ b/waku/v2/service/common_discovery_service.go @@ -14,7 +14,7 @@ type PeerData struct { Origin wps.Origin AddrInfo peer.AddrInfo ENR *enode.Node - PubSubTopics []string + PubsubTopics []string } type CommonDiscoveryService struct {