Skip to content

Commit

Permalink
Merge pull request #8 from rustaceanrob/feature-db
Browse files Browse the repository at this point in the history
crate: put sql db behind feature
  • Loading branch information
rustaceanrob authored Jun 19, 2024
2 parents 5886ec4 + e3b581d commit 4e452d9
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 52 deletions.
16 changes: 12 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dns-lookup = "2.0.0"
# Enable the tokio-console task and poll observations
# console-subscriber = "0.2.0"
rand = "0.8.0"
rusqlite = { version = "0.31.0", features = ["bundled"] }
rusqlite = { version = "0.31.0", features = ["bundled"], optional = true }
thiserror = { version = "1" }
tokio = { version = "1", default-features = false, features = [
"rt-multi-thread",
Expand All @@ -33,6 +33,10 @@ tokio = { version = "1", default-features = false, features = [
"macros",
] }

[features]
default = ["database"]
database = ["rusqlite"]

[dev-dependencies]
hex = { version = "0.4.0" }
tracing = "0.1"
Expand All @@ -41,6 +45,10 @@ tokio = { version = "1", default-features = false, features = [
"full",
] } # add feature "tracing" to use the console

[lib]
name = "kyoto"
path = "src/lib.rs"

[[example]]
name = "signet"
path = "example/signet.rs"
Expand All @@ -49,6 +57,6 @@ path = "example/signet.rs"
name = "rescan"
path = "example/rescan.rs"

[lib]
name = "kyoto"
path = "src/lib.rs"
[[example]]
name = "memory"
path = "example/memory.rs"
74 changes: 74 additions & 0 deletions example/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use bitcoin::BlockHash;
use kyoto::node::messages::NodeMessage;
use kyoto::{chain::checkpoints::HeaderCheckpoint, node::builder::NodeBuilder};
use std::collections::HashSet;
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
};

#[tokio::main]
async fn main() {
// Add third-party logging
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Add Bitcoin scripts to scan the blockchain for
let address = bitcoin::Address::from_str("tb1q9pvjqz5u5sdgpatg3wn0ce438u5cyv85lly0pc")
.unwrap()
.require_network(bitcoin::Network::Signet)
.unwrap()
.into();
let mut addresses = HashSet::new();
addresses.insert(address);
// Define a peer to connect to
let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121));
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
let (mut node, mut client) = builder
// Add the peer
.add_peers(vec![(peer, 38333)])
// The Bitcoin scripts to monitor
.add_scripts(addresses)
// Only scan blocks strictly after an anchor checkpoint
.anchor_checkpoint(HeaderCheckpoint::new(
190_000,
BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2")
.unwrap(),
))
// The number of connections we would like to maintain
.num_required_peers(1)
// Create the node and client without the usual SQL databases
.build_node_with_custom_databases((), ())
.await;
// Run the node
tokio::task::spawn(async move { node.run().await });
// Split the client into components that send messages and listen to messages.
// With this construction, different parts of the program can take ownership of
// specific tasks.
let (mut sender, mut receiver) = client.split();
// Continually listen for events until the node is synced to its peers.
loop {
if let Ok(message) = receiver.recv().await {
match message {
NodeMessage::Dialog(d) => tracing::info!("{}", d),
NodeMessage::Warning(e) => tracing::warn!("{}", e),
NodeMessage::Transaction(t) => drop(t),
NodeMessage::Block(b) => drop(b),
NodeMessage::BlocksDisconnected(r) => {
let _ = r;
}
NodeMessage::TxBroadcastFailure => {
tracing::error!("The transaction could not be broadcast.")
}
NodeMessage::Synced(tip) => {
tracing::info!("Synced chain up to block {}", tip.height,);
tracing::info!("Chain tip: {}", tip.hash.to_string(),);
break;
}
}
}
}
let _ = sender.shutdown().await;
tracing::info!("Shutting down");
}
7 changes: 3 additions & 4 deletions example/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ async fn main() {
let mut addresses = HashSet::new();
addresses.insert(address);
// Add preferred peers to connect to
let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121));
// let peer_2 = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
let peer = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
Expand All @@ -33,8 +32,8 @@ async fn main() {
.add_scripts(addresses)
// Only scan blocks strictly after an anchor checkpoint
.anchor_checkpoint(HeaderCheckpoint::new(
180_000,
BlockHash::from_str("0000000870f15246ba23c16e370a7ffb1fc8a3dcf8cb4492882ed4b0e3d4cd26")
190_000,
BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2")
.unwrap(),
))
// The number of connections we would like to maintain
Expand Down
13 changes: 11 additions & 2 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@ use std::net::IpAddr;

use bitcoin::p2p::ServiceFlags;

pub(crate) mod error;
/// Errors a database backend may produce.
pub mod error;
pub(crate) mod peer_man;
#[cfg(feature = "database")]
pub(crate) mod sqlite;
pub(crate) mod traits;
/// Traits that define the header and peer databases.
pub mod traits;

/// A peer that will be saved to the [`traits::PeerStore`].
#[derive(Debug, Clone)]
pub struct PersistedPeer {
/// Canonical IP address of this peer.
pub addr: IpAddr,
/// The port believed to be listening for connections.
pub port: u16,
/// The services this peer may offer.
pub services: ServiceFlags,
/// Have we tried this peer before.
pub tried: bool,
/// Did we ban this peer for faulty behavior.
pub banned: bool,
}

Expand Down
26 changes: 13 additions & 13 deletions src/db/peer_man.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,10 @@ impl PeerManager {
// DNS fails if there is an insufficient number of peers
for peer in new_peers {
db_lock
.update(PersistedPeer::new(
peer,
self.default_port,
ServiceFlags::NONE,
false,
false,
))
.update(
PersistedPeer::new(peer, self.default_port, ServiceFlags::NONE, false, false),
true,
)
.await
.map_err(PeerManagerError::Database)?;
}
Expand Down Expand Up @@ -117,13 +114,16 @@ impl PeerManager {
) -> Result<(), PeerManagerError> {
let mut db_lock = self.db.lock().await;
db_lock
.update(PersistedPeer::new(
addr,
port.unwrap_or(self.default_port),
services.unwrap_or(ServiceFlags::NONE),
.update(
PersistedPeer::new(
addr,
port.unwrap_or(self.default_port),
services.unwrap_or(ServiceFlags::NONE),
tried,
ban,
),
tried,
ban,
))
)
.await
.map_err(PeerManagerError::Database)?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/db/sqlite/peer_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ impl SqlitePeerDb {

#[async_trait]
impl PeerStore for SqlitePeerDb {
async fn update(&mut self, peer: PersistedPeer) -> Result<(), DatabaseError> {
async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError> {
let lock = self.conn.lock().await;
let stmt = if !peer.tried {
let stmt = if !replace {
"INSERT OR IGNORE INTO peers (ip_addr, port, service_flags, tried, banned) VALUES (?1, ?2, ?3, ?4, ?5)"
} else {
"INSERT OR REPLACE INTO peers (ip_addr, port, service_flags, tried, banned) VALUES (?1, ?2, ?3, ?4, ?5)"
Expand Down
17 changes: 13 additions & 4 deletions src/db/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,26 @@ use crate::prelude::default_port_from_network;

use super::{error::DatabaseError, PersistedPeer};

/// Methods required to persist the chain of block headers.
#[async_trait]
pub(crate) trait HeaderStore {
pub trait HeaderStore {
/// Load all headers with heights *strictly after* the specified anchor height.
async fn load(&mut self, anchor_height: u32) -> Result<BTreeMap<u32, Header>, DatabaseError>;

/// Write an indexed map of block headers to the database, ignoring if they already exist.
async fn write<'a>(
&mut self,
header_chain: &'a BTreeMap<u32, Header>,
) -> Result<(), DatabaseError>;

/// Write the headers to the database, replacing headers over the specified height.
async fn write_over<'a>(
&mut self,
header_chain: &'a BTreeMap<u32, Header>,
height: u32,
) -> Result<(), DatabaseError>;

/// Return the height of a block hash in the database, if it exists.
async fn height_of<'a>(&mut self, hash: &'a BlockHash) -> Result<Option<u32>, DatabaseError>;
}

Expand Down Expand Up @@ -58,18 +63,22 @@ impl HeaderStore for () {
}
}

/// Methods that define a list of peers on the Bitcoin P2P network.
#[async_trait]
pub(crate) trait PeerStore {
async fn update(&mut self, peer: PersistedPeer) -> Result<(), DatabaseError>;
pub trait PeerStore {
/// Add a peer to the database, defining if it should be replaced or not.
async fn update(&mut self, peer: PersistedPeer, replace: bool) -> Result<(), DatabaseError>;

/// Get any peer from the database, selected at random.
async fn random(&mut self) -> Result<PersistedPeer, DatabaseError>;

/// The number of peers in the database that are not marked as banned.
async fn num_unbanned(&mut self) -> Result<u32, DatabaseError>;
}

#[async_trait]
impl PeerStore for () {
async fn update(&mut self, _peer: PersistedPeer) -> Result<(), DatabaseError> {
async fn update(&mut self, _peer: PersistedPeer, _replace: bool) -> Result<(), DatabaseError> {
Ok(())
}

Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
//! and vetted.

#![allow(dead_code)]
/// Strucutres related to the blockchain.
/// Strucutres and checkpoints related to the blockchain.
pub mod chain;
mod db;
/// Traits and structures that define the data persistence required for a node.
pub mod db;
mod filters;
/// Tools to build and run a compact block filters node.
pub mod node;
Expand Down
22 changes: 20 additions & 2 deletions src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{collections::HashSet, net::IpAddr, path::PathBuf};

use bitcoin::{Network, ScriptBuf};

use crate::chain::checkpoints::HeaderCheckpoint;
use crate::{
chain::checkpoints::HeaderCheckpoint,
db::traits::{HeaderStore, PeerStore},
};

use super::{client::Client, config::NodeConfig, node::Node};

Expand Down Expand Up @@ -55,8 +58,23 @@ impl NodeBuilder {
}

/// Consume the node builder and receive a [`Node`] and [`Client`].
#[cfg(feature = "database")]
pub async fn build_node(&self) -> (Node, Client) {
Node::new_from_config(&self.config, self.network)
use crate::db::sqlite::{header_db::SqliteHeaderDb, peer_db::SqlitePeerDb};
let peer_store = SqlitePeerDb::new(self.network, self.config.data_path.clone()).unwrap();
let header_store =
SqliteHeaderDb::new(self.network, self.config.data_path.clone()).unwrap();
Node::new_from_config(&self.config, self.network, peer_store, header_store)
.await
.unwrap()
}

pub async fn build_node_with_custom_databases(
&self,
peer_store: impl PeerStore + Send + Sync + 'static,
header_store: impl HeaderStore + Send + Sync + 'static,
) -> (Node, Client) {
Node::new_from_config(&self.config, self.network, peer_store, header_store)
.await
.unwrap()
}
Expand Down
Loading

0 comments on commit 4e452d9

Please sign in to comment.