diff --git a/example/signet.rs b/example/signet.rs index 2cc50f6..2044e8f 100644 --- a/example/signet.rs +++ b/example/signet.rs @@ -47,10 +47,10 @@ async fn main() { // Split the client into components that send messages and listen to messages. // With this construction, different parts of the program can take ownership of // specific tasks. - let (mut sender, receiver) = client.split(); + let (mut sender, mut receiver) = client.split(); // Continually listen for events until the node is synced to its peers. loop { - if let Some(message) = receiver.recv().await { + if let Ok(message) = receiver.recv().await { match message { NodeMessage::Dialog(d) => tracing::info!("{}", d), NodeMessage::Warning(e) => tracing::warn!("{}", e), diff --git a/src/filters/mod.rs b/src/filters/mod.rs index d5991ef..4fd5696 100644 --- a/src/filters/mod.rs +++ b/src/filters/mod.rs @@ -1,5 +1,5 @@ -pub const CF_HEADER_BATCH_SIZE: u32 = 1_999; -pub const FILTER_BATCH_SIZE: u32 = 99; +pub(crate) const CF_HEADER_BATCH_SIZE: u32 = 1_999; +pub(crate) const FILTER_BATCH_SIZE: u32 = 99; pub(crate) mod cfheader_batch; pub(crate) mod cfheader_chain; diff --git a/src/node/client.rs b/src/node/client.rs index ad5b6b6..107c22f 100644 --- a/src/node/client.rs +++ b/src/node/client.rs @@ -1,6 +1,7 @@ pub use bitcoin::{Block, Transaction}; -pub use tokio::sync::mpsc::Receiver; -use tokio::sync::mpsc::Sender; +use tokio::sync::broadcast; +pub use tokio::sync::broadcast::Receiver; +pub use tokio::sync::mpsc::Sender; use crate::{IndexedBlock, IndexedTransaction}; @@ -10,21 +11,21 @@ use super::{ }; /// A [`Client`] allows for communication with a running node. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Client { - nrx: Receiver, + nrx: broadcast::Sender, ntx: Sender, } impl Client { - pub(crate) fn new(nrx: Receiver, ntx: Sender) -> Self { + pub(crate) fn new(nrx: broadcast::Sender, ntx: Sender) -> Self { Self { nrx, ntx } } /// For a majority of cases, some parts of your program will respond to node events, and other parts of the program /// will send events to the node. This method returns a [`ClientSender`] to issue commands to the node, and a /// [`Receiver`] to continually respond to events issued from the node. - pub fn split(&mut self) -> (ClientSender, &mut Receiver) { + pub fn split(&mut self) -> (ClientSender, Receiver) { (self.sender(), self.receiver()) } @@ -34,11 +35,14 @@ impl Client { } /// Return a [`Receiver`] to listen for incoming node events. - /// This method returns a mutable borrow to the [`Receiver`]. If you require the - /// ability to send events to the node, i.e. to broadcast a transaction, either call [`Client::split`] - /// or [`Client::sender`] before calling [`Client::receiver`]. - pub fn receiver(&mut self) -> &mut Receiver { - &mut self.nrx + /// You may call this function as many times as required, however please note + /// there are memory and performance implications when calling this method. Namely, a clone of the object, + /// potentially a large data structure like a [`Block`], is held in memory for _every_ receiver until _all_ + /// receivers have gotten the message. + /// You should only call this twice if two separate portions of your application need to process + /// data differently. For example, a Lightning Network node implementation. + pub fn receiver(&mut self) -> Receiver { + self.nrx.subscribe() } /// Tell the node to stop running. @@ -56,11 +60,15 @@ impl Client { /// such as Lightning Network nodes, which are expected to be online for long durations. pub async fn collect_relevant_tx(&mut self) -> Vec { let mut txs = Vec::new(); + let mut rec = self.nrx.subscribe(); loop { - while let Some(message) = self.nrx.recv().await { + while let Ok(message) = rec.recv().await { match message { NodeMessage::Transaction(tx) => txs.push(tx), - NodeMessage::Synced(_) => return txs, + NodeMessage::Synced(_) => { + drop(rec); + return txs; + } _ => (), } } @@ -73,12 +81,16 @@ impl Client { /// such as a server or desktop computer. /// For devices like smart phones, see [`Client::collect_relevant_tx`]. pub async fn collect_relevant_blocks(&mut self) -> Vec { + let mut rec = self.nrx.subscribe(); let mut blocks = Vec::new(); loop { - while let Some(message) = self.nrx.recv().await { + while let Ok(message) = rec.recv().await { match message { NodeMessage::Block(block) => blocks.push(block), - NodeMessage::Synced(_) => return blocks, + NodeMessage::Synced(_) => { + drop(rec); + return blocks; + } _ => (), } } @@ -87,9 +99,11 @@ impl Client { /// Wait until the client's headers and filters are fully synced to connected peers, dropping any block and transaction messages. pub async fn wait_until_synced(&mut self) { + let mut rec = self.nrx.subscribe(); loop { - while let Some(message) = self.nrx.recv().await { + while let Ok(message) = rec.recv().await { if let NodeMessage::Synced(_) = message { + drop(rec); return; } } @@ -99,8 +113,9 @@ impl Client { /// Print a stream of logs to the console. This function continually loops for the duration of the program, and is not particularly helpful in production applications. /// See [`Client::receiver`] to listen for events from the node. pub async fn print_log_stream(&mut self) { + let mut rec = self.nrx.subscribe(); loop { - while let Some(message) = self.nrx.recv().await { + while let Ok(message) = rec.recv().await { match message { NodeMessage::Dialog(message) => { println!("\x1b[32mInfo\x1b[0m {}", message); diff --git a/src/node/dialog.rs b/src/node/dialog.rs index f8a7160..deb8aff 100644 --- a/src/node/dialog.rs +++ b/src/node/dialog.rs @@ -1,4 +1,4 @@ -use tokio::sync::mpsc::Sender; +use tokio::sync::broadcast::Sender; use super::node_messages::NodeMessage; @@ -13,7 +13,7 @@ impl Dialog { } pub(crate) async fn send_dialog(&mut self, dialog: String) { - let _ = self.ntx.send(NodeMessage::Dialog(dialog)).await; + let _ = self.ntx.send(NodeMessage::Dialog(dialog)); } pub(crate) async fn chain_update( @@ -27,14 +27,14 @@ impl Dialog { "Headers ({}/{}) Compact Filter Headers ({}/{}) Filters ({}/{})", num_headers, best_height, num_cf_headers, best_height, num_filters, best_height ); - let _ = self.ntx.send(NodeMessage::Dialog(message)).await; + let _ = self.ntx.send(NodeMessage::Dialog(message)); } pub(crate) async fn send_warning(&mut self, warning: String) { - let _ = self.ntx.send(NodeMessage::Warning(warning)).await; + let _ = self.ntx.send(NodeMessage::Warning(warning)); } pub(crate) async fn send_data(&mut self, message: NodeMessage) { - let _ = self.ntx.send(message).await; + let _ = self.ntx.send(message); } } diff --git a/src/node/node.rs b/src/node/node.rs index fa9f6ee..b9c307c 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -15,7 +15,7 @@ use bitcoin::{ Block, Network, }; use rand::{prelude::SliceRandom, rngs::StdRng, SeedableRng}; -use tokio::sync::{mpsc::Receiver, Mutex, RwLock}; +use tokio::sync::{broadcast, mpsc::Receiver, Mutex, RwLock}; use tokio::{ select, sync::mpsc::{self}, @@ -87,9 +87,9 @@ impl Node { required_peers: usize, ) -> Result<(Self, Client), NodeError> { // Set up a communication channel between the node and client - let (ntx, nrx) = mpsc::channel::(32); + let (ntx, _) = broadcast::channel::(32); let (ctx, crx) = mpsc::channel::(5); - let client = Client::new(nrx, ctx); + let client = Client::new(ntx.clone(), ctx); // We always assume we are behind let state = Arc::new(RwLock::new(NodeState::Behind)); // Load the databases @@ -107,7 +107,7 @@ impl Node { let mut scripts = HashSet::new(); scripts.extend(addresses.iter().map(|address| address.script_pubkey())); // A structured way to talk to the client - let mut dialog = Dialog::new(ntx.clone()); + let mut dialog = Dialog::new(ntx); // Build the chain let loaded_chain = Chain::new( &network, diff --git a/src/node/node_messages.rs b/src/node/node_messages.rs index bd8aec5..0a1f3e0 100644 --- a/src/node/node_messages.rs +++ b/src/node/node_messages.rs @@ -7,7 +7,7 @@ use crate::{ /// Messages receivable by a running node. #[derive(Debug, Clone)] pub enum NodeMessage { - /// A human readable dialog + /// A human readable dialog of what the node is currently doing Dialog(String), /// A human readable warning that may effect the function of the node Warning(String), diff --git a/src/peers/mod.rs b/src/peers/mod.rs index 4509ff4..6f01ebc 100644 --- a/src/peers/mod.rs +++ b/src/peers/mod.rs @@ -1,4 +1,4 @@ -pub mod dns; -pub mod outbound_messages; -pub mod peer; -pub mod reader; +pub(crate) mod dns; +pub(crate) mod outbound_messages; +pub(crate) mod peer; +pub(crate) mod reader;