diff --git a/bitcoin/src/error.rs b/bitcoin/src/error.rs index 119cfb8c9..3aa704274 100644 --- a/bitcoin/src/error.rs +++ b/bitcoin/src/error.rs @@ -79,6 +79,8 @@ pub enum Error { AddressError(#[from] AddressError), #[error("Failed to fetch coinbase tx")] CoinbaseFetchingFailure, + #[error("Expected a value in a rpc result that is missing")] + MissingValue, } impl Error { diff --git a/bitcoin/src/iter.rs b/bitcoin/src/iter.rs index 81dba865e..c2e699672 100644 --- a/bitcoin/src/iter.rs +++ b/bitcoin/src/iter.rs @@ -1,4 +1,4 @@ -use crate::{BitcoinCoreApi, BitcoinRpcError, Error}; +use crate::{BitcoinRpcError, DynBitcoinCoreApi, Error}; use bitcoincore_rpc::{ bitcoin::{Block, BlockHash, Transaction}, jsonrpc::Error as JsonRpcError, @@ -6,9 +6,7 @@ use bitcoincore_rpc::{ }; use futures::{prelude::*, stream::StreamExt}; use log::trace; -use std::{iter, sync::Arc}; - -type DynBitcoinCoreApi = Arc; +use std::iter; /// Stream over transactions, starting with this in the mempool and continuing with /// transactions from previous in-chain block. The stream ends after the block at diff --git a/bitcoin/src/lib.rs b/bitcoin/src/lib.rs index fe5148c30..3247bd360 100644 --- a/bitcoin/src/lib.rs +++ b/bitcoin/src/lib.rs @@ -10,6 +10,7 @@ mod addr; mod electrs; mod error; mod iter; +pub mod relay; use async_trait::async_trait; use backoff::{backoff::Backoff, future::retry, ExponentialBackoff}; @@ -48,6 +49,7 @@ pub use electrs::{ElectrsClient, Error as ElectrsError}; pub use error::{BitcoinRpcError, ConversionError, Error}; pub use iter::{reverse_stream_transactions, stream_blocks, stream_in_chain_transactions}; use log::{info, trace, warn}; +pub use relay::*; use serde_json::error::Category as SerdeJsonCategory; pub use sp_core::H256; use std::{ @@ -101,6 +103,8 @@ const RANDOMIZATION_FACTOR: f64 = 0.25; const DERIVATION_KEY_LABEL: &str = "derivation-key"; const DEPOSIT_LABEL: &str = "deposit"; +pub type DynBitcoinCoreApi = Arc; + fn get_exponential_backoff() -> ExponentialBackoff { ExponentialBackoff { current_interval: INITIAL_INTERVAL, diff --git a/vault/src/relay/backing.rs b/bitcoin/src/relay/backing.rs similarity index 79% rename from vault/src/relay/backing.rs rename to bitcoin/src/relay/backing.rs index c8faef6f9..8b83a22c5 100644 --- a/vault/src/relay/backing.rs +++ b/bitcoin/src/relay/backing.rs @@ -1,48 +1,46 @@ -use super::Error; -use crate::service::DynBitcoinCoreApi; +use crate::{serialize, BitcoinCoreApi, DynBitcoinCoreApi, Error as BitcoinError}; use async_trait::async_trait; -use bitcoin::{serialize, BitcoinCoreApi, Error as BitcoinError}; #[async_trait] pub trait Backing { /// Returns the height of the longest chain - async fn get_block_count(&self) -> Result; + async fn get_block_count(&self) -> Result; /// Returns the raw header of a block in storage /// /// # Arguments /// /// * `height` - The height of the block to fetch - async fn get_block_header(&self, height: u32) -> Result>, Error>; + async fn get_block_header(&self, height: u32) -> Result>, BitcoinError>; /// Returns the (little endian) hash of a block /// /// # Arguments /// /// * `height` - The height of the block to fetch - async fn get_block_hash(&self, height: u32) -> Result, Error>; + async fn get_block_hash(&self, height: u32) -> Result, BitcoinError>; } #[async_trait] impl Backing for DynBitcoinCoreApi { - async fn get_block_count(&self) -> Result { + async fn get_block_count(&self) -> Result { let count = BitcoinCoreApi::get_block_count(&**self).await?; return Ok(count as u32); } - async fn get_block_header(&self, height: u32) -> Result>, Error> { + async fn get_block_header(&self, height: u32) -> Result>, BitcoinError> { let block_hash = match BitcoinCoreApi::get_block_hash(&**self, height).await { Ok(h) => h, Err(BitcoinError::InvalidBitcoinHeight) => { return Ok(None); } - Err(err) => return Err(err.into()), + Err(err) => return Err(err), }; let block_header = BitcoinCoreApi::get_block_header(&**self, &block_hash).await?; Ok(Some(serialize(&block_header))) } - async fn get_block_hash(&self, height: u32) -> Result, Error> { + async fn get_block_hash(&self, height: u32) -> Result, BitcoinError> { let block_hash = BitcoinCoreApi::get_block_hash(&**self, height) .await .map(|hash| serialize(&hash))?; diff --git a/vault/src/relay/error.rs b/bitcoin/src/relay/error.rs similarity index 74% rename from vault/src/relay/error.rs rename to bitcoin/src/relay/error.rs index 4494a6a0b..9e7c8dd05 100644 --- a/vault/src/relay/error.rs +++ b/bitcoin/src/relay/error.rs @@ -1,14 +1,13 @@ #![allow(clippy::enum_variant_names)] -use bitcoin::Error as BitcoinError; -use runtime::Error as RuntimeError; +use crate::Error as BitcoinError; use thiserror::Error; #[cfg(test)] use std::mem::discriminant; #[derive(Error, Debug)] -pub enum Error { +pub enum Error { #[error("Client already initialized")] AlreadyInitialized, #[error("Client has not been initialized")] @@ -28,13 +27,7 @@ pub enum Error { #[error("BitcoinError: {0}")] BitcoinError(#[from] BitcoinError), + // note: we can't have two #[from]s when one is generic. We'll use map_err for the runtime error #[error("RuntimeError: {0}")] - RuntimeError(#[from] RuntimeError), -} - -#[cfg(test)] -impl PartialEq for Error { - fn eq(&self, other: &Self) -> bool { - discriminant(self) == discriminant(other) - } + RuntimeError(RuntimeError), } diff --git a/bitcoin/src/relay/issuing.rs b/bitcoin/src/relay/issuing.rs new file mode 100644 index 000000000..a1901ee75 --- /dev/null +++ b/bitcoin/src/relay/issuing.rs @@ -0,0 +1,61 @@ +use async_trait::async_trait; +use std::{fmt, sync::Arc}; + +#[async_trait] +pub trait RandomDelay: fmt::Debug { + type Error; + async fn delay(&self, seed_data: &[u8; 32]) -> Result<(), Self::Error>; +} + +#[async_trait] +pub trait Issuing { + type Error; + + /// Returns true if the light client is initialized + async fn is_initialized(&self) -> Result; + + /// Initialize the light client + /// + /// # Arguments + /// + /// * `header` - Raw block header + /// * `height` - Starting height + async fn initialize(&self, header: Vec, height: u32) -> Result<(), Self::Error>; + + /// Submit a block header and wait for inclusion + /// + /// # Arguments + /// + /// * `header` - Raw block header + async fn submit_block_header( + &self, + header: Vec, + random_delay: Arc + Send + Sync>>, + ) -> Result<(), Self::Error>; + + /// Submit a batch of block headers and wait for inclusion + /// + /// # Arguments + /// + /// * `headers` - Raw block headers (multiple of 80 bytes) + async fn submit_block_header_batch(&self, headers: Vec>) -> Result<(), Self::Error>; + + /// Returns the light client's chain tip + async fn get_best_height(&self) -> Result; + + /// Returns the block hash stored at a given height, + /// this is assumed to be in little-endian format + /// + /// # Arguments + /// + /// * `height` - Height of the block to fetch + async fn get_block_hash(&self, height: u32) -> Result, Self::Error>; + + /// Returns true if the block described by the hash + /// has been stored in the light client + /// + /// # Arguments + /// + /// * `hash_le` - Hash (little-endian) of the block + async fn is_block_stored(&self, hash_le: Vec) -> Result; +} diff --git a/vault/src/relay/mod.rs b/bitcoin/src/relay/mod.rs similarity index 77% rename from vault/src/relay/mod.rs rename to bitcoin/src/relay/mod.rs index 007f319cc..9c478ff81 100644 --- a/vault/src/relay/mod.rs +++ b/bitcoin/src/relay/mod.rs @@ -1,23 +1,18 @@ -use crate::{service::DynBitcoinCoreApi, Error as VaultError}; -use runtime::InterBtcParachain; use std::{sync::Arc, time::Duration}; use tokio::time::sleep; - -use crate::delay::RandomDelay; - mod backing; mod error; mod issuing; pub use backing::Backing; pub use error::Error; -pub use issuing::Issuing; +pub use issuing::{Issuing, RandomDelay}; // 10 minutes = 600 seconds const SLEEP_TIME: Duration = Duration::from_secs(600); /// Retrieves `batch` blocks starting at block `height` from the backing blockchain -async fn collect_headers(height: u32, batch: u32, cli: &impl Backing) -> Result>, Error> { +async fn collect_headers(height: u32, batch: u32, cli: &impl Backing) -> Result>, Error> { let mut headers = Vec::new(); for h in height..height + batch { headers.push( @@ -32,24 +27,30 @@ async fn collect_headers(height: u32, batch: u32, cli: &impl Backing) -> Result< /// Computes the height at which the relayer should start to submit blocks. /// In most cases it should be from the next block after the highest block /// stored by the issuing blockchain -async fn compute_start_height(backing: &impl Backing, issuing: &impl Issuing) -> Result { - let mut start_height = issuing.get_best_height().await?; +async fn compute_start_height(backing: &impl Backing, issuing: &T) -> Result> { + let mut start_height = issuing.get_best_height().await.map_err(Error::RuntimeError)?; // check backing for discrepancy - let mut relay_hash = issuing.get_block_hash(start_height).await?; + let mut relay_hash = issuing + .get_block_hash(start_height) + .await + .map_err(Error::RuntimeError)?; let mut btc_hash = backing.get_block_hash(start_height).await?; // backwards pass while relay_hash != btc_hash { start_height = start_height.checked_sub(1).ok_or(Error::NotInitialized)?; - relay_hash = issuing.get_block_hash(start_height).await?; + relay_hash = issuing + .get_block_hash(start_height) + .await + .map_err(Error::RuntimeError)?; btc_hash = backing.get_block_hash(start_height).await?; } // forward pass (possible forks) loop { match backing.get_block_hash(start_height).await { - Ok(h) if issuing.is_block_stored(h.clone()).await? => { + Ok(h) if issuing.is_block_stored(h.clone()).await.map_err(Error::RuntimeError)? => { start_height = start_height.saturating_add(1); } _ => break, @@ -76,11 +77,11 @@ pub struct Config { pub struct Runner { backing: B, issuing: I, - random_delay: Arc>, start_height: Option, max_batch_size: u32, interval: Duration, btc_confirmations: u32, + random_delay: Arc + Send + Sync>>, } impl Runner { @@ -88,7 +89,7 @@ impl Runner { backing: B, issuing: I, conf: Config, - random_delay: Arc>, + random_delay: Arc + Send + Sync>>, ) -> Runner { Runner { backing, @@ -102,19 +103,19 @@ impl Runner { } /// Returns the block header at `height` - async fn get_block_header(&self, height: u32) -> Result, Error> { + async fn get_block_header(&self, height: u32) -> Result, Error> { loop { match self.backing.get_block_header(height).await? { Some(header) => return Ok(header), None => { - tracing::trace!("No block found at height {}, sleeping for {:?}", height, self.interval); + println!("No block found at height {}, sleeping for {:?}", height, self.interval); sleep(self.interval).await } }; } } - async fn get_num_confirmed_blocks(&self) -> Result { + async fn get_num_confirmed_blocks(&self) -> Result> { Ok(self .backing .get_block_count() @@ -124,22 +125,23 @@ impl Runner { /// Submit the next block(s) or initialize the relay, /// may submit up to `max_batch_size` blocks at a time - pub async fn submit_next(&self) -> Result<(), Error> { - if !self.issuing.is_initialized().await? { + pub async fn submit_next(&self) -> Result<(), Error> { + if !self.issuing.is_initialized().await.map_err(Error::RuntimeError)? { let start_height = self.start_height.unwrap_or(self.get_num_confirmed_blocks().await?); - tracing::info!("Initializing at height {}", start_height); + println!("Initializing at height {}", start_height); self.issuing .initialize( self.backing.get_block_header(start_height).await?.unwrap(), start_height, ) - .await?; + .await + .map_err(Error::RuntimeError)?; } let max_height = self.get_num_confirmed_blocks().await?; - tracing::trace!("Backing height: {}", max_height); + println!("Backing height: {}", max_height); let current_height = compute_start_height(&self.backing, &self.issuing).await?; - tracing::trace!("Issuing height: {}", current_height); + println!("Issuing height: {}", current_height); let batch_size = if current_height.saturating_add(self.max_batch_size) > max_height { max_height.saturating_add(1).saturating_sub(current_height) @@ -150,29 +152,33 @@ impl Runner { match batch_size { 0 => { // nothing to submit right now. Wait a little while - tracing::trace!("Waiting for the next Bitcoin block..."); + println!("Waiting for the next Bitcoin block..."); sleep(self.interval).await; } 1 => { // submit a single block header - tracing::info!("Processing block at height {}", current_height); + println!("Processing block at height {}", current_height); let header = self.get_block_header(current_height).await?; // TODO: check if block already stored self.issuing .submit_block_header(header, self.random_delay.clone()) - .await?; - tracing::info!("Submitted block at height {}", current_height); + .await + .map_err(Error::RuntimeError)?; + println!("Submitted block at height {}", current_height); } _ => { - tracing::info!( + println!( "Processing blocks {} -> {} [{}]", current_height, current_height + batch_size, batch_size ); let headers = collect_headers(current_height, batch_size, &self.backing).await?; - self.issuing.submit_block_header_batch(headers).await?; - tracing::info!( + self.issuing + .submit_block_header_batch(headers) + .await + .map_err(Error::RuntimeError)?; + println!( "Submitted blocks {} -> {} [{}]", current_height, current_height + batch_size, @@ -185,31 +191,12 @@ impl Runner { } } -pub async fn run_relayer(runner: Runner) -> Result<(), VaultError> { - loop { - match runner.submit_next().await { - Ok(_) => (), - Err(Error::RuntimeError(ref err)) if err.is_duplicate_block() => { - tracing::info!("Attempted to submit block that already exists") - } - Err(Error::RuntimeError(ref err)) if err.is_rpc_disconnect_error() => { - return Err(VaultError::ClientShutdown); - } - Err(Error::BitcoinError(err)) if err.is_transport_error() => { - return Err(VaultError::ClientShutdown); - } - Err(err) => { - tracing::error!("Failed to submit_next: {}", err); - } - } - } -} - #[cfg(test)] mod tests { - use crate::delay::{RandomDelay, ZeroDelay}; + use crate::relay::RandomDelay; use super::*; + use async_trait::async_trait; use std::{ cell::{Ref, RefCell, RefMut}, @@ -217,6 +204,17 @@ mod tests { rc::Rc, }; + #[derive(Clone, Debug)] + pub struct ZeroDelay; + + #[async_trait] + impl RandomDelay for ZeroDelay { + type Error = DummyError; + async fn delay(&self, _seed_data: &[u8; 32]) -> Result<(), Self::Error> { + Ok(()) + } + } + struct DummyIssuing { headers: Rc>>>, } @@ -239,29 +237,38 @@ mod tests { } } + #[derive(Debug, PartialEq)] + pub enum DummyError { + AlreadyInitialized, + CannotFetchBestHeight, + BlockExists, + BlockHashNotFound, + } #[async_trait] impl Issuing for DummyIssuing { - async fn is_initialized(&self) -> Result { + type Error = DummyError; + + async fn is_initialized(&self) -> Result { Ok(!self.get_headers().is_empty()) } - async fn initialize(&self, header: Vec, height: u32) -> Result<(), Error> { + async fn initialize(&self, header: Vec, height: u32) -> Result<(), Self::Error> { if self.get_headers().is_empty() { self.get_headers_mut().insert(height, header); Ok(()) } else { - Err(Error::AlreadyInitialized) + Err(DummyError::AlreadyInitialized) } } async fn submit_block_header( &self, header: Vec, - _random_delay: Arc>, - ) -> Result<(), Error> { + _random_delay: Arc + Send + Sync>>, + ) -> Result<(), Self::Error> { let is_stored = self.is_block_stored(header.clone()).await?; if is_stored { - Err(Error::BlockExists) + Err(DummyError::BlockExists) } else { let height = self.get_best_height().await? + 1; // NOTE: assume hash(header) == header @@ -270,7 +277,7 @@ mod tests { } } - async fn submit_block_header_batch(&self, headers: Vec>) -> Result<(), Error> { + async fn submit_block_header_batch(&self, headers: Vec>) -> Result<(), Self::Error> { for header in headers { self.submit_block_header(header.to_vec(), Arc::new(Box::new(ZeroDelay))) .await?; @@ -278,19 +285,22 @@ mod tests { Ok(()) } - async fn get_best_height(&self) -> Result { + async fn get_best_height(&self) -> Result { self.get_headers() .keys() .max() .copied() - .ok_or(Error::CannotFetchBestHeight) + .ok_or(DummyError::CannotFetchBestHeight) } - async fn get_block_hash(&self, height: u32) -> Result, Error> { - self.get_headers().get(&height).cloned().ok_or(Error::BlockHashNotFound) + async fn get_block_hash(&self, height: u32) -> Result, Self::Error> { + self.get_headers() + .get(&height) + .cloned() + .ok_or(DummyError::BlockHashNotFound) } - async fn is_block_stored(&self, hash: Vec) -> Result { + async fn is_block_stored(&self, hash: Vec) -> Result { Ok(self.get_headers().iter().any(|(_, h)| &h[..] == &hash[..])) } } @@ -307,16 +317,16 @@ mod tests { #[async_trait] impl Backing for DummyBacking { - async fn get_block_count(&self) -> Result { - self.hashes.keys().max().copied().ok_or(Error::CannotFetchBestHeight) + async fn get_block_count(&self) -> Result { + self.hashes.keys().max().copied().ok_or(crate::Error::ConnectionRefused) // arbitrary error } - async fn get_block_header(&self, height: u32) -> Result>, Error> { + async fn get_block_header(&self, height: u32) -> Result>, crate::Error> { Ok(self.hashes.get(&height).cloned()) } - async fn get_block_hash(&self, height: u32) -> Result, Error> { - self.hashes.get(&height).cloned().ok_or(Error::BlockHashNotFound) + async fn get_block_hash(&self, height: u32) -> Result, crate::Error> { + self.hashes.get(&height).cloned().ok_or(crate::Error::ConnectionRefused) // arbitrary error } } @@ -335,18 +345,18 @@ mod tests { assert_eq!( issuing.initialize(make_hash("x"), 1).await, - Err(Error::AlreadyInitialized) + Err(DummyError::AlreadyInitialized) ); assert_eq!(issuing.get_best_height().await, Ok(4)); assert_eq!(issuing.get_block_hash(2).await, Ok(make_hash("a"))); - assert_eq!(issuing.get_block_hash(5).await, Err(Error::BlockHashNotFound)); + assert_eq!(issuing.get_block_hash(5).await, Err(DummyError::BlockHashNotFound)); assert_eq!(issuing.is_block_stored(make_hash("a")).await, Ok(true)); assert_eq!(issuing.is_block_stored(make_hash("x")).await, Ok(false)); assert_eq!( issuing .submit_block_header(make_hash("a"), Arc::new(Box::new(ZeroDelay))) .await, - Err(Error::BlockExists) + Err(DummyError::BlockExists) ); assert_eq!( issuing @@ -362,7 +372,7 @@ mod tests { let hashes = make_hashes(vec![(2, "a"), (3, "b"), (4, "c")]); let backing = DummyBacking::new(hashes.clone()); let issuing = DummyIssuing::new(hashes); - assert_eq!(Ok(5), compute_start_height(&backing, &issuing).await); + assert_eq!(5, compute_start_height(&backing, &issuing).await.unwrap()); } #[tokio::test] @@ -371,7 +381,7 @@ mod tests { let issuing_hashes = make_hashes(vec![(2, "a"), (3, "b")]); let backing = DummyBacking::new(backing_hashes); let issuing = DummyIssuing::new(issuing_hashes); - assert_eq!(Ok(4), compute_start_height(&backing, &issuing).await); + assert_eq!(4, compute_start_height(&backing, &issuing).await.unwrap()); } #[tokio::test] @@ -381,11 +391,11 @@ mod tests { let issuing_hashes = make_hashes(vec![(2, "a"), (3, "b"), (4, "d"), (0, "c")]); let backing = DummyBacking::new(backing_hashes); let issuing = DummyIssuing::new(issuing_hashes); - assert_eq!(Ok(5), compute_start_height(&backing, &issuing).await); + assert_eq!(5, compute_start_height(&backing, &issuing).await.unwrap()); } #[tokio::test] - async fn new_runner_with_best() -> Result<(), Error> { + async fn new_runner_with_best() { let hashes = make_hashes(vec![(2, "a"), (3, "b"), (4, "c")]); let backing = DummyBacking::new(hashes.clone()); let issuing = DummyIssuing::new(hashes); @@ -402,11 +412,10 @@ mod tests { ); assert_eq!(runner.issuing.get_best_height().await.unwrap(), 4); - Ok(()) } #[tokio::test] - async fn catchup_when_out_of_sync() -> Result<(), Error> { + async fn catchup_when_out_of_sync() { let backing_hashes = make_hashes(vec![(2, "a"), (3, "b"), (4, "c"), (5, "d"), (6, "e")]); let issuing_hashes = make_hashes(vec![(2, "a"), (3, "b")]); let backing = DummyBacking::new(backing_hashes); @@ -423,24 +432,23 @@ mod tests { Arc::new(Box::new(ZeroDelay)), ); - let height_before = runner.issuing.get_best_height().await?; + let height_before = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_before, 3); - runner.submit_next().await?; - let height_after = runner.issuing.get_best_height().await?; + runner.submit_next().await.unwrap(); + let height_after = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_after, 6); - let best_height = runner.backing.get_block_count().await?; + let best_height = runner.backing.get_block_count().await.unwrap(); assert_eq!(height_after, best_height); - assert!(runner.issuing.is_block_stored(make_hash("c")).await?); - assert!(runner.issuing.is_block_stored(make_hash("d")).await?); - assert!(runner.issuing.is_block_stored(make_hash("e")).await?); - Ok(()) + assert!(runner.issuing.is_block_stored(make_hash("c")).await.unwrap()); + assert!(runner.issuing.is_block_stored(make_hash("d")).await.unwrap()); + assert!(runner.issuing.is_block_stored(make_hash("e")).await.unwrap()); } #[tokio::test] - async fn submit_next_success() -> Result<(), Error> { + async fn submit_next_success() { let backing_hashes = make_hashes(vec![(2, "a"), (3, "b"), (4, "c"), (5, "d")]); let issuing_hashes = make_hashes(vec![(2, "a"), (3, "b")]); let backing = DummyBacking::new(backing_hashes); @@ -457,20 +465,19 @@ mod tests { Arc::new(Box::new(ZeroDelay)), ); - let height_before = runner.issuing.get_best_height().await?; + let height_before = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_before, 3); - runner.submit_next().await?; - let height_after = runner.issuing.get_best_height().await?; + runner.submit_next().await.unwrap(); + let height_after = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_after, 4); - assert!(runner.issuing.is_block_stored(make_hash("c")).await?); - assert!(!runner.issuing.is_block_stored(make_hash("d")).await?); - Ok(()) + assert!(runner.issuing.is_block_stored(make_hash("c")).await.unwrap()); + assert!(!runner.issuing.is_block_stored(make_hash("d")).await.unwrap()); } #[tokio::test] - async fn submit_next_with_1_confirmation_batch_submission_succeeds() -> Result<(), Error> { + async fn submit_next_with_1_confirmation_batch_submission_succeeds() { let backing_hashes = make_hashes(vec![(2, "a"), (3, "b"), (4, "c"), (5, "d")]); let issuing_hashes = make_hashes(vec![(2, "a")]); let backing = DummyBacking::new(backing_hashes); @@ -487,23 +494,22 @@ mod tests { Arc::new(Box::new(ZeroDelay)), ); - let height_before = runner.issuing.get_best_height().await?; + let height_before = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_before, 2); - runner.submit_next().await?; - runner.submit_next().await?; + runner.submit_next().await.unwrap(); + runner.submit_next().await.unwrap(); - let height_after = runner.issuing.get_best_height().await?; + let height_after = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_after, 4); - assert!(runner.issuing.is_block_stored(make_hash("c")).await?); + assert!(runner.issuing.is_block_stored(make_hash("c")).await.unwrap()); // this block has not been confirmed yet, so we should not have submitted it - assert!(!runner.issuing.is_block_stored(make_hash("d")).await?); - Ok(()) + assert!(!runner.issuing.is_block_stored(make_hash("d")).await.unwrap()); } #[tokio::test] - async fn submit_next_with_1_confirmation_single_submission_succeeds() -> Result<(), Error> { + async fn submit_next_with_1_confirmation_single_submission_succeeds() { let backing_hashes = make_hashes(vec![(2, "a"), (3, "b"), (4, "c"), (5, "d")]); let issuing_hashes = make_hashes(vec![(2, "a")]); let backing = DummyBacking::new(backing_hashes); @@ -520,24 +526,23 @@ mod tests { Arc::new(Box::new(ZeroDelay)), ); - let height_before = runner.issuing.get_best_height().await?; + let height_before = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_before, 2); for _ in 0..10 { - runner.submit_next().await?; + runner.submit_next().await.unwrap(); } - let height_after = runner.issuing.get_best_height().await?; + let height_after = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_after, 4); - assert!(runner.issuing.is_block_stored(make_hash("c")).await?); + assert!(runner.issuing.is_block_stored(make_hash("c")).await.unwrap()); // this block has not been confirmed yet, so we should not have submitted it - assert!(!runner.issuing.is_block_stored(make_hash("d")).await?); - Ok(()) + assert!(!runner.issuing.is_block_stored(make_hash("d")).await.unwrap()); } #[tokio::test] - async fn submit_next_with_2_confirmation_succeeds() -> Result<(), Error> { + async fn submit_next_with_2_confirmation_succeeds() { let backing_hashes = make_hashes(vec![(2, "a"), (3, "b"), (4, "c"), (5, "d")]); let issuing_hashes = make_hashes(vec![(2, "a")]); let backing = DummyBacking::new(backing_hashes); @@ -554,21 +559,20 @@ mod tests { Arc::new(Box::new(ZeroDelay)), ); - let height_before = runner.issuing.get_best_height().await?; + let height_before = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_before, 2); for _ in 0..10 { - runner.submit_next().await?; + runner.submit_next().await.unwrap(); } - let height_after = runner.issuing.get_best_height().await?; + let height_after = runner.issuing.get_best_height().await.unwrap(); assert_eq!(height_after, 3); - assert!(runner.issuing.is_block_stored(make_hash("b")).await?); + assert!(runner.issuing.is_block_stored(make_hash("b")).await.unwrap()); // these blocks have not been confirmed yet, so we should not have submitted it - assert!(!runner.issuing.is_block_stored(make_hash("c")).await?); - assert!(!runner.issuing.is_block_stored(make_hash("d")).await?); - Ok(()) + assert!(!runner.issuing.is_block_stored(make_hash("c")).await.unwrap()); + assert!(!runner.issuing.is_block_stored(make_hash("d")).await.unwrap()); } } diff --git a/runtime/src/error.rs b/runtime/src/error.rs index aec11e1f2..d494b7b72 100644 --- a/runtime/src/error.rs +++ b/runtime/src/error.rs @@ -64,6 +64,8 @@ pub enum Error { CurrencyNotFound, #[error("Operation not supported on token variant")] TokenUnsupported, + #[error("Error decoding hex string")] + HexDecodeError, #[error("Client does not support spec_version: expected {0}..{1}, got {2}")] InvalidSpecVersion(u32, u32, u32), diff --git a/vault/src/delay.rs b/vault/src/delay.rs index b060471cd..3d32feb7d 100644 --- a/vault/src/delay.rs +++ b/vault/src/delay.rs @@ -1,13 +1,8 @@ use async_trait::async_trait; -use bitcoin::{sha256, Hash}; +use bitcoin::{relay::RandomDelay, sha256, Hash}; use runtime::{AccountId, Error as RuntimeError, InterBtcParachain, UtilFuncs, VaultRegistryPallet}; use std::fmt; -#[async_trait] -pub trait RandomDelay: fmt::Debug { - async fn delay(&self, seed_data: &[u8; 32]) -> Result<(), RuntimeError>; -} - #[derive(Clone)] pub struct OrderedVaultsDelay { btc_parachain: InterBtcParachain, @@ -38,6 +33,7 @@ impl fmt::Debug for OrderedVaultsDelay { #[async_trait] impl RandomDelay for OrderedVaultsDelay { + type Error = RuntimeError; /// Calculates a delay based on randomly ordering the vaults by hashing their /// account ID with a piece of seed data. /// Then awaits a corresponding amount of blocks on the parachain, with @@ -71,6 +67,7 @@ pub struct ZeroDelay; #[async_trait] impl RandomDelay for ZeroDelay { + type Error = RuntimeError; async fn delay(&self, _seed_data: &[u8; 32]) -> Result<(), RuntimeError> { Ok(()) } diff --git a/vault/src/issue.rs b/vault/src/issue.rs index 9bc8dea54..ac6237ba6 100644 --- a/vault/src/issue.rs +++ b/vault/src/issue.rs @@ -1,12 +1,12 @@ use crate::{ - delay::RandomDelay, metrics::publish_expected_bitcoin_balance, service::DynBitcoinCoreApi, system::DatabaseConfig, - Error, Event, IssueRequests, VaultIdManager, + metrics::publish_expected_bitcoin_balance, service::DynBitcoinCoreApi, system::DatabaseConfig, Error, Event, + IssueRequests, VaultIdManager, }; -use bitcoin::{BlockHash, Error as BitcoinError, Hash, PublicKey, Transaction, TransactionExt}; +use bitcoin::{relay::RandomDelay, BlockHash, Error as BitcoinError, Hash, PublicKey, Transaction, TransactionExt}; use futures::{channel::mpsc::Sender, future, SinkExt, StreamExt, TryFutureExt}; use runtime::{ - AccountId, BtcAddress, BtcPublicKey, BtcRelayPallet, CancelIssueEvent, ExecuteIssueEvent, H256Le, - InterBtcIssueRequest, InterBtcParachain, IssuePallet, IssueRequestStatus, PartialAddress, PrettyPrint, + AccountId, BtcAddress, BtcPublicKey, BtcRelayPallet, CancelIssueEvent, Error as RuntimeError, ExecuteIssueEvent, + H256Le, InterBtcIssueRequest, InterBtcParachain, IssuePallet, IssueRequestStatus, PartialAddress, PrettyPrint, RequestIssueEvent, UtilFuncs, H256, }; use sha2::{Digest, Sha256}; @@ -46,7 +46,7 @@ pub async fn process_issue_requests( issue_set: Arc, btc_start_height: u32, num_confirmations: u32, - random_delay: Arc>, + random_delay: Arc + Send + Sync>>, ) -> Result<(), Error> { // NOTE: we should not stream transactions if using the light client // since it is quite expensive to fetch all transactions per block @@ -228,7 +228,7 @@ async fn process_transaction_and_execute_issue( num_confirmations: u32, block_hash: BlockHash, transaction: Transaction, - random_delay: Arc>, + random_delay: Arc + Send + Sync>>, ) -> Result<(), Error> { let addresses: Vec = transaction .extract_output_addresses() diff --git a/vault/src/lib.rs b/vault/src/lib.rs index 8dac97932..c0403d82a 100644 --- a/vault/src/lib.rs +++ b/vault/src/lib.rs @@ -31,9 +31,9 @@ pub mod service { }, metrics::monitor_bridge_metrics, redeem::listen_for_redeem_requests, - relay::{Config, Runner}, replace::{listen_for_accept_replace, listen_for_execute_replace, listen_for_replace_requests}, }; + pub use bitcoin::relay::{Config, Runner}; } use governor::Quota; use nonzero_ext::*; @@ -43,7 +43,7 @@ pub use system::{VaultService, VaultServiceConfig, ABOUT, AUTHORS, NAME, VERSION use runtime::{InterBtcParachain, VaultId, VaultRegistryPallet}; pub use crate::{cancellation::Event, error::Error, types::IssueRequests}; -pub use delay::{OrderedVaultsDelay, RandomDelay, ZeroDelay}; +pub use delay::{OrderedVaultsDelay, ZeroDelay}; pub use system::VaultIdManager; pub(crate) async fn deposit_collateral(api: &InterBtcParachain, vault_id: &VaultId, amount: u128) -> Result<(), Error> { diff --git a/vault/src/relay.rs b/vault/src/relay.rs new file mode 100644 index 000000000..ac49ae918 --- /dev/null +++ b/vault/src/relay.rs @@ -0,0 +1,102 @@ +use crate::Error; +use async_trait::async_trait; +use bitcoin::{ + relay::{Error as RelayError, Issuing, RandomDelay, Runner}, + sha256, DynBitcoinCoreApi, Hash, +}; +use runtime::{BtcRelayPallet, Error as RuntimeError, H256Le, InterBtcParachain, RawBlockHeader}; +use std::sync::Arc; + +pub struct SubxtIssuer(InterBtcParachain); + +impl From for SubxtIssuer { + fn from(value: InterBtcParachain) -> Self { + Self(value) + } +} + +#[async_trait] +impl Issuing for SubxtIssuer { + type Error = RuntimeError; + async fn is_initialized(&self) -> Result { + let hash = BtcRelayPallet::get_best_block(&self.0).await?; + Ok(!hash.is_zero()) + } + + async fn initialize(&self, header: Vec, height: u32) -> Result<(), RuntimeError> { + BtcRelayPallet::initialize_btc_relay(&self.0, RawBlockHeader(header), height) + .await + .map_err(Into::into) + } + + #[tracing::instrument(name = "submit_block_header", skip(self, header))] + async fn submit_block_header( + &self, + header: Vec, + random_delay: Arc + Send + Sync>>, + ) -> Result<(), RuntimeError> { + let raw_block_header = RawBlockHeader(header.clone()); + + // wait a random amount of blocks, to avoid all vaults flooding the parachain with + // this transaction + (*random_delay) + .delay(sha256::Hash::hash(header.as_slice()).as_byte_array()) + .await?; + if self + .is_block_stored(raw_block_header.hash().to_bytes_le().to_vec()) + .await? + { + return Ok(()); + } + BtcRelayPallet::store_block_header(&self.0, raw_block_header) + .await + .map_err(Into::into) + } + + #[tracing::instrument(name = "submit_block_header_batch", skip(self, headers))] + async fn submit_block_header_batch(&self, headers: Vec>) -> Result<(), RuntimeError> { + BtcRelayPallet::store_block_headers( + &self.0, + headers + .iter() + .map(|header| RawBlockHeader(header.to_vec())) + .collect::>(), + ) + .await + .map_err(Into::into) + } + + async fn get_best_height(&self) -> Result { + BtcRelayPallet::get_best_block_height(&self.0).await.map_err(Into::into) + } + + async fn get_block_hash(&self, height: u32) -> Result, RuntimeError> { + let hash = BtcRelayPallet::get_block_hash(&self.0, height).await?; + hex::decode(hash.to_hex_le()).map_err(|_| RuntimeError::HexDecodeError) + } + + async fn is_block_stored(&self, hash_le: Vec) -> Result { + let head = BtcRelayPallet::get_block_header(&self.0, H256Le::from_bytes_le(&hash_le)).await?; + Ok(head.block_height > 0) + } +} + +pub async fn run_relayer(runner: Runner) -> Result<(), Error> { + loop { + match runner.submit_next().await { + Ok(_) => (), + Err(RelayError::RuntimeError(ref err)) if err.is_duplicate_block() => { + println!("Attempted to submit block that already exists") + } + Err(RelayError::RuntimeError(ref err)) if err.is_rpc_disconnect_error() => { + return Err(Error::ClientShutdown); + } + Err(RelayError::BitcoinError(err)) if err.is_transport_error() => { + return Err(Error::ClientShutdown); + } + Err(err) => { + println!("Failed to submit_next: {}", err); + } + } + } +} diff --git a/vault/src/relay/issuing.rs b/vault/src/relay/issuing.rs deleted file mode 100644 index 7f1a5f85d..000000000 --- a/vault/src/relay/issuing.rs +++ /dev/null @@ -1,122 +0,0 @@ -use super::Error; -use crate::delay::RandomDelay; -use async_trait::async_trait; -use bitcoin::{sha256, Hash}; -use runtime::{BtcRelayPallet, H256Le, InterBtcParachain, RawBlockHeader}; -use std::sync::Arc; - -#[async_trait] -pub trait Issuing { - /// Returns true if the light client is initialized - async fn is_initialized(&self) -> Result; - - /// Initialize the light client - /// - /// # Arguments - /// - /// * `header` - Raw block header - /// * `height` - Starting height - async fn initialize(&self, header: Vec, height: u32) -> Result<(), Error>; - - /// Submit a block header and wait for inclusion - /// - /// # Arguments - /// - /// * `header` - Raw block header - async fn submit_block_header( - &self, - header: Vec, - random_delay: Arc>, - ) -> Result<(), Error>; - - /// Submit a batch of block headers and wait for inclusion - /// - /// # Arguments - /// - /// * `headers` - Raw block headers (multiple of 80 bytes) - async fn submit_block_header_batch(&self, headers: Vec>) -> Result<(), Error>; - - /// Returns the light client's chain tip - async fn get_best_height(&self) -> Result; - - /// Returns the block hash stored at a given height, - /// this is assumed to be in little-endian format - /// - /// # Arguments - /// - /// * `height` - Height of the block to fetch - async fn get_block_hash(&self, height: u32) -> Result, Error>; - - /// Returns true if the block described by the hash - /// has been stored in the light client - /// - /// # Arguments - /// - /// * `hash_le` - Hash (little-endian) of the block - async fn is_block_stored(&self, hash_le: Vec) -> Result; -} - -#[async_trait] -impl Issuing for InterBtcParachain { - async fn is_initialized(&self) -> Result { - let hash = BtcRelayPallet::get_best_block(self).await?; - Ok(!hash.is_zero()) - } - - async fn initialize(&self, header: Vec, height: u32) -> Result<(), Error> { - BtcRelayPallet::initialize_btc_relay(self, RawBlockHeader(header), height) - .await - .map_err(Into::into) - } - - #[tracing::instrument(name = "submit_block_header", skip(self, header))] - async fn submit_block_header( - &self, - header: Vec, - random_delay: Arc>, - ) -> Result<(), Error> { - let raw_block_header = RawBlockHeader(header.clone()); - - // wait a random amount of blocks, to avoid all vaults flooding the parachain with - // this transaction - (*random_delay) - .delay(sha256::Hash::hash(header.as_slice()).as_byte_array()) - .await?; - if self - .is_block_stored(raw_block_header.hash().to_bytes_le().to_vec()) - .await? - { - return Ok(()); - } - BtcRelayPallet::store_block_header(self, raw_block_header) - .await - .map_err(Into::into) - } - - #[tracing::instrument(name = "submit_block_header_batch", skip(self, headers))] - async fn submit_block_header_batch(&self, headers: Vec>) -> Result<(), Error> { - BtcRelayPallet::store_block_headers( - self, - headers - .iter() - .map(|header| RawBlockHeader(header.to_vec())) - .collect::>(), - ) - .await - .map_err(Into::into) - } - - async fn get_best_height(&self) -> Result { - BtcRelayPallet::get_best_block_height(self).await.map_err(Into::into) - } - - async fn get_block_hash(&self, height: u32) -> Result, Error> { - let hash = BtcRelayPallet::get_block_hash(self, height).await?; - hex::decode(hash.to_hex_le()).map_err(|_| Error::DecodeHash) - } - - async fn is_block_stored(&self, hash_le: Vec) -> Result { - let head = BtcRelayPallet::get_block_header(self, H256Le::from_bytes_le(&hash_le)).await?; - Ok(head.block_height > 0) - } -} diff --git a/vault/src/system.rs b/vault/src/system.rs index 156af48d0..8cf3ac69c 100644 --- a/vault/src/system.rs +++ b/vault/src/system.rs @@ -1,5 +1,5 @@ use crate::{ - delay::{OrderedVaultsDelay, RandomDelay, ZeroDelay}, + delay::{OrderedVaultsDelay, ZeroDelay}, error::Error, faucet, issue, metrics::{poll_metrics, publish_tokio_metrics, PerCurrencyMetrics}, @@ -9,7 +9,7 @@ use crate::{ }; use async_trait::async_trait; use backoff::Error as BackoffError; -use bitcoin::{Address, ConversionError, Error as BitcoinError, Network, PublicKey}; +use bitcoin::{relay::RandomDelay, Address, ConversionError, Error as BitcoinError, Network, PublicKey}; use clap::Parser; use futures::{ channel::{mpsc, mpsc::Sender}, @@ -686,7 +686,8 @@ impl VaultService { let oldest_issue_btc_height = issue::initialize_issue_set(&self.btc_rpc_master_wallet, &self.btc_parachain, &issue_set).await?; - let random_delay: Arc> = if self.config.no_random_delay { + let random_delay: Arc + Send + Sync>> = if self.config.no_random_delay + { Arc::new(Box::new(ZeroDelay)) } else { Arc::new(Box::new( @@ -826,7 +827,7 @@ impl VaultService { !self.config.no_bitcoin_block_relay, run_relayer(Runner::new( self.btc_rpc_master_wallet.clone(), - self.btc_parachain.clone(), + self.btc_parachain.clone().into(), Config { start_height: self.config.bitcoin_relay_start_height, max_batch_size: self.config.max_batch_size, diff --git a/vault/tests/vault_integration_tests.rs b/vault/tests/vault_integration_tests.rs index 1c249c3e8..3b5e35fe7 100644 --- a/vault/tests/vault_integration_tests.rs +++ b/vault/tests/vault_integration_tests.rs @@ -969,12 +969,11 @@ impl InterBtcParachainExt for InterBtcParachain { #[cfg(feature = "uses-bitcoind")] mod test_with_bitcoind { - use bitcoin::{BitcoinCore, BitcoinCoreApi, Hash, Transaction, TransactionExt}; + use super::*; + use bitcoin::{relay::Config, BitcoinCore, BitcoinCoreApi, Hash, Transaction, TransactionExt}; use runtime::BtcRelayPallet; use std::cmp::max; - use vault::{delay::ZeroDelay, relay::Config, service::Runner}; - - use super::*; + use vault::{delay::ZeroDelay, service::Runner}; async fn get_bitcoin_core() -> BitcoinCore { use bitcoin::{cli::BitcoinOpts, Network}; @@ -1190,7 +1189,7 @@ mod test_with_bitcoind { let height = bitcoin_core.get_block_count().await.unwrap() as u32; let relayer = Runner::new( btc_rpc.clone(), - user_provider.clone(), + user_provider.clone().into(), Config { start_height: Some(max(1, height.saturating_sub(200))), /* important to skip the genesis block * since it has nVersion < 4, so it would