From f0a32543bf303e87e6f03137bfaa9fd6b8f4456b Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Tue, 18 Jun 2024 17:23:20 -1000 Subject: [PATCH] node: update node with new peer db --- example/signet.rs | 4 +- src/db/sqlite/peer_db.rs | 199 +-------------------------------------- src/node/node.rs | 141 +++++++++++++-------------- src/peers/reader.rs | 2 +- 4 files changed, 71 insertions(+), 275 deletions(-) diff --git a/example/signet.rs b/example/signet.rs index 1c7be67..65f09a3 100644 --- a/example/signet.rs +++ b/example/signet.rs @@ -22,13 +22,13 @@ async fn main() { addresses.insert(address); // Add preferred peers to connect to let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121)); - let peer_2 = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100)); + // let peer_2 = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100)); // Create a new node builder let builder = NodeBuilder::new(bitcoin::Network::Signet); // Add node preferences and build the node/client let (mut node, mut client) = builder // Add the peers - .add_peers(vec![(peer, 38333), (peer_2, 38333)]) + .add_peers(vec![(peer, 38333)]) // The Bitcoin scripts to monitor .add_scripts(addresses) // Only scan blocks strictly after an anchor checkpoint diff --git a/src/db/sqlite/peer_db.rs b/src/db/sqlite/peer_db.rs index 9c1e6aa..9c3b020 100644 --- a/src/db/sqlite/peer_db.rs +++ b/src/db/sqlite/peer_db.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use bitcoin::p2p::{Address, ServiceFlags}; +use bitcoin::p2p::ServiceFlags; use bitcoin::Network; use rusqlite::params; use rusqlite::{Connection, Result}; @@ -12,31 +12,6 @@ use tokio::sync::Mutex; use crate::db::error::DatabaseError; use crate::db::traits::PeerStore; use crate::db::PersistedPeer; -use crate::prelude::default_port_from_network; - -const MAXIMUM_TABLE_SIZE: i64 = 256; - -const NEW_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS peers ( - ip_addr TEXT PRIMARY KEY, - port INTEGER NOT NULL, - last_seen INTEGER -)"; - -const TRIED_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS peers ( - ip_addr TEXT PRIMARY KEY, - services INTEGER NOT NULL, - port INTEGER NOT NULL -)"; - -const CPF_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS cpfpeers ( - ip_addr TEXT PRIMARY KEY, - port INTEGER NOT NULL -)"; - -const BAN_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS banned ( - ip_addr TEXT PRIMARY KEY, - banned_until INTEGER -)"; const PEER_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS peers ( ip_addr TEXT PRIMARY KEY, @@ -48,176 +23,10 @@ const PEER_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS peers ( #[derive(Debug)] pub(crate) struct SqlitePeerDb { - default_port: u16, - conn_new: Arc>, - conn_tried: Arc>, - conn_evict: Arc>, - conn_cpf: Arc>, - conn_ban: Arc>, -} - -impl SqlitePeerDb { - pub fn new(network: Network, path: Option) -> Result { - let default_port = default_port_from_network(&network); - let mut path = path.unwrap_or_else(|| PathBuf::from(".")); - path.push("data"); - path.push(network.to_string()); - if !path.exists() { - fs::create_dir_all(&path).unwrap(); - } - let conn_new = Connection::open(path.join("peers_new.db"))?; - let conn_tried = Connection::open(path.join("peers_tried.db"))?; - let conn_evict = Connection::open(path.join("peers_evict.db"))?; - let conn_cpf = Connection::open(path.join("peers_cpf.db"))?; - let conn_ban = Connection::open(path.join("peers_ban.db"))?; - conn_new.execute(NEW_SCHEMA, [])?; - conn_evict.execute(NEW_SCHEMA, [])?; - conn_tried.execute(TRIED_SCHEMA, [])?; - conn_cpf.execute(CPF_SCHEMA, [])?; - conn_ban.execute(BAN_SCHEMA, [])?; - Ok(Self { - default_port, - conn_new: Arc::new(Mutex::new(conn_new)), - conn_tried: Arc::new(Mutex::new(conn_tried)), - conn_evict: Arc::new(Mutex::new(conn_evict)), - conn_cpf: Arc::new(Mutex::new(conn_cpf)), - conn_ban: Arc::new(Mutex::new(conn_ban)), - }) - } - - pub async fn add_new( - &mut self, - ip: IpAddr, - port: Option, - last_seen: Option, - ) -> Result<()> { - let lock = self.conn_new.lock().await; - let new_count: i64 = lock.query_row("SELECT COUNT(*) FROM peers", [], |row| row.get(0))?; - let peer_banned = self.is_banned(&ip).await?; - - if new_count < MAXIMUM_TABLE_SIZE && !peer_banned { - lock.execute( - "INSERT OR IGNORE INTO peers (ip_addr, port, last_seen) VALUES (?1, ?2, ?3)", - params![ - ip.to_string(), - port.unwrap_or(self.default_port), - last_seen.unwrap_or(0), - ], - )?; - } else if !peer_banned { - self.evict_random_peer().await?; - lock.execute( - "INSERT OR IGNORE INTO peers (ip_addr, port, last_seen) VALUES (?1, ?2, ?3)", - params![ - ip.to_string(), - port.unwrap_or(self.default_port), - last_seen.unwrap_or(0), - ], - )?; - } - Ok(()) - } - - pub async fn add_cpf_peers(&mut self, peers: Vec
) -> Result<()> { - let mut lock = self.conn_cpf.lock().await; - let tx = lock.transaction()?; - for peer in peers { - let ip = peer - .socket_addr() - .expect("peers should have been screened") - .ip() - .to_string(); - tx.execute( - "INSERT OR IGNORE INTO cpfpeers (ip_addr, port) VALUES (?1, ?2)", - params![ip, peer.port], - )?; - } - tx.commit()?; - Ok(()) - } - - async fn is_banned(&self, ip_addr: &IpAddr) -> Result { - let lock = self.conn_ban.lock().await; - let mut stmt = lock.prepare("SELECT COUNT(*) FROM banned WHERE ip_addr = ?1")?; - let count: i64 = stmt.query_row(params![ip_addr.to_string()], |row| row.get(0))?; - Ok(count > 0) - } - - async fn evict_random_peer(&self) -> Result<()> { - let lock = self.conn_new.lock().await; - let mut stmt = lock.prepare("SELECT ip_addr FROM peers ORDER BY RANDOM() LIMIT 1")?; - let ip_addr: String = stmt.query_row([], |row| row.get(0))?; - - lock.execute( - "INSERT OR IGNORE INTO peers (ip_addr, port, last_seen) SELECT ip_addr, port, last_seen FROM peers WHERE ip_addr = ?1", - params![ip_addr], - )?; - - lock.execute("DELETE FROM peers WHERE ip_addr = ?1", params![ip_addr])?; - - Ok(()) - } - - pub async fn get_random_new(&mut self) -> Result> { - if let Ok(Some(peer)) = self.get_random_from_evict().await { - return Ok(Some(peer)); - } - let lock = self.conn_new.lock().await; - let mut stmt = lock.prepare("SELECT ip_addr, port FROM peers ORDER BY RANDOM() LIMIT 1")?; - let mut rows = stmt.query([])?; - if let Some(row) = rows.next()? { - let ip_addr: String = row.get(0)?; - let port: u16 = row.get(1)?; - let ip = ip_addr - .parse::() - .map_err(|_| rusqlite::Error::InvalidQuery)?; - Ok(Some((ip, port))) - } else { - Ok(None) - } - } - - async fn get_random_from_evict(&mut self) -> Result> { - let lock = self.conn_evict.lock().await; - let mut stmt = lock.prepare("SELECT ip_addr, port FROM peers ORDER BY RANDOM() LIMIT 1")?; - let mut rows = stmt.query([])?; - if let Some(row) = rows.next()? { - let ip_addr: String = row.get(0)?; - let port: u16 = row.get(1)?; - lock.execute("DELETE FROM peers WHERE ip_addr = ?1", [&ip_addr])?; - let ip = ip_addr - .parse::() - .map_err(|_| rusqlite::Error::InvalidQuery)?; - Ok(Some((ip, port))) - } else { - Ok(None) - } - } - - pub async fn get_random_cpf_peer(&mut self) -> Result> { - let lock = self.conn_cpf.lock().await; - let mut stmt = - lock.prepare("SELECT ip_addr, port FROM cpfpeers ORDER BY RANDOM() LIMIT 1")?; - let mut rows = stmt.query([])?; - if let Some(row) = rows.next()? { - let ip_addr: String = row.get(0)?; - let port: u16 = row.get(1)?; - let ip = ip_addr - .parse::() - .map_err(|_| rusqlite::Error::InvalidQuery)?; - Ok(Some((ip, port))) - } else { - Ok(None) - } - } -} - -#[derive(Debug)] -pub(crate) struct SqlPeerDb { conn: Arc>, } -impl SqlPeerDb { +impl SqlitePeerDb { pub fn new(network: Network, path: Option) -> Result { let mut path = path.unwrap_or_else(|| PathBuf::from(".")); path.push("data"); @@ -227,7 +36,7 @@ impl SqlPeerDb { } let conn = Connection::open(path.join("peers.db")).map_err(|_| DatabaseError::WriteError)?; - conn.execute(NEW_SCHEMA, []) + conn.execute(PEER_SCHEMA, []) .map_err(|_| DatabaseError::WriteError)?; Ok(Self { conn: Arc::new(Mutex::new(conn)), @@ -236,7 +45,7 @@ impl SqlPeerDb { } #[async_trait] -impl PeerStore for SqlPeerDb { +impl PeerStore for SqlitePeerDb { async fn update(&mut self, peer: PersistedPeer) -> Result<(), DatabaseError> { let lock = self.conn.lock().await; let stmt = if !peer.tried { diff --git a/src/node/node.rs b/src/node/node.rs index 396fd02..b14a62e 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -14,7 +14,6 @@ use bitcoin::{ }, Block, Network, ScriptBuf, }; -use rand::{prelude::SliceRandom, rngs::StdRng, SeedableRng}; use tokio::sync::{broadcast, mpsc::Receiver, Mutex, RwLock}; use tokio::{ select, @@ -27,10 +26,9 @@ use crate::{ checkpoints::{HeaderCheckpoint, HeaderCheckpoints}, error::HeaderSyncError, }, - db::sqlite::header_db::SqliteHeaderDb, + db::{peer_man::PeerManager, sqlite::header_db::SqliteHeaderDb}, filters::cfheader_chain::CFHeaderSyncResult, node::{error::PersistenceError, peer_map::PeerMap}, - peers::dns::Dns, TxBroadcastPolicy, }; @@ -70,7 +68,7 @@ pub enum NodeState { pub struct Node { state: Arc>, chain: Arc>, - peer_db: Arc>, + peer_man: Arc>, required_peers: usize, white_list: Whitelist, network: Network, @@ -95,16 +93,17 @@ impl Node { // We always assume we are behind let state = Arc::new(RwLock::new(NodeState::Behind)); // Load the databases + // Configure the address manager let peer_db = SqlitePeerDb::new(network, data_path.clone()) .map_err(|_| NodeError::LoadError(PersistenceError::PeerLoadFailure))?; - let peer_db = Arc::new(Mutex::new(peer_db)); + let peer_man = Arc::new(Mutex::new(PeerManager::new(peer_db, &network))); + // Load the headers from storage + let db = SqliteHeaderDb::new(network, data_path) + .map_err(|_| NodeError::LoadError(PersistenceError::HeaderLoadError))?; // Prepare the header checkpoints for the chain source let mut checkpoints = HeaderCheckpoints::new(&network); let checkpoint = header_checkpoint.unwrap_or_else(|| checkpoints.last()); checkpoints.prune_up_to(checkpoint); - // Load the headers from storage - let db = SqliteHeaderDb::new(network, data_path) - .map_err(|_| NodeError::LoadError(PersistenceError::HeaderLoadError))?; // A structured way to talk to the client let mut dialog = Dialog::new(ntx); // Build the chain @@ -129,7 +128,7 @@ impl Node { Self { state, chain, - peer_db, + peer_man, required_peers, white_list, network, @@ -408,14 +407,31 @@ impl Node { } async fn handle_new_addrs(&mut self, new_peers: Vec
) { - let mut chain = self.peer_db.lock().await; - if let Err(e) = chain.add_cpf_peers(new_peers).await { - self.dialog - .send_warning(format!( - "Encountered error adding peer to the database: {}", - e - )) - .await; + self.dialog + .send_dialog(format!( + "Adding {} new peers to the peer database", + new_peers.len() + )) + .await; + let mut lock = self.peer_man.lock().await; + for addr in new_peers { + if let Err(e) = lock + .add_new_peer( + addr.socket_addr() + .expect("IP should have been screened") + .ip(), + Some(addr.port), + Some(addr.services), + ) + .await + { + self.dialog + .send_warning(format!( + "Encountered error adding peer to the database: {}", + e + )) + .await; + } } } @@ -614,38 +630,10 @@ impl Node { } } - // First we search the whitelist for peers that we trust. Then, depending on the state - // we either need to catch up on block headers or we may start requesting filters and blocks. - // When requesting filters, we try to select peers that have signaled for CF support. + // First we search the whitelist for peers that we trust. If we don't have any more whitelisted peers, + // we try to get a new peer from the peer manager. If that fails and our database is empty, we try DNS. + // Otherwise, the node throws an error. async fn next_peer(&mut self) -> Result<(IpAddr, Option), NodeError> { - let state = *self.state.read().await; - match state { - NodeState::Behind => self.any_peer().await, - _ => match self.cpf_peer().await { - Ok(got_peer) => match got_peer { - Some(peer) => Ok(peer), - None => self.any_peer().await, - }, - Err(e) => Err(e), - }, - } - // self.any_peer().await - } - - async fn cpf_peer(&mut self) -> Result)>, NodeError> { - let mut chain = self.peer_db.lock().await; - if let Some(peer) = chain - .get_random_cpf_peer() - .await - .map_err(|_| NodeError::LoadError(PersistenceError::PeerLoadFailure))? - { - return Ok(Some((peer.0, Some(peer.1)))); - } - Ok(None) - } - - async fn any_peer(&mut self) -> Result<(IpAddr, Option), NodeError> { - // Empty the whitelist, if there is one if let Some(whitelist) = &mut self.white_list { if let Some((ip, port)) = whitelist.pop() { return { @@ -656,40 +644,39 @@ impl Node { }; } } - let mut chain = self.peer_db.lock().await; - // Try to get any new peer - let next_peer = chain - .get_random_new() - .await - .map_err(|_| NodeError::LoadError(PersistenceError::PeerLoadFailure))?; - match next_peer { - // We found some peer to use but may not be reachable - Some(peer) => { + let mut peer_manager = self.peer_man.lock().await; + match peer_manager.next_peer().await { + Ok((ip, port)) => { self.dialog - .send_dialog(format!("Loaded peer from the database {}", peer.0)) + .send_dialog("Found an existing peer in the database".into()) .await; - Ok((peer.0, Some(peer.1))) + Ok((ip, Some(port))) } - // We have no peers in our DB, try DNS - None => { - let mut new_peers = Dns::bootstrap(self.network) + Err(_) => { + let current_count = peer_manager + .peer_count() .await - .map_err(|_| NodeError::DnsFailure)?; - let mut rng = StdRng::from_entropy(); - new_peers.shuffle(&mut rng); - // DNS fails if there is an insufficient number of peers - let ret_ip = new_peers[0]; - for peer in new_peers { - if let Err(e) = chain.add_new(peer, None, None).await { - self.dialog - .send_warning(format!( - "Encountered error adding a peer to the database: {}", - e - )) - .await; - } + .map_err(|_| NodeError::LoadError(PersistenceError::PeerLoadFailure))?; + if current_count < 1 { + self.dialog + .send_dialog( + "Peer count is less than one, using DNS to find new peers.".into(), + ) + .await; + peer_manager + .bootstrap() + .await + .map_err(|_| NodeError::DnsFailure)?; + let next_peer = peer_manager + .next_peer() + .await + .map_err(|_| NodeError::LoadError(PersistenceError::PeerLoadFailure))?; + return Ok((next_peer.0, Some(next_peer.1))); } - Ok((ret_ip, None)) + self.dialog + .send_warning("An error occured while finding a new peer".into()) + .await; + Err(NodeError::LoadError(PersistenceError::PeerLoadFailure)) } } } diff --git a/src/peers/reader.rs b/src/peers/reader.rs index e31aaeb..5e45afb 100644 --- a/src/peers/reader.rs +++ b/src/peers/reader.rs @@ -24,7 +24,7 @@ const ONE_MONTH: u64 = 2_500_000; const ONE_MINUTE: u64 = 60; // The peer must have sent at least 10 messages to trigger DOS const MINIMUM_DOS_THRESHOLD: u64 = 10; -// We allow up to 500 messages per second +// We allow up to 5000 messages per second const RATE_LIMIT: u64 = 5000; pub(crate) struct Reader {