Skip to content

Commit

Permalink
Merge pull request #10 from rustaceanrob/stateless-peers
Browse files Browse the repository at this point in the history
db: add in-memory peer db
  • Loading branch information
rustaceanrob authored Jun 21, 2024
2 parents 74bc127 + ed825d7 commit 711dad1
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 18 deletions.
23 changes: 14 additions & 9 deletions example/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
//! the underlying machine, there is no privacy if the connected node is also used to broadcast
//! transactions to the Bitcoin P2P network.

use bitcoin::BlockHash;
use kyoto::chain::checkpoints::SIGNET_HEADER_CP;
use kyoto::db::memory::peers::StatelessPeerStore;
use kyoto::node::messages::NodeMessage;
use kyoto::BlockHash;
use kyoto::{chain::checkpoints::HeaderCheckpoint, node::builder::NodeBuilder};
use std::collections::HashSet;
use std::{
Expand All @@ -25,8 +27,13 @@ async fn main() {
.into();
let mut addresses = HashSet::new();
addresses.insert(address);
// If you don't have any checkpoint stored yet, you can use a predefined header.
let (height, hash) = SIGNET_HEADER_CP.last().unwrap();
let anchor = HeaderCheckpoint::new(*height, BlockHash::from_str(hash).unwrap());
// Define a peer to connect to
let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121));
// Limited devices may not save any peers to disk
let peer_store = StatelessPeerStore::new();
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
Expand All @@ -36,15 +43,13 @@ async fn main() {
// The Bitcoin scripts to monitor
.add_scripts(addresses)
// Only scan blocks strictly after an anchor checkpoint
.anchor_checkpoint(HeaderCheckpoint::new(
190_000,
BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2")
.unwrap(),
))
.anchor_checkpoint(anchor)
// The number of connections we would like to maintain
.num_required_peers(1)
// Create the node and client without the usual SQL databases
.build_node_with_custom_databases((), ())
.num_required_peers(2)
// We only maintain a list of 32 peers in memory
.peer_db_size(32)
// Build without the default databases
.build_node_with_custom_databases(peer_store, ())
.await;
// Run the node
tokio::task::spawn(async move { node.run().await });
Expand Down
2 changes: 2 additions & 0 deletions src/db/memory/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// In-memory peer storage.
pub mod peers;
99 changes: 99 additions & 0 deletions src/db/memory/peers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::{collections::HashMap, net::IpAddr};

use async_trait::async_trait;
use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng};

use crate::db::{error::DatabaseError, traits::PeerStore, PersistedPeer};

/// A simple peer store that does not save state in between sessions.
/// If DNS is not enabled, a node will require at least one peer to connect to.
/// Thereafter, the node will find peers to connect to throughout the session.
pub struct StatelessPeerStore {
list: HashMap<IpAddr, PersistedPeer>,
}

impl StatelessPeerStore {
pub fn new() -> Self {
Self {
list: HashMap::new(),
}
}
}

#[async_trait]
impl PeerStore for StatelessPeerStore {
async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError> {
// Don't add back peers we already connected to this session.
if peer.tried {
return Ok(());
}
if replace {
// We are banning a peer and want to keep track of who we don't get along with
self.list.insert(peer.addr, peer);
} else {
self.list.entry(peer.addr).or_insert(peer);
}
Ok(())
}

async fn random(&mut self) -> Result<PersistedPeer, DatabaseError> {
let mut rng = StdRng::from_entropy();
let random_peer = {
let iter = self.list.iter_mut().filter(|(_, peer)| !peer.banned);
iter.choose(&mut rng).map(|(key, _)| *key)
};
match random_peer {
Some(ip) => self.list.remove(&ip).ok_or(DatabaseError::LoadError),
None => Err(DatabaseError::LoadError),
}
}

async fn num_unbanned(&mut self) -> Result<u32, DatabaseError> {
Ok(self.list.iter().filter(|(_, peer)| !peer.banned).count() as u32)
}
}

