Skip to content

Commit

Permalink
Merge branch 'master' into feat/v0.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Nov 15, 2023
2 parents fcbe369 + 9a30c78 commit e261006
Show file tree
Hide file tree
Showing 20 changed files with 434 additions and 113 deletions.
9 changes: 9 additions & 0 deletions cmd/waku/server/rest/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ func NewAdminService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *AdminSer
func (a *AdminService) getV1Peers(w http.ResponseWriter, req *http.Request) {
peers, err := a.node.Peers()
if err != nil {
a.log.Error("failed to fetch peers", zap.Error(err))
writeErrOrResponse(w, err, nil)
return
}
a.log.Info("fetched peers", zap.Int("count", len(peers)))

response := make([]WakuPeer, 0)
for _, peer := range peers {
wPeer := WakuPeer{
Expand All @@ -68,6 +71,7 @@ func (a *AdminService) getV1Peers(w http.ResponseWriter, req *http.Request) {
}
for _, proto := range peer.Protocols {
if !server.IsWakuProtocol(proto) {
a.log.Debug("skipping protocol as it is a non-waku protocol", logging.HostID("peer", peer.ID), zap.String("protocol", string(proto)))
continue
}
wPeer.Protocols = append(wPeer.Protocols, string(proto))
Expand All @@ -85,6 +89,7 @@ func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) {

decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&pInfo); err != nil {
a.log.Error("failed to decode request", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -102,6 +107,10 @@ func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) {
topics = append(topics, topic.String())
}

for _, proto := range pInfo.Protocols {
protos = append(protos, protocol.ID(proto))
}

id, err := a.node.AddPeer(addr, peerstore.Static, topics, protos...)
if err != nil {
a.log.Error("failed to add peer", zap.Error(err))
Expand Down
12 changes: 4 additions & 8 deletions tests/string_generators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@ func TestStringGenerators(t *testing.T) {

// Generate string and print out to console
for i := 0; i < 1000; i++ {
x, err := GenerateRandomASCIIString(1, 4097)
x, err := GenerateRandomASCIIString(4097)
require.NoError(t, err)
log.Info("Generated random ASCII string", zap.String(strconv.Itoa(i), x))

x, err = GenerateRandomUTF8String(1, 4097, false)
x, err = GenerateRandomUTF8String(4097)
require.NoError(t, err)
log.Info("Generated random UTF8 string", zap.String(strconv.Itoa(i), x))

x, err = GenerateRandomUTF8String(1, 4097, true)
require.NoError(t, err)
log.Info("Generated uncommon UTF8 string", zap.String(strconv.Itoa(i), x))

x, err = GenerateRandomJSONString()
x, err = GenerateRandomJSONString(4099)
require.NoError(t, err)
log.Info("Generated random JSON string", zap.String(strconv.Itoa(i), x))

Expand All @@ -38,7 +34,7 @@ func TestStringGenerators(t *testing.T) {
require.NoError(t, err)
log.Info("Generated random URL encoded string", zap.String(strconv.Itoa(i), x))

x, err = GenerateRandomSQLInsert()
x, err = GenerateRandomSQLInsert(4096)
require.NoError(t, err)
log.Info("Generated random SQL insert string", zap.String(strconv.Itoa(i), x))
}
Expand Down
48 changes: 22 additions & 26 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"go.uber.org/zap"
)

type StringGenerator func(maxLength int) (string, error)

// GetHostAddress returns the first listen address used by a host
func GetHostAddress(ha host.Host) multiaddr.Multiaddr {
return ha.Addrs()[0]
Expand Down Expand Up @@ -247,12 +249,12 @@ func RandomBytes(n int) ([]byte, error) {
return b, nil
}

func GenerateRandomASCIIString(minLength int, maxLength int) (string, error) {
length, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength-minLength+1)))
func GenerateRandomASCIIString(maxLength int) (string, error) {
length, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength)))
if err != nil {
return "", err
}
length.SetInt64(length.Int64() + int64(minLength))
length.SetInt64(length.Int64() + 1)

const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
result := make([]byte, length.Int64())
Expand All @@ -267,27 +269,21 @@ func GenerateRandomASCIIString(minLength int, maxLength int) (string, error) {
return string(result), nil
}

func GenerateRandomUTF8String(minLength int, maxLength int, withUncommon bool) (string, error) {
length, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength-minLength+1)))
func GenerateRandomUTF8String(maxLength int) (string, error) {
length, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength)))
if err != nil {
return "", err
}
length.SetInt64(length.Int64() + int64(minLength))
length.SetInt64(length.Int64() + 1)

var (
runes []rune
start, end int
)

if withUncommon {
// Unicode range for uncommon or unprintable characters, the Private Use Area (E000–F8FF)
start = 0xE000
end = 0xF8FF
} else {
// Define unicode range
start = 0x0020 // Space character
end = 0x007F // Tilde (~)
}
// Define unicode range
start = 0x0020 // Space character
end = 0x007F // Tilde (~)

