Skip to content

Commit

Permalink
chore: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Nov 13, 2023
1 parent c5a64ec commit 12575ce
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 62 deletions.
35 changes: 13 additions & 22 deletions cmd/waku/server/rest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)

type StoreService struct {
Expand Down Expand Up @@ -56,7 +55,7 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
return s
}

func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) {
func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) {
query := &store.Query{}
var options []store.HistoryRequestOption
var err error
Expand All @@ -65,15 +64,9 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if peerAddrStr != "" {
m, err = multiaddr.NewMultiaddr(peerAddrStr)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}

peerID, err := utils.GetPeerID(m)
if err != nil {
return nil, nil, nil, err
}

options = append(options, store.WithPeer(peerID))
options = append(options, store.WithPeerAddr(m))
}
query.PubsubTopic = r.URL.Query().Get("pubsubTopic")

Expand All @@ -86,7 +79,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if startTimeStr != "" {
startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
query.StartTime = &startTime
}
Expand All @@ -95,7 +88,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if endTimeStr != "" {
endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
query.EndTime = &endTime
}
Expand All @@ -112,21 +105,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if senderTimeStr != "" {
cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

if storeTimeStr != "" {
cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

if digestStr != "" {
cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

Expand All @@ -143,21 +136,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if ascendingStr != "" {
ascending, err = strconv.ParseBool(ascendingStr)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

if pageSizeStr != "" {
pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

options = append(options, store.WithPaging(ascending, pageSize))
}

return m, query, options, nil
return query, options, nil
}

func writeStoreError(w http.ResponseWriter, code int, err error) {
Expand Down Expand Up @@ -191,17 +184,15 @@ func toStoreResponse(result *store.Result) StoreResponse {
}

func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
peerAddr, query, options, err := getStoreParams(r)
query, options, err := getStoreParams(r)
if err != nil {
writeStoreError(w, http.StatusBadRequest, err)
return
}

ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
if peerAddr != nil {
options = append(options, store.WithPeerAddr(peerAddr))
}

result, err := d.node.Store().Query(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
Expand Down
8 changes: 6 additions & 2 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,11 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro
// AddPeer is used to add a peer and the protocols it support to the node peerstore
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
return w.peermanager.AddPeer(address, origin, pubSubTopics, false, protocols...)
pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
if err != nil {
return "", err
}
return pData.AddrInfo.ID, nil
}

// AddDiscoveredPeer to add a discovered peer to the node peerStore
Expand All @@ -725,7 +729,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp
ID: ID,
Addrs: addrs,
},
PubSubTopics: pubsubTopics,
PubsubTopics: pubsubTopics,
}
w.peermanager.AddDiscoveredPeer(p, connectNow)
}
Expand Down
53 changes: 38 additions & 15 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,11 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
} else {
if shards != nil {
p.PubSubTopics = make([]string, 0)
p.PubsubTopics = make([]string, 0)
topics := shards.Topics()
for _, topic := range topics {
topicStr := topic.String()
p.PubSubTopics = append(p.PubSubTopics, topicStr)
p.PubsubTopics = append(p.PubsubTopics, topicStr)
}
} else {
pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID))
Expand Down Expand Up @@ -361,12 +361,12 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
return
}
supportedProtos := []protocol.ID{}
if len(p.PubSubTopics) == 0 && p.ENR != nil {
if len(p.PubsubTopics) == 0 && p.ENR != nil {
// Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics.
supportedProtos = pm.processPeerENR(&p)
}

_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics, supportedProtos...)
_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...)

if p.ENR != nil {
err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
Expand Down Expand Up @@ -419,12 +419,29 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
return nil
}

func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsubTopics ...string) *service.PeerData {
addrs := host.Peerstore().Addrs(peerID)
if len(addrs) == 0 {
//Addresses expired, remove peer from peerStore
host.Peerstore().RemovePeer(peerID)
return nil
}
return &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: peerID,
Addrs: addrs,
},
PubsubTopics: pubsubTopics,
}
}

// AddPeer adds peer to the peerStore and also to service slots
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, connectNow bool, protocols ...protocol.ID) (peer.ID, error) {
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
//Assuming all addresses have peerId
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return "", err
return nil, err
}

//Add Service peers to serviceSlots.
Expand All @@ -433,20 +450,26 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTo
}

//Add to the peer-store
err = pm.addPeer(info.ID, info.Addrs, origin, pubSubTopics, protocols...)
err = pm.addPeer(info.ID, info.Addrs, origin, pubsubTopics, protocols...)
if err != nil {
return "", err
return nil, err
}

if connectNow {
go pm.peerConnector.PushToChan(service.PeerData{
Origin: origin,
AddrInfo: *info,
PubSubTopics: pubSubTopics,
})
pData := &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: info.ID,
Addrs: info.Addrs,
},
PubsubTopics: pubsubTopics,
}

return info.ID, nil
return pData, nil
}

// Connect establishes a connection to a peer.
func (pm *PeerManager) Connect(pData *service.PeerData) {
go pm.peerConnector.PushToChan(*pData)
}

// RemovePeer deletes peer from the peerStore after disconnecting it.
Expand Down
14 changes: 7 additions & 7 deletions waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestServiceSlots(t *testing.T) {

// add h2 peer to peer manager
t.Log(h2.ID())
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

///////////////
Expand All @@ -84,7 +84,7 @@ func TestServiceSlots(t *testing.T) {
require.Equal(t, peerID, h2.ID())

// add h3 peer to peer manager
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

// check that returned peer is h2 or h3 peer
Expand All @@ -108,7 +108,7 @@ func TestServiceSlots(t *testing.T) {
require.Error(t, err, ErrNoPeersAvailable)

// add h4 peer for protocol1
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol1))
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
require.NoError(t, err)

//Test peer selection for protocol1
Expand All @@ -134,10 +134,10 @@ func TestPeerSelection(t *testing.T) {
defer h3.Close()

protocol := libp2pProtocol.ID("test/protocol")
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, false, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, false, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestDefaultProtocol(t *testing.T) {
defer h5.Close()

//Test peer selection for relay protocol from peer store
_, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, false, relay.WakuRelayID_v200)
_, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200)
require.NoError(t, err)

// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
Expand All @@ -197,7 +197,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
require.NoError(t, err)
defer h6.Close()

_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, false, protocol2)
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
require.NoError(t, err)

peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
Expand Down
Loading

0 comments on commit 12575ce

Please sign in to comment.