From e3b581d7a89d4d420ac46100871652d38a288898 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Wed, 19 Jun 2024 08:23:28 -1000 Subject: [PATCH] crate: put sql db behind feature --- Cargo.toml | 16 ++++++--- example/memory.rs | 74 ++++++++++++++++++++++++++++++++++++++++ example/signet.rs | 7 ++-- src/db/mod.rs | 13 +++++-- src/db/peer_man.rs | 26 +++++++------- src/db/sqlite/peer_db.rs | 4 +-- src/db/traits.rs | 17 ++++++--- src/lib.rs | 5 +-- src/node/builder.rs | 22 ++++++++++-- src/node/node.rs | 27 +++++++-------- src/peers/peer.rs | 5 --- 11 files changed, 164 insertions(+), 52 deletions(-) create mode 100644 example/memory.rs diff --git a/Cargo.toml b/Cargo.toml index 52b268b..f3c942d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ dns-lookup = "2.0.0" # Enable the tokio-console task and poll observations # console-subscriber = "0.2.0" rand = "0.8.0" -rusqlite = { version = "0.31.0", features = ["bundled"] } +rusqlite = { version = "0.31.0", features = ["bundled"], optional = true } thiserror = { version = "1" } tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", @@ -33,6 +33,10 @@ tokio = { version = "1", default-features = false, features = [ "macros", ] } +[features] +default = ["database"] +database = ["rusqlite"] + [dev-dependencies] hex = { version = "0.4.0" } tracing = "0.1" @@ -41,6 +45,10 @@ tokio = { version = "1", default-features = false, features = [ "full", ] } # add feature "tracing" to use the console +[lib] +name = "kyoto" +path = "src/lib.rs" + [[example]] name = "signet" path = "example/signet.rs" @@ -49,6 +57,6 @@ path = "example/signet.rs" name = "rescan" path = "example/rescan.rs" -[lib] -name = "kyoto" -path = "src/lib.rs" +[[example]] +name = "memory" +path = "example/memory.rs" diff --git a/example/memory.rs b/example/memory.rs new file mode 100644 index 0000000..11be18a --- /dev/null +++ b/example/memory.rs @@ -0,0 +1,74 @@ +use bitcoin::BlockHash; +use kyoto::node::messages::NodeMessage; +use kyoto::{chain::checkpoints::HeaderCheckpoint, node::builder::NodeBuilder}; +use std::collections::HashSet; +use std::{ + net::{IpAddr, Ipv4Addr}, + str::FromStr, +}; + +#[tokio::main] +async fn main() { + // Add third-party logging + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber).unwrap(); + // Add Bitcoin scripts to scan the blockchain for + let address = bitcoin::Address::from_str("tb1q9pvjqz5u5sdgpatg3wn0ce438u5cyv85lly0pc") + .unwrap() + .require_network(bitcoin::Network::Signet) + .unwrap() + .into(); + let mut addresses = HashSet::new(); + addresses.insert(address); + // Define a peer to connect to + let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121)); + // Create a new node builder + let builder = NodeBuilder::new(bitcoin::Network::Signet); + // Add node preferences and build the node/client + let (mut node, mut client) = builder + // Add the peer + .add_peers(vec![(peer, 38333)]) + // The Bitcoin scripts to monitor + .add_scripts(addresses) + // Only scan blocks strictly after an anchor checkpoint + .anchor_checkpoint(HeaderCheckpoint::new( + 190_000, + BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2") + .unwrap(), + )) + // The number of connections we would like to maintain + .num_required_peers(1) + // Create the node and client without the usual SQL databases + .build_node_with_custom_databases((), ()) + .await; + // Run the node + tokio::task::spawn(async move { node.run().await }); + // Split the client into components that send messages and listen to messages. + // With this construction, different parts of the program can take ownership of + // specific tasks. + let (mut sender, mut receiver) = client.split(); + // Continually listen for events until the node is synced to its peers. + loop { + if let Ok(message) = receiver.recv().await { + match message { + NodeMessage::Dialog(d) => tracing::info!("{}", d), + NodeMessage::Warning(e) => tracing::warn!("{}", e), + NodeMessage::Transaction(t) => drop(t), + NodeMessage::Block(b) => drop(b), + NodeMessage::BlocksDisconnected(r) => { + let _ = r; + } + NodeMessage::TxBroadcastFailure => { + tracing::error!("The transaction could not be broadcast.") + } + NodeMessage::Synced(tip) => { + tracing::info!("Synced chain up to block {}", tip.height,); + tracing::info!("Chain tip: {}", tip.hash.to_string(),); + break; + } + } + } + } + let _ = sender.shutdown().await; + tracing::info!("Shutting down"); +} diff --git a/example/signet.rs b/example/signet.rs index 65f09a3..f350922 100644 --- a/example/signet.rs +++ b/example/signet.rs @@ -21,8 +21,7 @@ async fn main() { let mut addresses = HashSet::new(); addresses.insert(address); // Add preferred peers to connect to - let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121)); - // let peer_2 = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100)); + let peer = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100)); // Create a new node builder let builder = NodeBuilder::new(bitcoin::Network::Signet); // Add node preferences and build the node/client @@ -33,8 +32,8 @@ async fn main() { .add_scripts(addresses) // Only scan blocks strictly after an anchor checkpoint .anchor_checkpoint(HeaderCheckpoint::new( - 180_000, - BlockHash::from_str("0000000870f15246ba23c16e370a7ffb1fc8a3dcf8cb4492882ed4b0e3d4cd26") + 190_000, + BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2") .unwrap(), )) // The number of connections we would like to maintain diff --git a/src/db/mod.rs b/src/db/mod.rs index 32e7263..0c468aa 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -2,17 +2,26 @@ use std::net::IpAddr; use bitcoin::p2p::ServiceFlags; -pub(crate) mod error; +/// Errors a database backend may produce. +pub mod error; pub(crate) mod peer_man; +#[cfg(feature = "database")] pub(crate) mod sqlite; -pub(crate) mod traits; +/// Traits that define the header and peer databases. +pub mod traits; +/// A peer that will be saved to the [`traits::PeerStore`]. #[derive(Debug, Clone)] pub struct PersistedPeer { + /// Canonical IP address of this peer. pub addr: IpAddr, + /// The port believed to be listening for connections. pub port: u16, + /// The services this peer may offer. pub services: ServiceFlags, + /// Have we tried this peer before. pub tried: bool, + /// Did we ban this peer for faulty behavior. pub banned: bool, } diff --git a/src/db/peer_man.rs b/src/db/peer_man.rs index 7515226..062ef6a 100644 --- a/src/db/peer_man.rs +++ b/src/db/peer_man.rs @@ -56,13 +56,10 @@ impl PeerManager { // DNS fails if there is an insufficient number of peers for peer in new_peers { db_lock - .update(PersistedPeer::new( - peer, - self.default_port, - ServiceFlags::NONE, - false, - false, - )) + .update( + PersistedPeer::new(peer, self.default_port, ServiceFlags::NONE, false, false), + true, + ) .await .map_err(PeerManagerError::Database)?; } @@ -117,13 +114,16 @@ impl PeerManager { ) -> Result<(), PeerManagerError> { let mut db_lock = self.db.lock().await; db_lock - .update(PersistedPeer::new( - addr, - port.unwrap_or(self.default_port), - services.unwrap_or(ServiceFlags::NONE), + .update( + PersistedPeer::new( + addr, + port.unwrap_or(self.default_port), + services.unwrap_or(ServiceFlags::NONE), + tried, + ban, + ), tried, - ban, - )) + ) .await .map_err(PeerManagerError::Database)?; Ok(()) diff --git a/src/db/sqlite/peer_db.rs b/src/db/sqlite/peer_db.rs index 9c3b020..acf5b26 100644 --- a/src/db/sqlite/peer_db.rs +++ b/src/db/sqlite/peer_db.rs @@ -46,9 +46,9 @@ impl SqlitePeerDb { #[async_trait] impl PeerStore for SqlitePeerDb { - async fn update(&mut self, peer: PersistedPeer) -> Result<(), DatabaseError> { + async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError> { let lock = self.conn.lock().await; - let stmt = if !peer.tried { + let stmt = if !replace { "INSERT OR IGNORE INTO peers (ip_addr, port, service_flags, tried, banned) VALUES (?1, ?2, ?3, ?4, ?5)" } else { "INSERT OR REPLACE INTO peers (ip_addr, port, service_flags, tried, banned) VALUES (?1, ?2, ?3, ?4, ?5)" diff --git a/src/db/traits.rs b/src/db/traits.rs index 439fbaa..d6003d1 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -10,21 +10,26 @@ use crate::prelude::default_port_from_network; use super::{error::DatabaseError, PersistedPeer}; +/// Methods required to persist the chain of block headers. #[async_trait] -pub(crate) trait HeaderStore { +pub trait HeaderStore { + /// Load all headers with heights *strictly after* the specified anchor height. async fn load(&mut self, anchor_height: u32) -> Result, DatabaseError>; + /// Write an indexed map of block headers to the database, ignoring if they already exist. async fn write<'a>( &mut self, header_chain: &'a BTreeMap, ) -> Result<(), DatabaseError>; + /// Write the headers to the database, replacing headers over the specified height. async fn write_over<'a>( &mut self, header_chain: &'a BTreeMap, height: u32, ) -> Result<(), DatabaseError>; + /// Return the height of a block hash in the database, if it exists. async fn height_of<'a>(&mut self, hash: &'a BlockHash) -> Result, DatabaseError>; } @@ -58,18 +63,22 @@ impl HeaderStore for () { } } +/// Methods that define a list of peers on the Bitcoin P2P network. #[async_trait] -pub(crate) trait PeerStore { - async fn update(&mut self, peer: PersistedPeer) -> Result<(), DatabaseError>; +pub trait PeerStore { + /// Add a peer to the database, defining if it should be replaced or not. + async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError>; + /// Get any peer from the database, selected at random. async fn random(&mut self) -> Result; + /// The number of peers in the database that are not marked as banned. async fn num_unbanned(&mut self) -> Result; } #[async_trait] impl PeerStore for () { - async fn update(&mut self, _peer: PersistedPeer) -> Result<(), DatabaseError> { + async fn update(&mut self, _peer: PersistedPeer, _replace: bool) -> Result<(), DatabaseError> { Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index d1ded47..476f5b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,9 +6,10 @@ //! and vetted. #![allow(dead_code)] -/// Strucutres related to the blockchain. +/// Strucutres and checkpoints related to the blockchain. pub mod chain; -mod db; +/// Traits and structures that define the data persistence required for a node. +pub mod db; mod filters; /// Tools to build and run a compact block filters node. pub mod node; diff --git a/src/node/builder.rs b/src/node/builder.rs index 26a982b..4639e48 100644 --- a/src/node/builder.rs +++ b/src/node/builder.rs @@ -2,7 +2,10 @@ use std::{collections::HashSet, net::IpAddr, path::PathBuf}; use bitcoin::{Network, ScriptBuf}; -use crate::chain::checkpoints::HeaderCheckpoint; +use crate::{ + chain::checkpoints::HeaderCheckpoint, + db::traits::{HeaderStore, PeerStore}, +}; use super::{client::Client, config::NodeConfig, node::Node}; @@ -55,8 +58,23 @@ impl NodeBuilder { } /// Consume the node builder and receive a [`Node`] and [`Client`]. + #[cfg(feature = "database")] pub async fn build_node(&self) -> (Node, Client) { - Node::new_from_config(&self.config, self.network) + use crate::db::sqlite::{header_db::SqliteHeaderDb, peer_db::SqlitePeerDb}; + let peer_store = SqlitePeerDb::new(self.network, self.config.data_path.clone()).unwrap(); + let header_store = + SqliteHeaderDb::new(self.network, self.config.data_path.clone()).unwrap(); + Node::new_from_config(&self.config, self.network, peer_store, header_store) + .await + .unwrap() + } + + pub async fn build_node_with_custom_databases( + &self, + peer_store: impl PeerStore + Send + Sync + 'static, + header_store: impl HeaderStore + Send + Sync + 'static, + ) -> (Node, Client) { + Node::new_from_config(&self.config, self.network, peer_store, header_store) .await .unwrap() } diff --git a/src/node/node.rs b/src/node/node.rs index b14a62e..10054e0 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -1,7 +1,6 @@ use std::{ collections::HashSet, net::IpAddr, - path::PathBuf, sync::{atomic::AtomicBool, Arc}, time::Duration, }; @@ -26,7 +25,10 @@ use crate::{ checkpoints::{HeaderCheckpoint, HeaderCheckpoints}, error::HeaderSyncError, }, - db::{peer_man::PeerManager, sqlite::header_db::SqliteHeaderDb}, + db::{ + peer_man::PeerManager, + traits::{HeaderStore, PeerStore}, + }, filters::cfheader_chain::CFHeaderSyncResult, node::{error::PersistenceError, peer_map::PeerMap}, TxBroadcastPolicy, @@ -44,7 +46,6 @@ use super::{ error::NodeError, messages::{ClientMessage, NodeMessage}, }; -use crate::db::sqlite::peer_db::SqlitePeerDb; type Whitelist = Option>; @@ -82,9 +83,10 @@ impl Node { network: Network, white_list: Whitelist, scripts: HashSet, - data_path: Option, header_checkpoint: Option, required_peers: usize, + peer_store: impl PeerStore + Send + Sync + 'static, + header_store: impl HeaderStore + Send + Sync + 'static, ) -> Result<(Self, Client), NodeError> { // Set up a communication channel between the node and client let (ntx, _) = broadcast::channel::(32); @@ -92,14 +94,8 @@ impl Node { let client = Client::new(ntx.clone(), ctx); // We always assume we are behind let state = Arc::new(RwLock::new(NodeState::Behind)); - // Load the databases // Configure the address manager - let peer_db = SqlitePeerDb::new(network, data_path.clone()) - .map_err(|_| NodeError::LoadError(PersistenceError::PeerLoadFailure))?; - let peer_man = Arc::new(Mutex::new(PeerManager::new(peer_db, &network))); - // Load the headers from storage - let db = SqliteHeaderDb::new(network, data_path) - .map_err(|_| NodeError::LoadError(PersistenceError::HeaderLoadError))?; + let peer_man = Arc::new(Mutex::new(PeerManager::new(peer_store, &network))); // Prepare the header checkpoints for the chain source let mut checkpoints = HeaderCheckpoints::new(&network); let checkpoint = header_checkpoint.unwrap_or_else(|| checkpoints.last()); @@ -113,13 +109,13 @@ impl Node { checkpoint, checkpoints, dialog.clone(), - db, + header_store, required_peers, ) .await .map_err(|_| NodeError::LoadError(PersistenceError::HeaderLoadError))?; // Initialize the height of the chain - let best_known_height = loaded_chain.height() as u32; + let best_known_height = loaded_chain.height(); let chain = Arc::new(Mutex::new(loaded_chain)); dialog .send_dialog(format!("Starting sync from block {}", best_known_height)) @@ -143,14 +139,17 @@ impl Node { pub(crate) async fn new_from_config( config: &NodeConfig, network: Network, + peer_store: impl PeerStore + Send + Sync + 'static, + header_store: impl HeaderStore + Send + Sync + 'static, ) -> Result<(Self, Client), NodeError> { Node::new( network, config.white_list.clone(), config.addresses.clone(), - config.data_path.clone(), config.header_checkpoint, config.required_peers as usize, + peer_store, + header_store, ) .await } diff --git a/src/peers/peer.rs b/src/peers/peer.rs index 41aeadc..da9498d 100644 --- a/src/peers/peer.rs +++ b/src/peers/peer.rs @@ -153,11 +153,6 @@ impl Peer { .write_all(&message_generator.new_verack()) .await .map_err(|_| PeerError::BufferWrite)?; - // writer - // .write_all(&message_generator.new_get_addr()) - // .await - // .map_err(|_| PeerError::BufferWrite)?; - // can ask for addresses here depending on if we need them Ok(()) } PeerMessage::Addr(addrs) => {