Skip to content

Commit

Permalink
client: make chan broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Jun 3, 2024
1 parent 9dd61a1 commit b1c601b
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 35 deletions.
4 changes: 2 additions & 2 deletions example/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ async fn main() {
// Split the client into components that send messages and listen to messages.
// With this construction, different parts of the program can take ownership of
// specific tasks.
let (mut sender, receiver) = client.split();
let (mut sender, mut receiver) = client.split();
// Continually listen for events until the node is synced to its peers.
loop {
if let Some(message) = receiver.recv().await {
if let Ok(message) = receiver.recv().await {
match message {
NodeMessage::Dialog(d) => tracing::info!("{}", d),
NodeMessage::Warning(e) => tracing::warn!("{}", e),
Expand Down
4 changes: 2 additions & 2 deletions src/filters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub const CF_HEADER_BATCH_SIZE: u32 = 1_999;
pub const FILTER_BATCH_SIZE: u32 = 99;
pub(crate) const CF_HEADER_BATCH_SIZE: u32 = 1_999;
pub(crate) const FILTER_BATCH_SIZE: u32 = 99;

pub(crate) mod cfheader_batch;
pub(crate) mod cfheader_chain;
Expand Down
49 changes: 32 additions & 17 deletions src/node/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub use bitcoin::{Block, Transaction};
pub use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::broadcast;
pub use tokio::sync::broadcast::Receiver;
pub use tokio::sync::mpsc::Sender;

use crate::{IndexedBlock, IndexedTransaction};

Expand All @@ -10,21 +11,21 @@ use super::{
};

/// A [`Client`] allows for communication with a running node.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Client {
nrx: Receiver<NodeMessage>,
nrx: broadcast::Sender<NodeMessage>,
ntx: Sender<ClientMessage>,
}

impl Client {
pub(crate) fn new(nrx: Receiver<NodeMessage>, ntx: Sender<ClientMessage>) -> Self {
pub(crate) fn new(nrx: broadcast::Sender<NodeMessage>, ntx: Sender<ClientMessage>) -> Self {
Self { nrx, ntx }
}

/// For a majority of cases, some parts of your program will respond to node events, and other parts of the program
/// will send events to the node. This method returns a [`ClientSender`] to issue commands to the node, and a
/// [`Receiver`] to continually respond to events issued from the node.
pub fn split(&mut self) -> (ClientSender, &mut Receiver<NodeMessage>) {
pub fn split(&mut self) -> (ClientSender, Receiver<NodeMessage>) {
(self.sender(), self.receiver())
}

Expand All @@ -34,11 +35,14 @@ impl Client {
}

/// Return a [`Receiver`] to listen for incoming node events.
/// This method returns a mutable borrow to the [`Receiver`]. If you require the
/// ability to send events to the node, i.e. to broadcast a transaction, either call [`Client::split`]
/// or [`Client::sender`] before calling [`Client::receiver`].
pub fn receiver(&mut self) -> &mut Receiver<NodeMessage> {
&mut self.nrx
/// You may call this function as many times as required, however please note
/// there are memory and performance implications when calling this method. Namely, a clone of the object,
/// potentially a large data structure like a [`Block`], is held in memory for _every_ receiver until _all_
/// receivers have gotten the message.
/// You should only call this twice if two separate portions of your application need to process
/// data differently. For example, a Lightning Network node implementation.
pub fn receiver(&mut self) -> Receiver<NodeMessage> {
self.nrx.subscribe()
}

/// Tell the node to stop running.
Expand All @@ -56,11 +60,15 @@ impl Client {
/// such as Lightning Network nodes, which are expected to be online for long durations.
pub async fn collect_relevant_tx(&mut self) -> Vec<IndexedTransaction> {
let mut txs = Vec::new();
let mut rec = self.nrx.subscribe();
loop {
while let Some(message) = self.nrx.recv().await {
while let Ok(message) = rec.recv().await {
match message {
NodeMessage::Transaction(tx) => txs.push(tx),
NodeMessage::Synced(_) => return txs,
NodeMessage::Synced(_) => {
drop(rec);
return txs;
}
_ => (),
}
}
Expand All @@ -73,12 +81,16 @@ impl Client {
/// such as a server or desktop computer.
/// For devices like smart phones, see [`Client::collect_relevant_tx`].
pub async fn collect_relevant_blocks(&mut self) -> Vec<IndexedBlock> {
let mut rec = self.nrx.subscribe();
let mut blocks = Vec::new();
loop {
while let Some(message) = self.nrx.recv().await {
while let Ok(message) = rec.recv().await {
match message {
NodeMessage::Block(block) => blocks.push(block),
NodeMessage::Synced(_) => return blocks,
NodeMessage::Synced(_) => {
drop(rec);
return blocks;
}
_ => (),
}
}
Expand All @@ -87,9 +99,11 @@ impl Client {

/// Wait until the client's headers and filters are fully synced to connected peers, dropping any block and transaction messages.
pub async fn wait_until_synced(&mut self) {
let mut rec = self.nrx.subscribe();
loop {
while let Some(message) = self.nrx.recv().await {
while let Ok(message) = rec.recv().await {
if let NodeMessage::Synced(_) = message {
drop(rec);
return;
}
}
Expand All @@ -99,8 +113,9 @@ impl Client {
/// Print a stream of logs to the console. This function continually loops for the duration of the program, and is not particularly helpful in production applications.
/// See [`Client::receiver`] to listen for events from the node.
pub async fn print_log_stream(&mut self) {
let mut rec = self.nrx.subscribe();
loop {
while let Some(message) = self.nrx.recv().await {
while let Ok(message) = rec.recv().await {
match message {
NodeMessage::Dialog(message) => {
println!("\x1b[32mInfo\x1b[0m {}", message);
Expand Down
10 changes: 5 additions & 5 deletions src/node/dialog.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tokio::sync::mpsc::Sender;
use tokio::sync::broadcast::Sender;

use super::node_messages::NodeMessage;

Expand All @@ -13,7 +13,7 @@ impl Dialog {
}

pub(crate) async fn send_dialog(&mut self, dialog: String) {
let _ = self.ntx.send(NodeMessage::Dialog(dialog)).await;
let _ = self.ntx.send(NodeMessage::Dialog(dialog));
}

pub(crate) async fn chain_update(
Expand All @@ -27,14 +27,14 @@ impl Dialog {
"Headers ({}/{}) Compact Filter Headers ({}/{}) Filters ({}/{})",
num_headers, best_height, num_cf_headers, best_height, num_filters, best_height
);
let _ = self.ntx.send(NodeMessage::Dialog(message)).await;
let _ = self.ntx.send(NodeMessage::Dialog(message));
}

pub(crate) async fn send_warning(&mut self, warning: String) {
let _ = self.ntx.send(NodeMessage::Warning(warning)).await;
let _ = self.ntx.send(NodeMessage::Warning(warning));
}

pub(crate) async fn send_data(&mut self, message: NodeMessage) {
let _ = self.ntx.send(message).await;
let _ = self.ntx.send(message);
}
}
8 changes: 4 additions & 4 deletions src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use bitcoin::{
Block, Network,
};
use rand::{prelude::SliceRandom, rngs::StdRng, SeedableRng};
use tokio::sync::{mpsc::Receiver, Mutex, RwLock};
use tokio::sync::{broadcast, mpsc::Receiver, Mutex, RwLock};
use tokio::{
select,
sync::mpsc::{self},
Expand Down Expand Up @@ -87,9 +87,9 @@ impl Node {
required_peers: usize,
) -> Result<(Self, Client), NodeError> {
// Set up a communication channel between the node and client
let (ntx, nrx) = mpsc::channel::<NodeMessage>(32);
let (ntx, _) = broadcast::channel::<NodeMessage>(32);
let (ctx, crx) = mpsc::channel::<ClientMessage>(5);
let client = Client::new(nrx, ctx);
let client = Client::new(ntx.clone(), ctx);
// We always assume we are behind
let state = Arc::new(RwLock::new(NodeState::Behind));
// Load the databases
Expand All @@ -107,7 +107,7 @@ impl Node {
let mut scripts = HashSet::new();
scripts.extend(addresses.iter().map(|address| address.script_pubkey()));
// A structured way to talk to the client
let mut dialog = Dialog::new(ntx.clone());
let mut dialog = Dialog::new(ntx);
// Build the chain
let loaded_chain = Chain::new(
&network,
Expand Down
2 changes: 1 addition & 1 deletion src/node/node_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
/// Messages receivable by a running node.
#[derive(Debug, Clone)]
pub enum NodeMessage {
/// A human readable dialog
/// A human readable dialog of what the node is currently doing
Dialog(String),
/// A human readable warning that may effect the function of the node
Warning(String),
Expand Down
8 changes: 4 additions & 4 deletions src/peers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod dns;
pub mod outbound_messages;
pub mod peer;
pub mod reader;
pub(crate) mod dns;
pub(crate) mod outbound_messages;
pub(crate) mod peer;
pub(crate) mod reader;

0 comments on commit b1c601b

Please sign in to comment.