Skip to content

Commit

Permalink
Merge #1010: Update our state upon broadcasting a transaction
Browse files Browse the repository at this point in the history
58c71c7 lib: gate the RPC server availability on the 'daemon' feature (Antoine Poinsot)
b7fde6a commands: update our state immediately after broadcasting a tx (Antoine Poinsot)
1cf42d9 poller: introduce a communication channel with the poller thread (Antoine Poinsot)
f6ce85c lib: remove the panic hook. (Antoine Poinsot)
b4fe963 lib: encapsulate the handling of both threads (poller and RPC server) (Antoine Poinsot)
fd5387f poller: use the same database connection across one update round (Antoine Poinsot)
ea6923e poller: make the updating process into its own function. (Antoine Poinsot)

Pull request description:

  Fixes #887.

  This takes a couple commits from #909 but takes the approach from there in another direction: we don't externalize the poller, since only a single instance must be ran. Instead we properly keep track of the (up to) two threads we manage in the `DaemonHandle` and provide a way for a user of the library to check for errors in any of the threads.

  This approach allows us to 1) communicate with the poller thread from inside the Liana library/daemon (here we leverage this to tell it to poll) 2) eventually (#909) expose all internal errors from the library to the user instead of panic'ing internally.

  See the commit messages for details.

ACKs for top commit:
  darosior:
    ACK 58c71c7 -- did another pass and Edouard tested this in the GUI.

Tree-SHA512: 0ab436b2a187f9d124ed8861a47f03bb1e9252cdc4f3b5c4308db07be738c78b2ea3f07dc0a9586e3d5bd34f071a1e2a2569cad30676c9cc004e39260ebb94ca
  • Loading branch information
darosior committed Mar 22, 2024
2 parents 5527e63 + 58c71c7 commit 2aa8874
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 237 deletions.
11 changes: 7 additions & 4 deletions src/bin/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,16 @@ fn main() {
process::exit(1);
});

