From ad076e1cfefa6967cd3fa222e93c332b6c289b59 Mon Sep 17 00:00:00 2001 From: Artem Pikulin Date: Fri, 11 Dec 2020 19:59:22 +0700 Subject: [PATCH] Some improvements of peers_exchange. --- mm2src/mm2_libp2p/src/atomicdex_behaviour.rs | 7 +- mm2src/mm2_libp2p/src/peers_exchange.rs | 120 ++++++++++++++++++- 2 files changed, 119 insertions(+), 8 deletions(-) diff --git a/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs b/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs index d0bb2bef18..8a933e863d 100644 --- a/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs +++ b/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs @@ -519,9 +519,12 @@ fn announce_my_addresses(swarm: &mut AtomicDexSwarm) { } false }) + .take(1) .cloned() .collect(); - swarm.announce_listeners(global_listeners); + if !global_listeners.is_empty() { + swarm.announce_listeners(global_listeners); + } } /// Creates and spawns new AdexBehaviour Swarm returning: @@ -610,7 +613,7 @@ pub fn start_gossipsub( gossipsub, floodsub, request_response, - peers_exchange: PeersExchange::new(), + peers_exchange: PeersExchange::new(port), ping, }; libp2p::swarm::SwarmBuilder::new(transport, adex_behavior, local_peer_id.clone()) diff --git a/mm2src/mm2_libp2p/src/peers_exchange.rs b/mm2src/mm2_libp2p/src/peers_exchange.rs index 249e6a7ad2..fa4a37a3fe 100644 --- a/mm2src/mm2_libp2p/src/peers_exchange.rs +++ b/mm2src/mm2_libp2p/src/peers_exchange.rs @@ -1,7 +1,7 @@ use crate::request_response::Codec; use futures::StreamExt; use libp2p::swarm::NetworkBehaviour; -use libp2p::{multiaddr::Multiaddr, +use libp2p::{multiaddr::{Multiaddr, Protocol}, request_response::{handler::RequestProtocol, ProtocolName, ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage}, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, @@ -33,6 +33,7 @@ impl ProtocolName for PeersExchangeProtocol { type PeersExchangeCodec = Codec; +const DEFAULT_PEERS_NUM: usize = 20; const REQUEST_PEERS_INITIAL_DELAY: u64 = 60; const REQUEST_PEERS_INTERVAL: u64 = 300; const MAX_PEERS: usize = 100; @@ -79,11 +80,13 @@ pub struct PeersExchange { events: VecDeque, ()>>, #[behaviour(ignore)] maintain_peers_interval: Interval, + #[behaviour(ignore)] + netid_port: u16, } #[allow(clippy::new_without_default)] impl PeersExchange { - pub fn new() -> Self { + pub fn new(netid_port: u16) -> Self { let codec = Codec::default(); let protocol = iter::once((PeersExchangeProtocol::Version1, ProtocolSupport::Full)); let config = RequestResponseConfig::default(); @@ -96,6 +99,7 @@ impl PeersExchange { Instant::now() + Duration::from_secs(REQUEST_PEERS_INITIAL_DELAY), Duration::from_secs(REQUEST_PEERS_INTERVAL), ), + netid_port, } } @@ -110,7 +114,6 @@ impl PeersExchange { result } - #[allow(unused)] fn forget_peer(&mut self, peer: &PeerId) { self.known_peers.retain(|known_peer| known_peer != peer); self.forget_peer_addresses(peer); @@ -123,6 +126,15 @@ impl PeersExchange { } pub fn add_peer_addresses(&mut self, peer: &PeerId, addresses: PeerAddresses) { + if addresses.len() > 1 { + return; + } + + for address in addresses.iter() { + if !self.validate_global_multiaddr(address) { + return; + } + } if !self.known_peers.contains(&peer) && !addresses.is_empty() { self.known_peers.push(peer.clone()); } @@ -148,7 +160,6 @@ impl PeersExchange { } fn request_known_peers_from_random_peer(&mut self) { - const DEFAULT_PEERS_NUM: usize = 20; let mut rng = thread_rng(); if let Some(from_peer) = self.known_peers.choose(&mut rng) { info!("Try to request {} peers from peer {}", DEFAULT_PEERS_NUM, from_peer); @@ -180,6 +191,61 @@ impl PeersExchange { } } + fn validate_global_multiaddr(&self, address: &Multiaddr) -> bool { + let mut components = address.iter(); + match components.next() { + Some(maybe_ip_v4) => match maybe_ip_v4 { + Protocol::Ip4(addr) => { + if !addr.is_global() { + return false; + } + }, + _ => return false, + }, + None => return false, + } + + match components.next() { + Some(maybe_ip_v4) => match maybe_ip_v4 { + Protocol::Tcp(port) => { + if port != self.netid_port { + return false; + } + }, + _ => return false, + }, + None => return false, + } + + if let Some(_) = components.next() { + return false; + } + true + } + + fn validate_get_known_peers_response(&self, response: &HashMap) -> bool { + if response.is_empty() { + return false; + } + + if response.len() > DEFAULT_PEERS_NUM { + return false; + } + + for addresses in response.values() { + if addresses.is_empty() { + return false; + } + + for address in addresses { + if !self.validate_global_multiaddr(address) { + return false; + } + } + } + true + } + fn poll( &mut self, cx: &mut Context, @@ -200,7 +266,7 @@ impl PeersExchange { impl NetworkBehaviourEventProcess> for PeersExchange { fn inject_event(&mut self, event: RequestResponseEvent) { match event { - RequestResponseEvent::Message { message, .. } => match message { + RequestResponseEvent::Message { message, peer } => match message { RequestResponseMessage::Request { request, channel, .. } => match request { PeersExchangeRequest::GetKnownPeers { num } => { let response = PeersExchangeResponse::KnownPeers { @@ -211,6 +277,13 @@ impl NetworkBehaviourEventProcess match response { PeersExchangeResponse::KnownPeers { peers } => { + if !self.validate_get_known_peers_response(&peers) { + // if peer provides invalid response forget it and try to request from other peer + self.forget_peer(&peer); + self.request_known_peers_from_random_peer(); + return; + } + info!("Got peers {:?}", peers); peers.into_iter().for_each(|(peer, addresses)| { self.add_peer_addresses(&peer.0, addresses); @@ -227,6 +300,8 @@ impl NetworkBehaviourEventProcess { error!( @@ -240,8 +315,11 @@ impl NetworkBehaviourEventProcess