impl Default for StatelessPeerStore {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;

use bitcoin::p2p::ServiceFlags;

use super::*;

#[tokio::test]
async fn test_stateless_store() {
let mut peer_store = StatelessPeerStore::new();
let ip_1 = IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1));
let ip_2 = IpAddr::V4(Ipv4Addr::new(2, 2, 2, 2));
let ip_3 = IpAddr::V4(Ipv4Addr::new(3, 3, 3, 3));
let peer_1 = PersistedPeer::new(ip_1, 0, ServiceFlags::NONE, false, false);
let peer_2 = PersistedPeer::new(ip_2, 0, ServiceFlags::NONE, false, false);
let peer_3 = PersistedPeer::new(ip_3, 0, ServiceFlags::NONE, false, false);
let try_peer_2 = PersistedPeer::new(ip_2, 0, ServiceFlags::NONE, true, false);
let ban_peer_1 = PersistedPeer::new(ip_1, 0, ServiceFlags::NONE, false, true);
peer_store.update(peer_1, false).await.unwrap();
assert_eq!(peer_store.num_unbanned().await.unwrap(), 1);
peer_store.update(peer_2, false).await.unwrap();
assert_eq!(peer_store.num_unbanned().await.unwrap(), 2);
peer_store.update(peer_3, false).await.unwrap();
assert_eq!(peer_store.num_unbanned().await.unwrap(), 3);
peer_store.update(try_peer_2, true).await.unwrap();
assert_eq!(peer_store.num_unbanned().await.unwrap(), 3);
peer_store.update(ban_peer_1, true).await.unwrap();
assert_eq!(peer_store.num_unbanned().await.unwrap(), 2);
let _ = peer_store.random().await.unwrap();
assert_eq!(peer_store.num_unbanned().await.unwrap(), 1);
let _ = peer_store.random().await.unwrap();
assert_eq!(peer_store.num_unbanned().await.unwrap(), 0);
assert_eq!(peer_store.list.len(), 1);
let last_peer = peer_store.random().await;
assert!(last_peer.is_err());
}
}
4 changes: 3 additions & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ use bitcoin::p2p::ServiceFlags;

/// Errors a database backend may produce.
pub mod error;
/// In-memory persistence trait implementations for light-weight nodes running on constrained or semi-trusted setups.
pub mod memory;
pub(crate) mod peer_man;
#[cfg(feature = "database")]
pub(crate) mod sqlite;
/// Traits that define the header and peer databases.
pub mod traits;

