diff --git a/example/memory.rs b/example/memory.rs index bbc2891..afa1cfb 100644 --- a/example/memory.rs +++ b/example/memory.rs @@ -3,8 +3,10 @@ //! the underlying machine, there is no privacy if the connected node is also used to broadcast //! transactions to the Bitcoin P2P network. -use bitcoin::BlockHash; +use kyoto::chain::checkpoints::SIGNET_HEADER_CP; +use kyoto::db::memory::peers::StatelessPeerStore; use kyoto::node::messages::NodeMessage; +use kyoto::BlockHash; use kyoto::{chain::checkpoints::HeaderCheckpoint, node::builder::NodeBuilder}; use std::collections::HashSet; use std::{ @@ -25,8 +27,13 @@ async fn main() { .into(); let mut addresses = HashSet::new(); addresses.insert(address); + // If you don't have any checkpoint stored yet, you can use a predefined header. + let (height, hash) = SIGNET_HEADER_CP.last().unwrap(); + let anchor = HeaderCheckpoint::new(*height, BlockHash::from_str(hash).unwrap()); // Define a peer to connect to let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121)); + // Limited devices may not save any peers to disk + let peer_store = StatelessPeerStore::new(); // Create a new node builder let builder = NodeBuilder::new(bitcoin::Network::Signet); // Add node preferences and build the node/client @@ -36,15 +43,13 @@ async fn main() { // The Bitcoin scripts to monitor .add_scripts(addresses) // Only scan blocks strictly after an anchor checkpoint - .anchor_checkpoint(HeaderCheckpoint::new( - 190_000, - BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2") - .unwrap(), - )) + .anchor_checkpoint(anchor) // The number of connections we would like to maintain - .num_required_peers(1) - // Create the node and client without the usual SQL databases - .build_node_with_custom_databases((), ()) + .num_required_peers(2) + // We only maintain a list of 32 peers in memory + .peer_db_size(32) + // Build without the default databases + .build_node_with_custom_databases(peer_store, ()) .await; // Run the node tokio::task::spawn(async move { node.run().await }); diff --git a/src/db/memory/mod.rs b/src/db/memory/mod.rs new file mode 100644 index 0000000..28e9727 --- /dev/null +++ b/src/db/memory/mod.rs @@ -0,0 +1,2 @@ +/// In-memory peer storage. +pub mod peers; diff --git a/src/db/memory/peers.rs b/src/db/memory/peers.rs new file mode 100644 index 0000000..7c6527b --- /dev/null +++ b/src/db/memory/peers.rs @@ -0,0 +1,99 @@ +use std::{collections::HashMap, net::IpAddr}; + +use async_trait::async_trait; +use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng}; + +use crate::db::{error::DatabaseError, traits::PeerStore, PersistedPeer}; + +/// A simple peer store that does not save state in between sessions. +/// If DNS is not enabled, a node will require at least one peer to connect to. +/// Thereafter, the node will find peers to connect to throughout the session. +pub struct StatelessPeerStore { + list: HashMap, +} + +impl StatelessPeerStore { + pub fn new() -> Self { + Self { + list: HashMap::new(), + } + } +} + +#[async_trait] +impl PeerStore for StatelessPeerStore { + async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError> { + // Don't add back peers we already connected to this session. + if peer.tried { + return Ok(()); + } + if replace { + // We are banning a peer and want to keep track of who we don't get along with + self.list.insert(peer.addr, peer); + } else { + self.list.entry(peer.addr).or_insert(peer); + } + Ok(()) + } + + async fn random(&mut self) -> Result { + let mut rng = StdRng::from_entropy(); + let random_peer = { + let iter = self.list.iter_mut().filter(|(_, peer)| !peer.banned); + iter.choose(&mut rng).map(|(key, _)| *key) + }; + match random_peer { + Some(ip) => self.list.remove(&ip).ok_or(DatabaseError::LoadError), + None => Err(DatabaseError::LoadError), + } + } + + async fn num_unbanned(&mut self) -> Result { + Ok(self.list.iter().filter(|(_, peer)| !peer.banned).count() as u32) + } +} + +impl Default for StatelessPeerStore { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use bitcoin::p2p::ServiceFlags; + + use super::*; + + #[tokio::test] + async fn test_stateless_store() { + let mut peer_store = StatelessPeerStore::new(); + let ip_1 = IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)); + let ip_2 = IpAddr::V4(Ipv4Addr::new(2, 2, 2, 2)); + let ip_3 = IpAddr::V4(Ipv4Addr::new(3, 3, 3, 3)); + let peer_1 = PersistedPeer::new(ip_1, 0, ServiceFlags::NONE, false, false); + let peer_2 = PersistedPeer::new(ip_2, 0, ServiceFlags::NONE, false, false); + let peer_3 = PersistedPeer::new(ip_3, 0, ServiceFlags::NONE, false, false); + let try_peer_2 = PersistedPeer::new(ip_2, 0, ServiceFlags::NONE, true, false); + let ban_peer_1 = PersistedPeer::new(ip_1, 0, ServiceFlags::NONE, false, true); + peer_store.update(peer_1, false).await.unwrap(); + assert_eq!(peer_store.num_unbanned().await.unwrap(), 1); + peer_store.update(peer_2, false).await.unwrap(); + assert_eq!(peer_store.num_unbanned().await.unwrap(), 2); + peer_store.update(peer_3, false).await.unwrap(); + assert_eq!(peer_store.num_unbanned().await.unwrap(), 3); + peer_store.update(try_peer_2, true).await.unwrap(); + assert_eq!(peer_store.num_unbanned().await.unwrap(), 3); + peer_store.update(ban_peer_1, true).await.unwrap(); + assert_eq!(peer_store.num_unbanned().await.unwrap(), 2); + let _ = peer_store.random().await.unwrap(); + assert_eq!(peer_store.num_unbanned().await.unwrap(), 1); + let _ = peer_store.random().await.unwrap(); + assert_eq!(peer_store.num_unbanned().await.unwrap(), 0); + assert_eq!(peer_store.list.len(), 1); + let last_peer = peer_store.random().await; + assert!(last_peer.is_err()); + } +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 0c468aa..ef85ab0 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -4,6 +4,8 @@ use bitcoin::p2p::ServiceFlags; /// Errors a database backend may produce. pub mod error; +/// In-memory persistence trait implementations for light-weight nodes running on constrained or semi-trusted setups. +pub mod memory; pub(crate) mod peer_man; #[cfg(feature = "database")] pub(crate) mod sqlite; @@ -11,7 +13,7 @@ pub(crate) mod sqlite; pub mod traits; /// A peer that will be saved to the [`traits::PeerStore`]. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct PersistedPeer { /// Canonical IP address of this peer. pub addr: IpAddr, diff --git a/src/db/peer_man.rs b/src/db/peer_man.rs index c99f5a6..c3da1b3 100644 --- a/src/db/peer_man.rs +++ b/src/db/peer_man.rs @@ -13,16 +13,22 @@ pub(crate) struct PeerManager { netgroups: HashSet, network: Network, default_port: u16, + target_size: u32, } impl PeerManager { - pub(crate) fn new(db: impl PeerStore + Send + Sync + 'static, network: &Network) -> Self { + pub(crate) fn new( + db: impl PeerStore + Send + Sync + 'static, + network: &Network, + target_size: u32, + ) -> Self { let default_port = default_port_from_network(network); Self { db: Arc::new(Mutex::new(db)), netgroups: HashSet::new(), network: *network, default_port, + target_size, } } @@ -42,6 +48,14 @@ impl PeerManager { Ok((next.addr, next.port)) } + pub(crate) async fn need_peers(&mut self) -> Result { + let peer_count = self.peer_count().await?; + if peer_count < self.target_size { + return Ok(true); + } + Ok(false) + } + #[cfg(feature = "dns")] pub(crate) async fn bootstrap(&mut self) -> Result<(), PeerManagerError> { use crate::peers::dns::Dns; diff --git a/src/db/traits.rs b/src/db/traits.rs index 40051b0..49f924f 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -28,7 +28,7 @@ pub trait HeaderStore { async fn height_of<'a>(&mut self, hash: &'a BlockHash) -> Result, DatabaseError>; } -// Do nothing +/// This is a simple wrapper for the unit type, signifying that no headers will be stored between sessions. #[async_trait] impl HeaderStore for () { async fn load(&mut self, _anchor_height: u32) -> Result, DatabaseError> { @@ -64,7 +64,7 @@ pub trait PeerStore { /// Add a peer to the database, defining if it should be replaced or not. async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError>; - /// Get any peer from the database, selected at random. + /// Get any peer from the database, selected at random. If no peers exist, an error is thrown. async fn random(&mut self) -> Result; /// The number of peers in the database that are not marked as banned. diff --git a/src/node/builder.rs b/src/node/builder.rs index 4639e48..c998398 100644 --- a/src/node/builder.rs +++ b/src/node/builder.rs @@ -2,6 +2,7 @@ use std::{collections::HashSet, net::IpAddr, path::PathBuf}; use bitcoin::{Network, ScriptBuf}; +use crate::db::error::DatabaseError; use crate::{ chain::checkpoints::HeaderCheckpoint, db::traits::{HeaderStore, PeerStore}, @@ -24,13 +25,13 @@ impl NodeBuilder { } } - /// Add preferred and most likely trusted peers to try to connect to. + /// Add preferred peers to try to connect to. pub fn add_peers(mut self, whitelist: Vec<(IpAddr, u16)>) -> Self { self.config.white_list = Some(whitelist); self } - /// Add Bitcoin scripts to monitor for. + /// Add Bitcoin scripts to monitor for. You may add more later with the [`Client`]. pub fn add_scripts(mut self, addresses: HashSet) -> Self { self.config.addresses = addresses; self @@ -43,11 +44,22 @@ impl NodeBuilder { } /// Add the minimum number of peer connections that should be maintained by the node. + /// Adding more connections increases the node's anonymity, but requires waiting for more responses, + /// higher bandwidth, and higher memory requirements. pub fn num_required_peers(mut self, num_peers: u8) -> Self { self.config.required_peers = num_peers; self } + /// Set the desired number of peers for the database to keep track of. For limited or in-memory peer storage, + /// this number may be small, however a sufficient margin of peers should be set so the node can try many options + /// when downloading compact block filters. For nodes that store peers on disk, more peers will typically result in + /// fewer errors. + pub fn peer_db_size(mut self, target_num: u32) -> Self { + self.config.target_peer_size = target_num; + self + } + /// Add a checkpoint for the node to look for relevant blocks _strictly after_ the given height. /// This may be from the same [`HeaderCheckpoint`] every time the node is ran, or from the last known sync height. /// In the case of a block reorganization, the node may scan for blocks below the given block height @@ -66,6 +78,7 @@ impl NodeBuilder { SqliteHeaderDb::new(self.network, self.config.data_path.clone()).unwrap(); Node::new_from_config(&self.config, self.network, peer_store, header_store) .await + .map_err(|_| DatabaseError::LoadError) .unwrap() } @@ -76,6 +89,7 @@ impl NodeBuilder { ) -> (Node, Client) { Node::new_from_config(&self.config, self.network, peer_store, header_store) .await + .map_err(|_| DatabaseError::LoadError) .unwrap() } } diff --git a/src/node/config.rs b/src/node/config.rs index 2ab30b6..286953a 100644 --- a/src/node/config.rs +++ b/src/node/config.rs @@ -4,12 +4,15 @@ use bitcoin::ScriptBuf; use crate::chain::checkpoints::HeaderCheckpoint; +const TARGET_PEER_DB_SIZE: u32 = 256; + pub(crate) struct NodeConfig { pub required_peers: u8, pub white_list: Option>, pub addresses: HashSet, pub data_path: Option, pub header_checkpoint: Option, + pub target_peer_size: u32, } impl Default for NodeConfig { @@ -20,6 +23,7 @@ impl Default for NodeConfig { addresses: Default::default(), data_path: Default::default(), header_checkpoint: Default::default(), + target_peer_size: TARGET_PEER_DB_SIZE, } } } diff --git a/src/node/node.rs b/src/node/node.rs index 34731a9..8949e66 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -79,12 +79,14 @@ pub struct Node { } impl Node { + #[allow(clippy::too_many_arguments)] pub(crate) async fn new( network: Network, white_list: Whitelist, scripts: HashSet, header_checkpoint: Option, required_peers: usize, + target_peer_size: u32, peer_store: impl PeerStore + Send + Sync + 'static, header_store: impl HeaderStore + Send + Sync + 'static, ) -> Result<(Self, Client), NodeError> { @@ -95,7 +97,11 @@ impl Node { // We always assume we are behind let state = Arc::new(RwLock::new(NodeState::Behind)); // Configure the address manager - let peer_man = Arc::new(Mutex::new(PeerManager::new(peer_store, &network))); + let peer_man = Arc::new(Mutex::new(PeerManager::new( + peer_store, + &network, + target_peer_size, + ))); // Prepare the header checkpoints for the chain source let mut checkpoints = HeaderCheckpoints::new(&network); let checkpoint = header_checkpoint.unwrap_or_else(|| checkpoints.last()); @@ -148,6 +154,7 @@ impl Node { config.addresses.clone(), config.header_checkpoint, config.required_peers as usize, + config.target_peer_size, peer_store, header_store, ) @@ -230,6 +237,9 @@ impl Node { node_map.set_height(peer_thread.nonce, version.height as u32); let best = *node_map.best_height().unwrap_or(&0); let response = self.handle_version(version, best).await; + if self.need_peers().await? { + node_map.send_message(peer_thread.nonce, MainThreadMessage::GetAddr).await; + } node_map.send_message(peer_thread.nonce, response).await; self.dialog.send_dialog(format!("[Peer {}]: version", peer_thread.nonce)) .await; @@ -637,7 +647,7 @@ impl Node { if let Some((ip, port)) = whitelist.pop() { return { self.dialog - .send_dialog("Using a peer from the white list".into()) + .send_dialog(format!("Using a peer from the white list: {}", ip)) .await; Ok((ip, Some(port))) }; @@ -647,7 +657,7 @@ impl Node { match peer_manager.next_peer().await { Ok((ip, port)) => { self.dialog - .send_dialog("Found an existing peer in the database".into()) + .send_dialog(format!("Found an existing peer in the database: {}", ip)) .await; Ok((ip, Some(port))) } @@ -683,4 +693,13 @@ impl Node { } } } + + async fn need_peers(&mut self) -> Result { + self.peer_man + .lock() + .await + .need_peers() + .await + .map_err(|_| NodeError::LoadError(PersistenceError::PeerLoadFailure)) + } }