diff --git a/crates/iroha_core/src/peers_gossiper.rs b/crates/iroha_core/src/peers_gossiper.rs index b7a02b7757f..57e040b9095 100644 --- a/crates/iroha_core/src/peers_gossiper.rs +++ b/crates/iroha_core/src/peers_gossiper.rs @@ -11,7 +11,7 @@ use std::{ use iroha_config::parameters::actual::TrustedPeers; use iroha_data_model::peer::{Peer, PeerId}; use iroha_futures::supervisor::{Child, OnShutdown, ShutdownSignal}; -use iroha_p2p::{Broadcast, UpdatePeers}; +use iroha_p2p::{Broadcast, UpdatePeers, UpdateTopology}; use iroha_primitives::{addr::SocketAddr, unique_vec::UniqueVec}; use iroha_version::{Decode, Encode}; use parity_scale_codec::{Error, Input}; @@ -23,6 +23,7 @@ use crate::{IrohaNetwork, NetworkMessage}; #[derive(Clone)] pub struct PeersGossiperHandle { message_sender: mpsc::Sender, + update_topology_sender: mpsc::UnboundedSender, } impl PeersGossiperHandle { @@ -33,6 +34,13 @@ impl PeersGossiperHandle { .await .expect("Gossiper must handle messages until there is at least one handle to it") } + + /// Send [`UpdateTopology`] message on network actor. + pub fn update_topology(&self, topology: UpdateTopology) { + self.update_topology_sender + .send(topology) + .expect("Gossiper must accept messages until there is at least one handle to it") + } } /// Actor which gossips peers addresses. @@ -41,6 +49,7 @@ pub struct PeersGossiper { initial_peers: HashMap, /// Peers received via gossiping from other peers gossip_peers: HashMap, + current_topology: HashSet, network: IrohaNetwork, } @@ -69,15 +78,24 @@ impl PeersGossiper { let gossiper = Self { initial_peers, gossip_peers: HashMap::new(), + current_topology: HashSet::new(), network, }; gossiper.network_update_peers_addresses(); let (message_sender, message_receiver) = mpsc::channel(1); + let (update_topology_sender, update_topology_receiver) = mpsc::unbounded_channel(); ( - PeersGossiperHandle { message_sender }, + PeersGossiperHandle { + message_sender, + update_topology_sender, + }, Child::new( - tokio::task::spawn(gossiper.run(message_receiver, shutdown_signal)), + tokio::task::spawn(gossiper.run( + message_receiver, + update_topology_receiver, + shutdown_signal, + )), OnShutdown::Abort, ), ) @@ -86,11 +104,15 @@ impl PeersGossiper { async fn run( mut self, mut message_receiver: mpsc::Receiver, + mut update_topology_receiver: mpsc::UnboundedReceiver, shutdown_signal: ShutdownSignal, ) { let mut gossip_period = tokio::time::interval(Duration::from_secs(60)); loop { tokio::select! { + Some(update_topology) = update_topology_receiver.recv() => { + self.set_current_topology(update_topology); + } _ = gossip_period.tick() => { self.gossip_peers() } @@ -109,6 +131,11 @@ impl PeersGossiper { } } + fn set_current_topology(&mut self, UpdateTopology(topology): UpdateTopology) { + self.gossip_peers.retain(|peer, _| topology.contains(peer)); + self.current_topology = topology; + } + fn gossip_peers(&self) { let online_peers = self.network.online_peers(Clone::clone); let online_peers = UniqueVec::from_iter(online_peers); @@ -118,7 +145,9 @@ impl PeersGossiper { fn handle_peers_gossip(&mut self, PeersGossip(peers): PeersGossip) { for peer in peers { - self.gossip_peers.insert(peer.id, peer.address); + if self.current_topology.contains(&peer.id) { + self.gossip_peers.insert(peer.id, peer.address); + } } self.network_update_peers_addresses(); } diff --git a/crates/iroha_core/src/sumeragi/main_loop.rs b/crates/iroha_core/src/sumeragi/main_loop.rs index 4211c6d8e36..db54d62d9e1 100644 --- a/crates/iroha_core/src/sumeragi/main_loop.rs +++ b/crates/iroha_core/src/sumeragi/main_loop.rs @@ -1,5 +1,9 @@ //! The main event loop that powers sumeragi. -use std::{collections::BTreeSet, ops::Deref, sync::mpsc}; +use std::{ + collections::{BTreeSet, HashSet}, + ops::Deref, + sync::mpsc, +}; use iroha_crypto::{HashOf, KeyPair}; use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId}; @@ -8,7 +12,10 @@ use iroha_p2p::UpdateTopology; use tracing::{span, Level}; use super::{view_change::ProofBuilder, *}; -use crate::{block::*, queue::TransactionGuard, sumeragi::tracing::instrument}; +use crate::{ + block::*, peers_gossiper::PeersGossiperHandle, queue::TransactionGuard, + sumeragi::tracing::instrument, +}; /// `Sumeragi` is the implementation of the consensus. pub struct Sumeragi { @@ -26,6 +33,8 @@ pub struct Sumeragi { pub kura: Arc, /// [`iroha_p2p::Network`] actor address pub network: IrohaNetwork, + /// Peers gossiper + pub peers_gossiper: PeersGossiperHandle, /// Receiver channel, for control flow messages. pub control_message_receiver: mpsc::Receiver, /// Receiver channel. @@ -112,8 +121,9 @@ impl Sumeragi { /// Connect or disconnect peers according to the current network topology. fn connect_peers(&self, topology: &Topology) { - let peers = topology.iter().cloned().collect(); - self.network.update_topology(UpdateTopology(peers)); + let peers = topology.iter().cloned().collect::>(); + self.network.update_topology(UpdateTopology(peers.clone())); + self.peers_gossiper.update_topology(UpdateTopology(peers)); } fn send_event(&self, event: impl Into) { diff --git a/crates/iroha_core/src/sumeragi/mod.rs b/crates/iroha_core/src/sumeragi/mod.rs index 79276a5f462..c256b4eddc3 100644 --- a/crates/iroha_core/src/sumeragi/mod.rs +++ b/crates/iroha_core/src/sumeragi/mod.rs @@ -28,7 +28,10 @@ pub mod network_topology; pub mod view_change; use self::{message::*, view_change::ProofChain}; -use crate::{kura::Kura, prelude::*, queue::Queue, EventsSender, IrohaNetwork, NetworkMessage}; +use crate::{ + kura::Kura, peers_gossiper::PeersGossiperHandle, prelude::*, queue::Queue, EventsSender, + IrohaNetwork, NetworkMessage, +}; /// Handle to `Sumeragi` actor #[derive(Clone)] @@ -144,6 +147,7 @@ impl SumeragiStartArgs { queue, kura, network, + peers_gossiper, genesis_network, block_count: BlockCount(block_count), #[cfg(feature = "telemetry")] @@ -217,6 +221,7 @@ impl SumeragiStartArgs { events_sender, kura: Arc::clone(&kura), network: network.clone(), + peers_gossiper, control_message_receiver, message_receiver, debug_force_soft_fork, @@ -297,6 +302,7 @@ pub struct SumeragiStartArgs { pub queue: Arc, pub kura: Arc, pub network: IrohaNetwork, + pub peers_gossiper: PeersGossiperHandle, pub genesis_network: GenesisWithPubKey, pub block_count: BlockCount, #[cfg(feature = "telemetry")] diff --git a/crates/irohad/src/main.rs b/crates/irohad/src/main.rs index 147a71844ce..add74d9d783 100644 --- a/crates/irohad/src/main.rs +++ b/crates/irohad/src/main.rs @@ -259,6 +259,13 @@ impl Iroha { queue.clone(), ); + let (peers_gossiper, child) = PeersGossiper::start( + config.sumeragi.trusted_peers.value().clone(), + network.clone(), + supervisor.shutdown_signal(), + ); + supervisor.monitor(child); + let (sumeragi, child) = SumeragiStartArgs { sumeragi_config: config.sumeragi.clone(), common_config: config.common.clone(), @@ -267,6 +274,7 @@ impl Iroha { queue: queue.clone(), kura: kura.clone(), network: network.clone(), + peers_gossiper: peers_gossiper.clone(), genesis_network: GenesisWithPubKey { genesis, public_key: config.genesis.public_key.clone(), @@ -302,13 +310,6 @@ impl Iroha { .start(supervisor.shutdown_signal()); supervisor.monitor(child); - let (peers_gossiper, child) = PeersGossiper::start( - config.sumeragi.trusted_peers.value().clone(), - network.clone(), - supervisor.shutdown_signal(), - ); - supervisor.monitor(child); - supervisor.monitor(task::spawn( NetworkRelay { sumeragi,