/// A peer that will be saved to the [`traits::PeerStore`].
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct PersistedPeer {
/// Canonical IP address of this peer.
pub addr: IpAddr,
Expand Down
16 changes: 15 additions & 1 deletion src/db/peer_man.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@ pub(crate) struct PeerManager {
netgroups: HashSet<String>,
network: Network,
default_port: u16,
target_size: u32,
}

impl PeerManager {
pub(crate) fn new(db: impl PeerStore + Send + Sync + 'static, network: &Network) -> Self {
pub(crate) fn new(
db: impl PeerStore + Send + Sync + 'static,
network: &Network,
target_size: u32,
) -> Self {
let default_port = default_port_from_network(network);
Self {
db: Arc::new(Mutex::new(db)),
netgroups: HashSet::new(),
network: *network,
default_port,
target_size,
}
}

Expand All @@ -42,6 +48,14 @@ impl PeerManager {
Ok((next.addr, next.port))
}

pub(crate) async fn need_peers(&mut self) -> Result<bool, PeerManagerError> {
let peer_count = self.peer_count().await?;
if peer_count < self.target_size {
return Ok(true);
}
Ok(false)
}

#[cfg(feature = "dns")]
pub(crate) async fn bootstrap(&mut self) -> Result<(), PeerManagerError> {
use crate::peers::dns::Dns;
Expand Down
4 changes: 2 additions & 2 deletions src/db/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub trait HeaderStore {
async fn height_of<'a>(&mut self, hash: &'a BlockHash) -> Result<Option<u32>, DatabaseError>;
}

// Do nothing
/// This is a simple wrapper for the unit type, signifying that no headers will be stored between sessions.
#[async_trait]
impl HeaderStore for () {
async fn load(&mut self, _anchor_height: u32) -> Result<BTreeMap<u32, Header>, DatabaseError> {
Expand Down Expand Up @@ -64,7 +64,7 @@ pub trait PeerStore {
/// Add a peer to the database, defining if it should be replaced or not.
async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError>;

/// Get any peer from the database, selected at random.
/// Get any peer from the database, selected at random. If no peers exist, an error is thrown.
async fn random(&mut self) -> Result<PersistedPeer, DatabaseError>;

/// The number of peers in the database that are not marked as banned.
Expand Down
18 changes: 16 additions & 2 deletions src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{collections::HashSet, net::IpAddr, path::PathBuf};

use bitcoin::{Network, ScriptBuf};

use crate::db::error::DatabaseError;
use crate::{
chain::checkpoints::HeaderCheckpoint,
db::traits::{HeaderStore, PeerStore},
Expand All @@ -24,13 +25,13 @@ impl NodeBuilder {
}
}

/// Add preferred and most likely trusted peers to try to connect to.
/// Add preferred peers to try to connect to.
pub fn add_peers(mut self, whitelist: Vec<(IpAddr, u16)>) -> Self {
self.config.white_list = Some(whitelist);
self
}

/// Add Bitcoin scripts to monitor for.
/// Add Bitcoin scripts to monitor for. You may add more later with the [`Client`].
pub fn add_scripts(mut self, addresses: HashSet<ScriptBuf>) -> Self {
self.config.addresses = addresses;
self
Expand All @@ -43,11 +44,22 @@ impl NodeBuilder {
}

/// Add the minimum number of peer connections that should be maintained by the node.
/// Adding more connections increases the node's anonymity, but requires waiting for more responses,
/// higher bandwidth, and higher memory requirements.
pub fn num_required_peers(mut self, num_peers: u8) -> Self {
self.config.required_peers = num_peers;
self
}

/// Set the desired number of peers for the database to keep track of. For limited or in-memory peer storage,
/// this number may be small, however a sufficient margin of peers should be set so the node can try many options
/// when downloading compact block filters. For nodes that store peers on disk, more peers will typically result in
/// fewer errors.
pub fn peer_db_size(mut self, target_num: u32) -> Self {
self.config.target_peer_size = target_num;
self
}

/// Add a checkpoint for the node to look for relevant blocks _strictly after_ the given height.
/// This may be from the same [`HeaderCheckpoint`] every time the node is ran, or from the last known sync height.
/// In the case of a block reorganization, the node may scan for blocks below the given block height
Expand All @@ -66,6 +78,7 @@ impl NodeBuilder {
SqliteHeaderDb::new(self.network, self.config.data_path.clone()).unwrap();
Node::new_from_config(&self.config, self.network, peer_store, header_store)
.await
.map_err(|_| DatabaseError::LoadError)
.unwrap()
}

Expand All @@ -76,6 +89,7 @@ impl NodeBuilder {
) -> (Node, Client) {
Node::new_from_config(&self.config, self.network, peer_store, header_store)
.await
.map_err(|_| DatabaseError::LoadError)
.unwrap()
}
}
4 changes: 4 additions & 0 deletions src/node/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use bitcoin::ScriptBuf;

use crate::chain::checkpoints::HeaderCheckpoint;

const TARGET_PEER_DB_SIZE: u32 = 256;

pub(crate) struct NodeConfig {
pub required_peers: u8,
pub white_list: Option<Vec<(IpAddr, u16)>>,
pub addresses: HashSet<ScriptBuf>,
pub data_path: Option<PathBuf>,
pub header_checkpoint: Option<HeaderCheckpoint>,
pub target_peer_size: u32,
}

impl Default for NodeConfig {
Expand All @@ -20,6 +23,7 @@ impl Default for NodeConfig {
addresses: Default::default(),
data_path: Default::default(),
header_checkpoint: Default::default(),
target_peer_size: TARGET_PEER_DB_SIZE,
}
}
}
25 changes: 22 additions & 3 deletions src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ pub struct Node {
}

impl Node {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn new(
network: Network,
white_list: Whitelist,
scripts: HashSet<ScriptBuf>,
header_checkpoint: Option<HeaderCheckpoint>,
required_peers: usize,
target_peer_size: u32,
peer_store: impl PeerStore + Send + Sync + 'static,
header_store: impl HeaderStore + Send + Sync + 'static,
) -> Result<(Self, Client), NodeError> {
Expand All @@ -95,7 +97,11 @@ impl Node {
// We always assume we are behind
let state = Arc::new(RwLock::new(NodeState::Behind));
// Configure the address manager
let peer_man = Arc::new(Mutex::new(PeerManager::new(peer_store, &network)));
let peer_man = Arc::new(Mutex::new(PeerManager::new(
peer_store,
&network,
target_peer_size,
)));
// Prepare the header checkpoints for the chain source
let mut checkpoints = HeaderCheckpoints::new(&network);
let checkpoint = header_checkpoint.unwrap_or_else(|| checkpoints.last());
Expand Down Expand Up @@ -148,6 +154,7 @@ impl Node {
config.addresses.clone(),
config.header_checkpoint,
config.required_peers as usize,
config.target_peer_size,
peer_store,
header_store,
)
Expand Down Expand Up @@ -230,6 +237,9 @@ impl Node {
node_map.set_height(peer_thread.nonce, version.height as u32);
let best = *node_map.best_height().unwrap_or(&0);
let response = self.handle_version(version, best).await;
if self.need_peers().await? {
node_map.send_message(peer_thread.nonce, MainThreadMessage::GetAddr).await;
}
node_map.send_message(peer_thread.nonce, response).await;
self.dialog.send_dialog(format!("[Peer {}]: version", peer_thread.nonce))
.await;
Expand Down Expand Up @@ -637,7 +647,7 @@ impl Node {
if let Some((ip, port)) = whitelist.pop() {
return {
self.dialog
.send_dialog("Using a peer from the white list".into())
.send_dialog(format!("Using a peer from the white list: {}", ip))
.await;
Ok((ip, Some(port)))
};
Expand All @@ -647,7 +657,7 @@ impl Node {
match peer_manager.next_peer().await {
Ok((ip, port)) => {
self.dialog
.send_dialog("Found an existing peer in the database".into())
.send_dialog(format!("Found an existing peer in the database: {}", ip))
.await;
Ok((ip, Some(port)))
}
Expand Down Expand Up @@ -683,4 +693,13 @@ impl Node {
}
}
}

async fn need_peers(&mut self) -> Result<bool, NodeError> {
self.peer_man
.lock()
.await
.need_peers()
.await
.map_err(|_| NodeError::LoadError(PersistenceError::PeerLoadFailure))
}
}

0 comments on commit 711dad1

Please sign in to comment.