let daemon = DaemonHandle::start_default(config).unwrap_or_else(|e| {
let handle = DaemonHandle::start_default(config, true).unwrap_or_else(|e| {
log::error!("Error starting Liana daemon: {}", e);
process::exit(1);
});
daemon
.rpc_server()
.expect("JSONRPC server must terminate cleanly");
while handle.is_alive() {
thread::sleep(time::Duration::from_millis(500));
}
if let Err(e) = handle.stop() {
log::error!("Error stopping Liana daemon: {}", e);
}

// We are always logging to stdout, should it be then piped to the log file (if self) or
// not. So just make sure that all messages were actually written.
Expand Down
91 changes: 19 additions & 72 deletions src/bitcoin/poller/looper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use crate::{
descriptors,
};

use std::{
collections::HashSet,
sync::{self, atomic},
thread, time,
};
use std::{collections::HashSet, sync, time};

use miniscript::bitcoin::{self, secp256k1};

Expand Down Expand Up @@ -208,13 +204,11 @@ fn new_tip(bit: &impl BitcoinInterface, current_tip: &BlockChainTip) -> TipUpdat
}

fn updates(
db_conn: &mut Box<dyn DatabaseConnection>,
bit: &impl BitcoinInterface,
db: &impl DatabaseInterface,
descs: &[descriptors::SinglePathLianaDesc],
secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>,
) {
let mut db_conn = db.connection();

// Check if there was a new block before updating ourselves.
let current_tip = db_conn.chain_tip().expect("Always set at first startup");
let latest_tip = match new_tip(bit, &current_tip) {
Expand All @@ -225,18 +219,18 @@ fn updates(
// between our former chain and the new one, then restart fresh.
db_conn.rollback_tip(&new_tip);
log::info!("Tip was rolled back to '{}'.", new_tip);
return updates(bit, db, descs, secp);
return updates(db_conn, bit, descs, secp);
}
};

// Then check the state of our coins. Do it even if the tip did not change since last poll, as
// we may have unconfirmed transactions.
let updated_coins = update_coins(bit, &mut db_conn, &current_tip, descs, secp);
let updated_coins = update_coins(bit, db_conn, &current_tip, descs, secp);

// If the tip changed while we were polling our Bitcoin interface, start over.
if bit.chain_tip() != latest_tip {
log::info!("Chain tip changed while we were updating our state. Starting over.");
return updates(bit, db, descs, secp);
return updates(db_conn, bit, descs, secp);
}

// The chain tip did not change since we started our updates. Record them and the latest tip.
Expand All @@ -258,13 +252,12 @@ fn updates(

// Check if there is any rescan of the backend ongoing or one that just finished.
fn rescan_check(
db_conn: &mut Box<dyn DatabaseConnection>,
bit: &impl BitcoinInterface,
db: &impl DatabaseInterface,
descs: &[descriptors::SinglePathLianaDesc],
secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>,
) {
log::debug!("Checking the state of an ongoing rescan if there is any");
let mut db_conn = db.connection();

// Check if there is an ongoing rescan. If there isn't and we previously asked for a rescan of
// the backend, we treat it as completed.
Expand Down Expand Up @@ -299,14 +292,14 @@ fn rescan_check(
"Rolling back our internal tip to '{}' to update our internal state with past transactions.",
rescan_tip
);
updates(bit, db, descs, secp)
updates(db_conn, bit, descs, secp)
} else {
log::debug!("No ongoing rescan.");
}
}

// If the database chain tip is NULL (first startup), initialize it.
fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) {
/// If the database chain tip is NULL (first startup), initialize it.
pub fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) {
let mut db_conn = db.connection();

if db_conn.chain_tip().is_none() {
Expand All @@ -315,7 +308,7 @@ fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface
}
}

fn sync_poll_interval() -> time::Duration {
pub fn sync_poll_interval() -> time::Duration {
// TODO: be smarter, like in revaultd, but more generic too.
#[cfg(not(test))]
{
Expand All @@ -325,60 +318,14 @@ fn sync_poll_interval() -> time::Duration {
time::Duration::from_secs(0)
}

/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the
/// `shutdown` atomic.
pub fn looper(
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
shutdown: sync::Arc<atomic::AtomicBool>,
poll_interval: time::Duration,
desc: descriptors::LianaDescriptor,
/// Update our state from the Bitcoin backend.
pub fn poll(
bit: &sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
db: &sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>,
descs: &[descriptors::SinglePathLianaDesc],
) {
let mut last_poll = None;
let mut synced = false;
let descs = [
desc.receive_descriptor().clone(),
desc.change_descriptor().clone(),
];
let secp = secp256k1::Secp256k1::verification_only();

maybe_initialize_tip(&bit, &db);

while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() {
let now = time::Instant::now();

if let Some(last_poll) = last_poll {
let time_since_poll = now.duration_since(last_poll);
let poll_interval = if synced {
poll_interval
} else {
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
sync_poll_interval()
};
if time_since_poll < poll_interval {
thread::sleep(time::Duration::from_millis(500));
continue;
}
}
last_poll = Some(now);

// Don't poll until the Bitcoin backend is fully synced.
if !synced {
let progress = bit.sync_progress();
log::info!(
"Block chain synchronization progress: {:.2}% ({} blocks / {} headers)",
progress.rounded_up_progress() * 100.0,
progress.blocks,
progress.headers
);
synced = progress.is_complete();
if !synced {
continue;
}
}

updates(&bit, &db, &descs, &secp);
rescan_check(&bit, &db, &descs, &secp);
}
let mut db_conn = db.connection();
updates(&mut db_conn, bit, descs, secp);
rescan_check(&mut db_conn, bit, descs, secp);
}
144 changes: 106 additions & 38 deletions src/bitcoin/poller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,128 @@
mod looper;

use crate::{
bitcoin::{poller::looper::looper, BitcoinInterface},
database::DatabaseInterface,
descriptors,
};
use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors};

use std::{
sync::{self, atomic},
thread, time,
sync::{self, mpsc},
time,
};

use miniscript::bitcoin::secp256k1;

#[derive(Debug, Clone)]
pub enum PollerMessage {
Shutdown,
/// Ask the Bitcoin poller to poll immediately, get notified through the passed channel once
/// it's done.
PollNow(mpsc::SyncSender<()>),
}

/// The Bitcoin poller handler.
pub struct Poller {
handle: thread::JoinHandle<()>,
shutdown: sync::Arc<atomic::AtomicBool>,
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
secp: secp256k1::Secp256k1<secp256k1::VerifyOnly>,
// The receive and change descriptors (in this order).
descs: [descriptors::SinglePathLianaDesc; 2],
}

impl Poller {
pub fn start(
pub fn new(
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
poll_interval: time::Duration,
desc: descriptors::LianaDescriptor,
) -> Poller {
let shutdown = sync::Arc::from(atomic::AtomicBool::from(false));
let handle = thread::Builder::new()
.name("Bitcoin poller".to_string())
.spawn({
let shutdown = shutdown.clone();
move || looper(bit, db, shutdown, poll_interval, desc)
})
.expect("Must not fail");

Poller { shutdown, handle }
}
let secp = secp256k1::Secp256k1::verification_only();
let descs = [
desc.receive_descriptor().clone(),
desc.change_descriptor().clone(),
];

pub fn trigger_stop(&self) {
self.shutdown.store(true, atomic::Ordering::Relaxed);
}
// On first startup the tip may be NULL. Make sure it's set as the poller relies on it.
looper::maybe_initialize_tip(&bit, &db);

pub fn stop(self) {
self.trigger_stop();
self.handle.join().expect("The poller loop must not fail");
Poller {
bit,
db,
secp,
descs,
}
}

#[cfg(feature = "nonblocking_shutdown")]
pub fn is_stopped(&self) -> bool {
// Doc says "This might return true for a brief moment after the thread’s main function has
// returned, but before the thread itself has stopped running.". But it's not an issue for
// us, as long as the main poller function has returned we are good.
self.handle.is_finished()
}
/// Continuously update our state from the Bitcoin backend.
/// - `poll_interval`: how frequently to perform an update.
/// - `shutdown`: set to true to stop continuously updating and make this function return.
///
/// Typically this would run for the whole duration of the program in a thread, and the main
/// thread would set the `shutdown` atomic to `true` when shutting down.
pub fn poll_forever(
&self,
poll_interval: time::Duration,
receiver: mpsc::Receiver<PollerMessage>,
) {
let mut last_poll = None;
let mut synced = false;

loop {
// How long to wait before the next poll.
let time_before_poll = if let Some(last_poll) = last_poll {
let time_since_poll = time::Instant::now().duration_since(last_poll);
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
let poll_interval = if synced {
poll_interval
} else {
looper::sync_poll_interval()
};
poll_interval.saturating_sub(time_since_poll)
} else {
// Don't wait before doing the first poll.
time::Duration::ZERO
};

// Wait for the duration of the interval between polls, but listen to messages in the
// meantime.
match receiver.recv_timeout(time_before_poll) {
Ok(PollerMessage::Shutdown) => {
log::info!("Bitcoin poller was told to shut down.");
return;
}
Ok(PollerMessage::PollNow(sender)) => {
// We've been asked to poll, don't wait any further and signal completion to
// the caller.
last_poll = Some(time::Instant::now());
looper::poll(&self.bit, &self.db, &self.secp, &self.descs);
if let Err(e) = sender.send(()) {
log::error!("Error sending immediate poll completion signal: {}.", e);
}
continue;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// It's been long enough since the last poll.
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
log::error!("Bitcoin poller communication channel got disconnected. Exiting.");
return;
}
}
last_poll = Some(time::Instant::now());

// Don't poll until the Bitcoin backend is fully synced.
if !synced {
let progress = self.bit.sync_progress();
log::info!(
"Block chain synchronization progress: {:.2}% ({} blocks / {} headers)",
progress.rounded_up_progress() * 100.0,
progress.blocks,
progress.headers
);
synced = progress.is_complete();
if !synced {
continue;
}
}

#[cfg(test)]
pub fn test_stop(&mut self) {
self.shutdown.store(true, atomic::Ordering::Relaxed);
looper::poll(&self.bit, &self.db, &self.secp, &self.descs);
}
}
}
Loading

0 comments on commit 2aa8874

Please sign in to comment.