for i := 0; int64(i) < length.Int64(); i++ {
randNum, err := rand.Int(rand.Reader, big.NewInt(int64(end-start+1)))
Expand All @@ -304,15 +300,15 @@ func GenerateRandomUTF8String(minLength int, maxLength int, withUncommon bool) (
return string(runes), nil
}

func GenerateRandomJSONString() (string, error) {
func GenerateRandomJSONString(maxLength int) (string, error) {
// With 5 key-value pairs
m := make(map[string]interface{})
for i := 0; i < 5; i++ {
key, err := GenerateRandomASCIIString(1, 20)
key, err := GenerateRandomASCIIString(20)
if err != nil {
return "", err
}
value, err := GenerateRandomASCIIString(1, 4097)
value, err := GenerateRandomASCIIString(maxLength)
if err != nil {
return "", err
}
Expand All @@ -332,17 +328,17 @@ func GenerateRandomJSONString() (string, error) {
return buf.String(), nil
}

func GenerateRandomBase64String(length int) (string, error) {
bytes, err := RandomBytes(length)
func GenerateRandomBase64String(maxLength int) (string, error) {
bytes, err := RandomBytes(maxLength)
if err != nil {
return "", err
}

return base64.StdEncoding.EncodeToString(bytes), nil
}

func GenerateRandomURLEncodedString(length int) (string, error) {
randomString, err := GenerateRandomASCIIString(1, 4097)
func GenerateRandomURLEncodedString(maxLength int) (string, error) {
randomString, err := GenerateRandomASCIIString(maxLength)
if err != nil {
return "", err
}
Expand All @@ -351,9 +347,9 @@ func GenerateRandomURLEncodedString(length int) (string, error) {
return url.QueryEscape(randomString), nil
}

func GenerateRandomSQLInsert() (string, error) {
func GenerateRandomSQLInsert(maxLength int) (string, error) {
// Random table name
tableName, err := GenerateRandomASCIIString(1, 10)
tableName, err := GenerateRandomASCIIString(10)
if err != nil {
return "", err
}
Expand All @@ -365,7 +361,7 @@ func GenerateRandomSQLInsert() (string, error) {
}
columnNames := make([]string, columnCount)
for i := 0; i < columnCount; i++ {
columnName, err := GenerateRandomASCIIString(1, 20)
columnName, err := GenerateRandomASCIIString(maxLength)
if err != nil {
return "", err
}
Expand All @@ -375,7 +371,7 @@ func GenerateRandomSQLInsert() (string, error) {
// Random values
values := make([]string, columnCount)
for i := 0; i < columnCount; i++ {
value, err := GenerateRandomASCIIString(1, 100)
value, err := GenerateRandomASCIIString(maxLength)
if err != nil {
return "", err
}
Expand Down
10 changes: 8 additions & 2 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,17 +272,19 @@ func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool {
if node == nil {
return false
}
d.log.Debug("found a peer", logging.ENode("enr", node))

// node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage
if !isWakuNode(node) {
d.log.Debug("peer is not waku node", logging.ENode("enr", node))
return false
}

d.log.Debug("peer is a waku node", logging.ENode("enr", node))
_, err := wenr.EnodeToPeerInfo(node)

if err != nil {
d.metrics.RecordError(peerInfoFailure)
utils.Logger().Named("discv5").Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err))
d.log.Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err))
return false
}

Expand Down Expand Up @@ -404,21 +406,25 @@ func (d *DiscoveryV5) DefaultPredicate() Predicate {

nodeRS, err := wenr.RelaySharding(n.Record())
if err != nil {
d.log.Debug("failed to get relay shards from node record", logging.ENode("node", n), zap.Error(err))
return false
}

if nodeRS == nil {
d.log.Debug("node has no shards registered", logging.ENode("node", n))
// Node has no shards registered.
return false
}

if nodeRS.ClusterID != localRS.ClusterID {
d.log.Debug("cluster id mismatch from local clusterid", logging.ENode("node", n), zap.Error(err))
return false
}

// Contains any
for _, idx := range localRS.ShardIDs {
if nodeRS.Contains(localRS.ClusterID, idx) {
d.log.Debug("shards match for discovered node", logging.ENode("node", n))
return true
}
}
Expand Down
1 change: 1 addition & 0 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
if len(c.host.Network().Peers()) < waku_proto.GossipSubOptimalFullMeshSize {
triggerImmediateConnection = true
}
c.logger.Debug("adding discovered peer", logging.HostID("peer", p.AddrInfo.ID))
c.pm.AddDiscoveredPeer(p, triggerImmediateConnection)

case <-time.After(1 * time.Second):
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/peermanager/peer_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,

select {
case <-ctx.Done():
pm.logger.Error("failed to find peers for shard and services", zap.Uint16("cluster", cluster),
zap.Uint16("shard", shard), zap.String("service", string(wakuProtocol)), zap.Error(ctx.Err()))
return nil, ctx.Err()
default:
}
Expand Down
41 changes: 14 additions & 27 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers
outPeers = append(outPeers, p)
}
} else {
pm.logger.Error("Failed to retrieve peer direction",
pm.logger.Error("failed to retrieve peer direction",
logging.HostID("peerID", p), zap.Error(err))
}
}
Expand All @@ -188,7 +188,7 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee
if err != nil {
return
}
pm.logger.Debug("Number of peers connected", zap.Int("inPeers", inPeers.Len()),
pm.logger.Debug("number of peers connected", zap.Int("inPeers", inPeers.Len()),
zap.Int("outPeers", outPeers.Len()))

//Need to filter peers to check if they support relay
Expand All @@ -211,15 +211,17 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
curPeers := topicInst.topic.ListPeers()
curPeerLen := len(curPeers)
if curPeerLen < waku_proto.GossipSubOptimalFullMeshSize {
pm.logger.Info("Subscribed topic is unhealthy, initiating more connections to maintain health",
pm.logger.Debug("subscribed topic is unhealthy, initiating more connections to maintain health",
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
zap.Int("optimumPeers", waku_proto.GossipSubOptimalFullMeshSize))
//Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers(topicStr)
if notConnectedPeers.Len() == 0 {
pm.logger.Debug("could not find any peers in peerstore to connect to, discovering more", zap.String("pubSubTopic", topicStr))
pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2)
continue
}
pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
//Connect to eligible peers.
numPeersToConnect := waku_proto.GossipSubOptimalFullMeshSize - curPeerLen

Expand All @@ -239,7 +241,7 @@ func (pm *PeerManager) connectToRelayPeers() {
pm.ensureMinRelayConnsPerTopic()

inRelayPeers, outRelayPeers := pm.getRelayPeers()
pm.logger.Info("number of relay peers connected",
pm.logger.Debug("number of relay peers connected",
zap.Int("in", inRelayPeers.Len()),
zap.Int("out", outRelayPeers.Len()))
if inRelayPeers.Len() > 0 &&
Expand All @@ -248,28 +250,10 @@ func (pm *PeerManager) connectToRelayPeers() {
}
}

// addrInfoToPeerData returns addressinfo for a peer
// If addresses are expired, it removes the peer from host peerStore and returns nil.
func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *service.PeerData {
addrs := host.Peerstore().Addrs(peerID)
if len(addrs) == 0 {
//Addresses expired, remove peer from peerStore
host.Peerstore().RemovePeer(peerID)
return nil
}
return &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: peerID,
Addrs: addrs,
},
}
}

// connectToPeers connects to peers provided in the list if the addresses have not expired.
func (pm *PeerManager) connectToPeers(peers peer.IDSlice) {
for _, peerID := range peers {
peerData := addrInfoToPeerData(wps.PeerManager, peerID, pm.host)
peerData := AddrInfoToPeerData(wps.PeerManager, peerID, pm.host)
if peerData == nil {
continue
}
Expand Down Expand Up @@ -306,10 +290,10 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
p := inRelayPeers[pruningStartIndex]
err := pm.host.Network().ClosePeer(p)
if err != nil {
pm.logger.Warn("Failed to disconnect connection towards peer",
pm.logger.Warn("failed to disconnect connection towards peer",
logging.HostID("peerID", p))
}
pm.logger.Debug("Successfully disconnected connection towards peer",
pm.logger.Debug("successfully disconnected connection towards peer",
logging.HostID("peerID", p))
}
}
Expand Down Expand Up @@ -357,7 +341,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
//Check if the peer is already present, if so skip adding
_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
if err == nil {
pm.logger.Debug("Found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID))
pm.logger.Debug("peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID))
return
}
supportedProtos := []protocol.ID{}
Expand All @@ -376,6 +360,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
}
}
if connectNow {
pm.logger.Debug("connecting now to discovered peer", logging.HostID("peer", p.AddrInfo.ID))
go pm.peerConnector.PushToChan(p)
}
}
Expand All @@ -384,6 +369,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
// It also sets additional metadata such as origin, ENR and supported protocols
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
pm.logger.Error("could not add peer as peer store capacity is reached", logging.HostID("peer", ID), zap.Int("capacity", pm.maxPeers))
return errors.New("peer store capacity reached")
}
pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID))
Expand All @@ -403,6 +389,7 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
if len(protocols) > 0 {
err = pm.host.Peerstore().AddProtocols(ID, protocols...)
if err != nil {
pm.logger.Error("could not set protocols", zap.Error(err), logging.HostID("peer", ID))
return err
}
}
Expand Down Expand Up @@ -492,7 +479,7 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {

//For now adding the peer to serviceSlot which means the latest added peer would be given priority.
//TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc.
pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID),
pm.logger.Info("adding peer to service slots", logging.HostID("peer", peerID),
zap.String("service", string(proto)))
// getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol
pm.serviceSlots.getPeers(proto).add(peerID)
Expand Down
Loading

0 comments on commit e261006

Please sign in to comment.