From 33eb93fd41dc2a139b11fffd6927f60dc7e29532 Mon Sep 17 00:00:00 2001 From: Alan Szepieniec Date: Tue, 26 Nov 2024 15:01:23 +0100 Subject: [PATCH 1/5] feat(peer-loop): Ask peer for connections first Whenever we establish a connection with a new peer, the first thing to do after the handshake is now to ask the peer for its list of peers. This change is motivated by topological robustness in the context of adversaries who deploy sybil or other attacks. In such a context, the connection is not guaranteed to last and so we want to prioritize the information that affects network topology -- blockchain data may be gathered from other nodes. --- src/connect_to_peers.rs | 7 +++++++ src/main_loop.rs | 5 +++-- src/models/peer.rs | 3 ++- src/peer_loop.rs | 11 +++++++---- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/connect_to_peers.rs b/src/connect_to_peers.rs index 9d8b0b197..4612631f5 100644 --- a/src/connect_to_peers.rs +++ b/src/connect_to_peers.rs @@ -411,6 +411,12 @@ where bail!("Attempted to connect to peer that was not allowed. This connection attempt should not have been made."); } + // By default, start by asking the peer for its peers. In an adversarial + // context, we want the network topology to be as robust as possible. + // Blockchain data can be obtained from other peers, if this connection + // fails. + peer.send(PeerMessage::PeerListRequest).await?; + let mut peer_loop_handler = PeerLoopHandler::new( peer_task_to_main_tx, state, @@ -516,6 +522,7 @@ mod connect_tests { .read(&to_bytes(&PeerMessage::ConnectionStatus( ConnectionStatus::Accepted, ))?) + .write(&to_bytes(&PeerMessage::PeerListRequest)?) .read(&to_bytes(&PeerMessage::Bye)?) .build(); diff --git a/src/main_loop.rs b/src/main_loop.rs index 61833e693..7462380f5 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -816,8 +816,9 @@ impl MainLoopHandler { Ok(()) } - /// Function to perform peer discovery: Finds potential peers from connected peers and attempts - /// to establish connections with one of those potential peers. + /// Perform peer discovery and (if necessary) reconnect to the peers listed + /// as CLI arguments. Peer discovery involves finding potential peers from + /// connected peers and attempts to establish connections with one of them. /// /// Locking: /// * acquires `global_state_lock` for read diff --git a/src/models/peer.rs b/src/models/peer.rs index 0bf7d149f..f8ac0b440 100644 --- a/src/models/peer.rs +++ b/src/models/peer.rs @@ -583,7 +583,8 @@ impl PeerMessage { } } -/// `MutablePeerState` contains the part of the peer-loop's state that is mutable +/// `MutablePeerState` contains information about the peer's blockchain state. +/// Under normal conditions, this information varies across time. #[derive(Clone, Debug)] pub struct MutablePeerState { pub highest_shared_block_height: BlockHeight, diff --git a/src/peer_loop.rs b/src/peer_loop.rs index 1541cb8d8..ceec87b4d 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -198,7 +198,8 @@ impl PeerLoopHandler { /// in the batch. /// /// # Locking - /// * acquires `global_state_lock` for write via Self::punish() + /// * Acquires `global_state_lock` for write via `self.punish(..)` and + /// `self.reward(..)`. /// /// # Panics /// @@ -309,7 +310,8 @@ impl PeerLoopHandler { /// are passed down the pipeline. /// /// Locking: - /// * acquires `global_state_lock` for write via Self::punish() + /// * Acquires `global_state_lock` for write via `self.punish(..)` and + /// `self.reward(..)`. async fn try_ensure_path( &mut self, received_block: Box, @@ -452,8 +454,9 @@ impl PeerLoopHandler { /// Otherwise returns OK(false). /// /// Locking: - /// * acquires `global_state_lock` for read - /// * acquires `global_state_lock` for write via Self::punish() + /// * Acquires `global_state_lock` for read. + /// * Acquires `global_state_lock` for write via `self.punish(..)` and + /// `self.reward(..)`. async fn handle_peer_message( &mut self, msg: PeerMessage, From 1d8d929342059f4580066c0da1eacf88c29cd2e6 Mon Sep 17 00:00:00 2001 From: Alan Szepieniec Date: Tue, 26 Nov 2024 17:05:04 +0100 Subject: [PATCH 2/5] style(CLI-arguments): Rename `max_peers` to `max_num_peers` Benefits clarity. --- src/config_models/cli_args.rs | 8 ++++---- src/connect_to_peers.rs | 8 ++++---- src/main_loop.rs | 15 ++++++++------- src/models/state/mod.rs | 2 +- src/peer_loop.rs | 2 +- 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/config_models/cli_args.rs b/src/config_models/cli_args.rs index 2ddcdec78..2ec171726 100644 --- a/src/config_models/cli_args.rs +++ b/src/config_models/cli_args.rs @@ -56,7 +56,7 @@ pub struct Args { /// Will not prevent outgoing connections made with `--peers`. /// Set this value to 0 to refuse all incoming connections. #[clap(long, default_value = "10", value_name = "COUNT")] - pub max_peers: u16, + pub max_num_peers: u16, /// If this flag is set, the node will refuse to initiate a transaction. /// This flag makes sense for machines whose resources are dedicated to @@ -229,7 +229,7 @@ impl Args { /// Indicates if all incoming peer connections are disallowed. pub(crate) fn disallow_all_incoming_peer_connections(&self) -> bool { - self.max_peers.is_zero() + self.max_num_peers.is_zero() } /// Return the port that peer can connect on. None if incoming connections @@ -319,7 +319,7 @@ mod cli_args_tests { let default_args = Args::default(); assert_eq!(1000, default_args.peer_tolerance); - assert_eq!(10, default_args.max_peers); + assert_eq!(10, default_args.max_num_peers); assert_eq!(9798, default_args.peer_port); assert_eq!(9799, default_args.rpc_port); assert_eq!( @@ -342,7 +342,7 @@ mod cli_args_tests { #[test] fn max_peers_0_means_no_incoming_connections() { let args = Args { - max_peers: 0, + max_num_peers: 0, ..Default::default() }; assert!(args.disallow_all_incoming_peer_connections()); diff --git a/src/connect_to_peers.rs b/src/connect_to_peers.rs index 4612631f5..91f2f0474 100644 --- a/src/connect_to_peers.rs +++ b/src/connect_to_peers.rs @@ -100,7 +100,7 @@ async fn check_if_connection_is_allowed( if let Some(status) = { // Disallow connection if max number of &peers has been attained - if (cli_arguments.max_peers as usize) <= global_state.net.peer_map.len() { + if (cli_arguments.max_num_peers as usize) <= global_state.net.peer_map.len() { Some(ConnectionStatus::Refused( ConnectionRefusedReason::MaxPeerNumberExceeded, )) @@ -589,7 +589,7 @@ mod connect_tests { // pretend --max_peers is 1. let mut cli = state_lock.cli().clone(); - cli.max_peers = 1; + cli.max_num_peers = 1; state_lock.set_cli(cli.clone()).await; status = check_if_connection_is_allowed( @@ -606,7 +606,7 @@ mod connect_tests { } // pretend --max-peers is 100 - cli.max_peers = 100; + cli.max_num_peers = 100; state_lock.set_cli(cli.clone()).await; // Attempt to connect to already connected peer @@ -897,7 +897,7 @@ mod connect_tests { // set max_peers to 2 to ensure failure on next connection attempt let mut cli = state_lock.cli().clone(); - cli.max_peers = 2; + cli.max_num_peers = 2; state_lock.set_cli(cli).await; let answer = answer_peer( diff --git a/src/main_loop.rs b/src/main_loop.rs index 7462380f5..7ba43c646 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -731,7 +731,7 @@ impl MainLoopHandler { PeerTaskToMain::PeerDiscoveryAnswer((pot_peers, reported_by, distance)) => { log_slow_scope!(fn_name!() + "::PeerTaskToMain::PeerDiscoveryAnswer"); - let max_peers = self.global_state_lock.cli().max_peers; + let max_peers = self.global_state_lock.cli().max_num_peers; for pot_peer in pot_peers { main_loop_state.potential_peers.add( reported_by, @@ -832,7 +832,7 @@ impl MainLoopHandler { let connected_peers: Vec = global_state.net.peer_map.values().cloned().collect(); // Check if we are connected to too many peers - if connected_peers.len() > cli_args.max_peers as usize { + if connected_peers.len() > cli_args.max_num_peers as usize { // If *all* peer connections were outgoing, then it's OK to exceed // the max-peer count. But in that case we don't want to connect to // more peers, so we should just stop execution of this scheduled @@ -847,7 +847,7 @@ impl MainLoopHandler { warn!( "Max peer parameter is exceeded. max is {} but we are connected to {}. Attempting to fix.", connected_peers.len(), - self.global_state_lock.cli().max_peers + self.global_state_lock.cli().max_num_peers ); let mut rng = thread_rng(); @@ -927,8 +927,9 @@ impl MainLoopHandler { // We don't make an outgoing connection if we've reached the peer limit, *or* if we are // one below the peer limit as we reserve this last slot for an ingoing connection. - if connected_peers.len() == cli_args.max_peers as usize - || connected_peers.len() > 2 && connected_peers.len() - 1 == cli_args.max_peers as usize + if connected_peers.len() == cli_args.max_num_peers as usize + || connected_peers.len() > 2 + && connected_peers.len() - 1 == cli_args.max_num_peers as usize { return Ok(()); } @@ -1833,7 +1834,7 @@ mod tests { // Set CLI to ban incoming connections and all outgoing peer-discovery- // initiated connections. let mocked_cli = cli_args::Args { - max_peers: 0, + max_num_peers: 0, ..Default::default() }; main_loop_handler @@ -1890,7 +1891,7 @@ mod tests { // Set CLI to attempt to make more connections let mocked_cli = cli_args::Args { - max_peers: 10, + max_num_peers: 10, ..Default::default() }; main_loop_handler diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index d53d4361a..8b950665a 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -1649,7 +1649,7 @@ mod global_state_tests { ) .await; let no_incoming_connections = cli_args::Args { - max_peers: 0, + max_num_peers: 0, ..Default::default() }; bob.set_cli(no_incoming_connections).await; diff --git a/src/peer_loop.rs b/src/peer_loop.rs index ceec87b4d..8758d2a54 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -1433,7 +1433,7 @@ impl PeerLoopHandler { bail!("Attempted to connect to already connected peer. Aborting connection."); } - if global_state.net.peer_map.len() >= cli_args.max_peers as usize { + if global_state.net.peer_map.len() >= cli_args.max_num_peers as usize { bail!("Attempted to connect to more peers than allowed. Aborting connection."); } From 5559043aaae402a7cef817b05351d4c8078f41b9 Mon Sep 17 00:00:00 2001 From: Alan Szepieniec Date: Tue, 26 Nov 2024 17:06:49 +0100 Subject: [PATCH 3/5] feat: Bootstrapper mode This commit - adds a CLI argument `bootstrap` that instructs the client to run in bootstrapper mode, which: - kills the connection with the longest-lived peer when the max num peers is reached, such that: - a new incoming connection can always be established. The number of peers is tested against the maximum prior to entering the peer loop. If the maximum is about to be reached, a message is sent to the main loop, instructing it to select a peer and disconnect from it. --- src/config_models/cli_args.rs | 10 +++++++ src/connect_to_peers.rs | 53 +++++++++++++++++++++++++++-------- src/main_loop.rs | 26 +++++++++++++++++ src/models/channel.rs | 2 ++ src/models/peer.rs | 1 + src/models/state/mod.rs | 6 ++++ 6 files changed, 86 insertions(+), 12 deletions(-) diff --git a/src/config_models/cli_args.rs b/src/config_models/cli_args.rs index 2ec171726..f848336dc 100644 --- a/src/config_models/cli_args.rs +++ b/src/config_models/cli_args.rs @@ -58,6 +58,16 @@ pub struct Args { #[clap(long, default_value = "10", value_name = "COUNT")] pub max_num_peers: u16, + /// Whether to act as bootstrapper node. + /// + /// Bootstrapper nodes ensure that the maximum number of peers is never + /// reached by disconnecting from existing peers when the maximum is about + /// to be reached. As a result, they will respond with high likelihood to + /// incoming connection requests -- in contrast to regular nodes, which + /// refuse incoming connections when the max is reached. + #[clap(long)] + pub bootstrap: bool, + /// If this flag is set, the node will refuse to initiate a transaction. /// This flag makes sense for machines whose resources are dedicated to /// composing, and which must do so in a regular and predictable manner, diff --git a/src/connect_to_peers.rs b/src/connect_to_peers.rs index 91f2f0474..db4862bb1 100644 --- a/src/connect_to_peers.rs +++ b/src/connect_to_peers.rs @@ -134,10 +134,20 @@ async fn check_if_connection_is_allowed( return ConnectionStatus::Refused(ConnectionRefusedReason::IncompatibleVersion); } + // If this connection touches the maximum number of peer connections, say + // so with special OK code. + if cli_arguments.max_num_peers as usize == global_state.net.peer_map.len() + 1 { + info!("ConnectionStatus::Accepted, but max # connections is now reached"); + return ConnectionStatus::AcceptedMaxReached; + } + info!("ConnectionStatus::Accepted"); ConnectionStatus::Accepted } +/// Respond to an incoming connection initiation. +/// +/// Catch and process errors (if any) gracefully. pub(crate) async fn answer_peer_wrapper( stream: S, state_lock: GlobalStateLock, @@ -206,9 +216,9 @@ where > = SymmetricallyFramed::new(length_delimited, SymmetricalBincode::default()); // Complete Neptune handshake - let peer_handshake_data: HandshakeData = match peer.try_next().await? { + let (peer_handshake_data, acceptance_code) = match peer.try_next().await? { Some(PeerMessage::Handshake(payload)) => { - let (v, hsd) = *payload; + let (v, handshake_data) = *payload; if v != crate::MAGIC_STRING_REQUEST { bail!("Expected magic value, got {:?}", v); } @@ -220,11 +230,11 @@ where .await?; // Verify peer network before moving on - if hsd.network != own_handshake_data.network { + if handshake_data.network != own_handshake_data.network { bail!( "Cannot connect with {}: Peer runs {}, this client runs {}.", peer_address, - hsd.network, + handshake_data.network, own_handshake_data.network, ); } @@ -233,28 +243,47 @@ where let connection_status = check_if_connection_is_allowed( state.clone(), &own_handshake_data, - &hsd, + &handshake_data, &peer_address, ) .await; - peer.send(PeerMessage::ConnectionStatus(connection_status)) - .await?; - if let ConnectionStatus::Refused(refused_reason) = connection_status { - warn!("Incoming connection refused: {:?}", refused_reason); - bail!("Refusing incoming connection. Reason: {:?}", refused_reason); + match connection_status { + ConnectionStatus::Refused(refused_reason) => { + peer.send(PeerMessage::ConnectionStatus(ConnectionStatus::Refused( + refused_reason, + ))) + .await?; + warn!("Incoming connection refused: {:?}", refused_reason); + bail!("Refusing incoming connection. Reason: {:?}", refused_reason); + } + ConnectionStatus::AcceptedMaxReached | ConnectionStatus::Accepted => { + peer.send(PeerMessage::ConnectionStatus(ConnectionStatus::Accepted)) + .await?; + } } debug!("Got correct magic value request!"); - hsd + (handshake_data, connection_status) } _ => { bail!("Didn't get handshake on connection attempt"); } }; - // Whether the incoming connection comes from a peer in bad standing is checked in `get_connection_status` + // Whether the incoming connection comes from a peer in bad standing is + // checked in `check_if_connection_is_allowed`. So if we get here, we are + // good to go. info!("Connection accepted from {}", peer_address); + + // If necessary, disconnect from another, existing peer. + if acceptance_code == ConnectionStatus::AcceptedMaxReached { + info!("Maximum # peers reached, so disconnecting from an existing peer."); + peer_task_to_main_tx + .send(PeerTaskToMain::DisconnectFromLongestLivedPeer) + .await?; + } + let peer_distance = 1; // All incoming connections have distance 1 let mut peer_loop_handler = PeerLoopHandler::new( peer_task_to_main_tx, diff --git a/src/main_loop.rs b/src/main_loop.rs index 7ba43c646..a743561e3 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -811,6 +811,32 @@ impl MainLoopHandler { self.main_to_miner_tx.send(MainToMiner::NewBlockProposal); } + PeerTaskToMain::DisconnectFromLongestLivedPeer => { + let global_state = self.global_state_lock.lock_guard().await; + + // get all peers + let all_peers = global_state.net.peer_map.iter(); + + // filter out CLI peers + let disconnect_candidates = + all_peers.filter(|p| !global_state.cli_peers().contains(p.0)); + + // find the one with the oldest connection + let longest_lived_peer = disconnect_candidates.min_by( + |(_socket_address_left, peer_info_left), + (_socket_address_right, peer_info_right)| { + peer_info_left + .connection_established() + .cmp(&peer_info_right.connection_established()) + }, + ); + + // tell to disconnect + if let Some((peer_socket, _peer_info)) = longest_lived_peer { + self.main_to_peer_broadcast_tx + .send(MainToPeerTask::Disconnect(peer_socket.to_owned()))?; + } + } } Ok(()) diff --git a/src/models/channel.rs b/src/models/channel.rs index 573de3b15..5fcfa065c 100644 --- a/src/models/channel.rs +++ b/src/models/channel.rs @@ -135,6 +135,7 @@ pub(crate) enum PeerTaskToMain { PeerDiscoveryAnswer((Vec<(SocketAddr, u128)>, SocketAddr, u8)), // ([(peer_listen_address)], reported_by, distance) Transaction(Box), BlockProposal(Box), + DisconnectFromLongestLivedPeer, } #[derive(Clone, Debug)] @@ -152,6 +153,7 @@ impl PeerTaskToMain { PeerTaskToMain::PeerDiscoveryAnswer(_) => "peer discovery answer", PeerTaskToMain::Transaction(_) => "transaction", PeerTaskToMain::BlockProposal(_) => "block proposal", + PeerTaskToMain::DisconnectFromLongestLivedPeer => "disconnect from longest lived peer", } .to_string() } diff --git a/src/models/peer.rs b/src/models/peer.rs index f8ac0b440..3d77e6327 100644 --- a/src/models/peer.rs +++ b/src/models/peer.rs @@ -449,6 +449,7 @@ pub enum ConnectionRefusedReason { #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum ConnectionStatus { Refused(ConnectionRefusedReason), + AcceptedMaxReached, Accepted, } diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index 8b950665a..efbd79b1b 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -12,6 +12,7 @@ pub mod tx_proving_capability; pub mod wallet; use std::cmp::max; +use std::net::SocketAddr; use std::ops::Deref; use std::ops::DerefMut; use std::time::SystemTime; @@ -1542,6 +1543,11 @@ impl GlobalState { &self.cli } + /// Return the list of peers that were supplied as CLI arguments. + pub(crate) fn cli_peers(&self) -> Vec { + self.cli().peers.clone() + } + pub(crate) fn proving_capability(&self) -> TxProvingCapability { self.cli().proving_capability() } From 71728a012340a458d64b699601077b7c256a79e8 Mon Sep 17 00:00:00 2001 From: Alan Szepieniec Date: Wed, 27 Nov 2024 16:33:44 +0100 Subject: [PATCH 4/5] test: Verify disconnect from longest-lived peer --- src/connect_to_peers.rs | 13 ++- src/main_loop.rs | 209 +++++++++++++++++++++++++++++++++++++++- src/models/peer.rs | 5 + src/tests/shared.rs | 2 +- 4 files changed, 220 insertions(+), 9 deletions(-) diff --git a/src/connect_to_peers.rs b/src/connect_to_peers.rs index db4862bb1..20be723fb 100644 --- a/src/connect_to_peers.rs +++ b/src/connect_to_peers.rs @@ -193,7 +193,7 @@ where inner_ret } -async fn answer_peer( +pub(crate) async fn answer_peer( stream: S, state: GlobalStateLock, peer_address: std::net::SocketAddr, @@ -218,7 +218,7 @@ where // Complete Neptune handshake let (peer_handshake_data, acceptance_code) = match peer.try_next().await? { Some(PeerMessage::Handshake(payload)) => { - let (v, handshake_data) = *payload; + let (v, peer_handshake_data) = *payload; if v != crate::MAGIC_STRING_REQUEST { bail!("Expected magic value, got {:?}", v); } @@ -230,11 +230,11 @@ where .await?; // Verify peer network before moving on - if handshake_data.network != own_handshake_data.network { + if peer_handshake_data.network != own_handshake_data.network { bail!( "Cannot connect with {}: Peer runs {}, this client runs {}.", peer_address, - handshake_data.network, + peer_handshake_data.network, own_handshake_data.network, ); } @@ -243,7 +243,7 @@ where let connection_status = check_if_connection_is_allowed( state.clone(), &own_handshake_data, - &handshake_data, + &peer_handshake_data, &peer_address, ) .await; @@ -263,8 +263,7 @@ where } } - debug!("Got correct magic value request!"); - (handshake_data, connection_status) + (peer_handshake_data, connection_status) } _ => { bail!("Didn't get handshake on connection attempt"); diff --git a/src/main_loop.rs b/src/main_loop.rs index a743561e3..68f716441 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -1601,7 +1601,9 @@ impl MainLoopHandler { } #[cfg(test)] -mod tests { +mod test { + use std::time::UNIX_EPOCH; + use tracing_test::traced_test; use super::*; @@ -1959,4 +1961,209 @@ mod tests { drop(main_to_peer_rx); } } + + #[test] + fn older_systemtime_ranks_first() { + let start = UNIX_EPOCH; + let other = UNIX_EPOCH + Duration::from_secs(1000); + let mut instants = [start, other]; + + assert_eq!( + start, + instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap() + ); + + instants.reverse(); + + assert_eq!( + start, + instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap() + ); + } + mod bootstrapper_mode { + + use rand::Rng; + + use crate::{ + connect_to_peers::answer_peer, + models::peer::{ConnectionStatus, PeerMessage}, + tests::shared::{get_dummy_peer_connection_data_genesis, to_bytes}, + }; + + use super::*; + + #[tokio::test] + #[traced_test] + async fn disconnect_from_oldest_peer_upon_connection_request() { + // Set up a node in bootstrapper mode and connected to a given + // number of peers, which is one less than the maximum. Initiate a + // connection request. Verify that the oldest of the existing + // connections is dropped. + + let network = Network::Main; + let num_init_peers_outgoing = 5; + let test_setup = setup(num_init_peers_outgoing).await; + let TestSetup { + mut peer_to_main_rx, + miner_to_main_rx: _, + rpc_server_to_main_rx: _, + task_join_handles, + mut main_loop_handler, + mut main_to_peer_rx, + } = test_setup; + + let mocked_cli = cli_args::Args { + max_num_peers: num_init_peers_outgoing as u16 + 1, + bootstrap: true, + network, + ..Default::default() + }; + main_loop_handler + .global_state_lock + .set_cli(mocked_cli) + .await; + + let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles); + + // check sanity: at startup, we are connected to the initial number of peers + assert_eq!( + num_init_peers_outgoing as usize, + main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .peer_map + .len() + ); + + // randomize "connection established" timestamps + let mut rng = thread_rng(); + let now = SystemTime::now(); + let now_as_unix_timestamp = now.duration_since(UNIX_EPOCH).unwrap(); + main_loop_handler + .global_state_lock + .lock_guard_mut() + .await + .net + .peer_map + .iter_mut() + .for_each(|(_socket_address, peer_info)| { + peer_info.set_connection_established( + UNIX_EPOCH + + Duration::from_millis( + rng.gen_range(0..(now_as_unix_timestamp.as_millis() as u64)), + ), + ); + }); + + // compute which peer will be dropped, for later reference + let expected_drop_peer_socket_address = main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .peer_map + .iter() + .min_by(|l, r| { + l.1.connection_established() + .cmp(&r.1.connection_established()) + }) + .map(|(socket_address, _peer_info)| socket_address) + .cloned() + .unwrap(); + + // simulate incoming connection + let (peer_handshake_data, peer_socket_address) = + get_dummy_peer_connection_data_genesis(network, 1).await; + let own_handshake_data = main_loop_handler + .global_state_lock + .lock_guard() + .await + .get_own_handshakedata() + .await; + assert_eq!(peer_handshake_data.network, own_handshake_data.network,); + assert_eq!(peer_handshake_data.version, own_handshake_data.version,); + let mock_stream = tokio_test::io::Builder::new() + .read( + &to_bytes(&PeerMessage::Handshake(Box::new(( + crate::MAGIC_STRING_REQUEST.to_vec(), + peer_handshake_data.clone(), + )))) + .unwrap(), + ) + .write( + &to_bytes(&PeerMessage::Handshake(Box::new(( + crate::MAGIC_STRING_RESPONSE.to_vec(), + own_handshake_data.clone(), + )))) + .unwrap(), + ) + .write( + &to_bytes(&PeerMessage::ConnectionStatus(ConnectionStatus::Accepted)).unwrap(), + ) + .build(); + let peer_to_main_tx_clone = main_loop_handler.peer_task_to_main_tx.clone(); + let global_state_lock_clone = main_loop_handler.global_state_lock.clone(); + let (_main_to_peer_tx_mock, main_to_peer_rx_mock) = tokio::sync::broadcast::channel(10); + let incoming_peer_task_handle = tokio::task::Builder::new() + .name("answer_peer_wrapper") + .spawn(async move { + match answer_peer( + mock_stream, + global_state_lock_clone, + peer_socket_address, + main_to_peer_rx_mock, + peer_to_main_tx_clone, + own_handshake_data, + ) + .await + { + Ok(()) => (), + Err(err) => error!("Got error: {:?}", err), + } + }) + .unwrap(); + + // `answer_peer_wrapper` should send a + // `DisconnectFromLongestLivedPeer` message to main + let peer_to_main_message = peer_to_main_rx.recv().await.unwrap(); + assert!(matches!( + peer_to_main_message, + PeerTaskToMain::DisconnectFromLongestLivedPeer, + )); + + // process this message + main_loop_handler + .handle_peer_task_message(peer_to_main_message, &mut mutable_main_loop_state) + .await + .unwrap(); + + // main loop should send a `Disconnect` message + let main_to_peers_message = main_to_peer_rx.recv().await.unwrap(); + assert!(matches!( + main_to_peers_message, + MainToPeerTask::Disconnect(_) + )); + let MainToPeerTask::Disconnect(observed_drop_peer_socket_address) = + main_to_peers_message + else { + unreachable!() + }; + + // matched observed droppee against expectation + assert_eq!( + expected_drop_peer_socket_address, + observed_drop_peer_socket_address + ); + + println!( + "Dropped connection with {}.", + expected_drop_peer_socket_address + ); + + // don't forget to terminate the peer task, which is still running + incoming_peer_task_handle.abort(); + } + } } diff --git a/src/models/peer.rs b/src/models/peer.rs index 3d77e6327..329c016bf 100644 --- a/src/models/peer.rs +++ b/src/models/peer.rs @@ -115,6 +115,11 @@ impl PeerInfo { .port_for_incoming_connections .map(|port| SocketAddr::new(self.peer_connection_info.connected_address.ip(), port)) } + + #[cfg(test)] + pub(crate) fn set_connection_established(&mut self, new_timestamp: SystemTime) { + self.connection_established = new_timestamp; + } } trait Sanction { diff --git a/src/tests/shared.rs b/src/tests/shared.rs index 2bf3e8635..71e4ba9af 100644 --- a/src/tests/shared.rs +++ b/src/tests/shared.rs @@ -145,7 +145,7 @@ pub(crate) fn get_dummy_peer(address: SocketAddr) -> PeerInfo { } pub fn get_dummy_version() -> String { - "0.1.0".to_string() + "0.0.10".to_string() } /// Return a handshake object with a randomly set instance ID From c2d556a7395f81a23f47605c536b09aef3c92f38 Mon Sep 17 00:00:00 2001 From: Alan Szepieniec Date: Wed, 27 Nov 2024 17:53:56 +0100 Subject: [PATCH 5/5] fix: Condition disconnect on CLI flag --- src/connect_to_peers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connect_to_peers.rs b/src/connect_to_peers.rs index 20be723fb..34d232b78 100644 --- a/src/connect_to_peers.rs +++ b/src/connect_to_peers.rs @@ -276,7 +276,7 @@ where info!("Connection accepted from {}", peer_address); // If necessary, disconnect from another, existing peer. - if acceptance_code == ConnectionStatus::AcceptedMaxReached { + if acceptance_code == ConnectionStatus::AcceptedMaxReached && state.cli().bootstrap { info!("Maximum # peers reached, so disconnecting from an existing peer."); peer_task_to_main_tx .send(PeerTaskToMain::DisconnectFromLongestLivedPeer)