diff --git a/Cargo.lock b/Cargo.lock index 80d394344..566ea0837 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2011,6 +2011,7 @@ dependencies = [ "runtime", "serde", "serde_json", + "service", "sp-keyring", "thiserror", "tokio 0.2.25", @@ -2743,7 +2744,7 @@ dependencies = [ "http 0.2.3", "indexmap", "slab", - "tokio 1.4.0", + "tokio 1.5.0", "tokio-util 0.6.5", "tracing", ] @@ -3045,7 +3046,7 @@ dependencies = [ "itoa", "pin-project 1.0.5", "socket2 0.3.19", - "tokio 1.4.0", + "tokio 1.5.0", "tower-service", "tracing", "want 0.3.0", @@ -3081,7 +3082,7 @@ dependencies = [ "log 0.4.14", "rustls 0.19.0", "rustls-native-certs 0.5.0", - "tokio 1.4.0", + "tokio 1.5.0", "tokio-rustls 0.22.0", "webpki", ] @@ -5963,6 +5964,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "feb3b2b1033b8a60b4da6ee470325f887758c95d5320f52f9ce0df055a55940e" +[[package]] +name = "polkabtc-telemetry-types" +version = "0.1.0" +source = "git+https://github.com/interlay/polkabtc-telemetry?rev=fb5fe36#fb5fe362f679f1eed60e8ba631540f0133f116f4" +dependencies = [ + "serde", + "serde_json", + "sp-core", +] + [[package]] name = "polkadot-core-primitives" version = "0.7.30" @@ -8301,6 +8312,28 @@ dependencies = [ "serde", ] +[[package]] +name = "service" +version = "0.6.0" +dependencies = [ + "async-trait", + "bitcoin 0.6.0", + "clap 3.0.0-beta.2", + "futures 0.3.13", + "hyper 0.13.10", + "hyper-tls", + "polkabtc-telemetry-types", + "runtime", + "serde", + "serde_json", + "sp-core", + "thiserror", + "tokio 0.2.25", + "tracing", + "tracing-futures", + "tracing-subscriber", +] + [[package]] name = "sha-1" version = "0.8.2" @@ -9233,6 +9266,7 @@ dependencies = [ "parity-scale-codec", "runtime", "serde", + "service", "sp-core", "sp-keyring", "thiserror", @@ -9774,9 +9808,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722" +checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" dependencies = [ "autocfg 1.0.1", "bytes 1.0.1", @@ -9912,7 +9946,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ "rustls 0.19.0", - "tokio 1.4.0", + "tokio 1.5.0", "webpki", ] @@ -10046,7 +10080,7 @@ dependencies = [ "futures-sink", "log 0.4.14", "pin-project-lite 0.2.6", - "tokio 1.4.0", + "tokio 1.5.0", ] [[package]] @@ -10398,6 +10432,7 @@ dependencies = [ "runtime", "serde", "serde_json", + "service", "sha2 0.8.2", "sp-arithmetic", "sp-core", diff --git a/Cargo.toml b/Cargo.toml index ec1a04365..244289829 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,5 +6,6 @@ members = [ "testdata-gen", "vault", "bitcoin", - "faucet" + "faucet", + "service" ] diff --git a/faucet/Cargo.toml b/faucet/Cargo.toml index fb689ad32..a72103b4c 100644 --- a/faucet/Cargo.toml +++ b/faucet/Cargo.toml @@ -24,6 +24,7 @@ git-version = "0.3.4" # Workspace dependencies runtime = { path = "../runtime" } +service = { path = "../service" } [dev-dependencies] diff --git a/faucet/src/main.rs b/faucet/src/main.rs index 7bf9106de..a35d3ed83 100644 --- a/faucet/src/main.rs +++ b/faucet/src/main.rs @@ -5,7 +5,8 @@ mod system; use clap::Clap; use error::Error; use git_version::git_version; -use runtime::{substrate_subxt::PairSigner, ConnectionManager, PolkaBtcRuntime}; +use runtime::{substrate_subxt::PairSigner, PolkaBtcRuntime}; +use service::{ConnectionManager, ServiceConfig}; use system::{FaucetService, FaucetServiceConfig}; const VERSION: &str = git_version!(args = ["--tags"]); @@ -24,25 +25,13 @@ struct Opts { #[clap(flatten)] parachain: runtime::cli::ConnectionOpts, - /// Address to listen on for JSON-RPC requests. - #[clap(long, default_value = "[::0]:3033")] - http_addr: String, - - /// Comma separated list of allowed origins. - #[clap(long, default_value = "*")] - rpc_cors_domain: String, - - /// DOT allowance per request for regular users. - #[clap(long, default_value = "1")] - user_allowance: u128, - - /// DOT allowance per request for vaults. - #[clap(long, default_value = "500")] - vault_allowance: u128, + /// Settings specific to the faucet client. + #[clap(flatten)] + faucet: FaucetServiceConfig, - /// DOT allowance per request for vaults. - #[clap(long, default_value = "500")] - staked_relayer_allowance: u128, + /// General service settings. + #[clap(flatten)] + service: ServiceConfig, } #[tokio::main] @@ -55,20 +44,9 @@ async fn main() -> Result<(), Error> { let (key_pair, _) = opts.account_info.get_key_pair()?; let signer = PairSigner::::new(key_pair); - ConnectionManager::<_, _, FaucetService>::new( - opts.parachain.polka_btc_url.clone(), - signer.clone(), - FaucetServiceConfig { - http_addr: opts.http_addr.parse()?, - rpc_cors_domain: opts.rpc_cors_domain, - user_allowance: opts.user_allowance, - vault_allowance: opts.vault_allowance, - staked_relayer_allowance: opts.staked_relayer_allowance, - }, - opts.parachain.into(), - ) - .start() - .await?; + ConnectionManager::<(), _, FaucetService>::new(signer.clone(), opts.parachain, opts.service, opts.faucet) + .start() + .await?; Ok(()) } diff --git a/faucet/src/system.rs b/faucet/src/system.rs index 00db6b9d6..2c0a692c4 100644 --- a/faucet/src/system.rs +++ b/faucet/src/system.rs @@ -1,17 +1,33 @@ use crate::{http, Error}; use async_trait::async_trait; +use clap::Clap; use futures::future; use log::debug; -use runtime::{on_shutdown, wait_or_shutdown, Error as RuntimeError, PolkaBtcProvider, Service, ShutdownSender}; +use runtime::{Error as RuntimeError, PolkaBtcProvider}; +use service::{on_shutdown, wait_or_shutdown, Service, ShutdownSender}; use std::net::SocketAddr; -#[derive(Clone)] +#[derive(Clap, Clone)] pub struct FaucetServiceConfig { - pub http_addr: SocketAddr, - pub rpc_cors_domain: String, - pub user_allowance: u128, - pub vault_allowance: u128, - pub staked_relayer_allowance: u128, + /// Address to listen on for JSON-RPC requests. + #[clap(long, default_value = "[::0]:3033")] + http_addr: SocketAddr, + + /// Comma separated list of allowed origins. + #[clap(long, default_value = "*")] + rpc_cors_domain: String, + + /// DOT allowance per request for regular users. + #[clap(long, default_value = "1")] + user_allowance: u128, + + /// DOT allowance per request for vaults. + #[clap(long, default_value = "500")] + vault_allowance: u128, + + /// DOT allowance per request for vaults. + #[clap(long, default_value = "500")] + staked_relayer_allowance: u128, } pub struct FaucetService { @@ -21,12 +37,16 @@ pub struct FaucetService { } #[async_trait] -impl Service for FaucetService { - async fn initialize(_config: &FaucetServiceConfig) -> Result<(), RuntimeError> { - Ok(()) - } +impl Service<(), FaucetServiceConfig> for FaucetService { + const NAME: &'static str = env!("CARGO_PKG_NAME"); + const VERSION: &'static str = env!("CARGO_PKG_VERSION"); - fn new_service(btc_parachain: PolkaBtcProvider, config: FaucetServiceConfig, shutdown: ShutdownSender) -> Self { + fn new_service( + btc_parachain: PolkaBtcProvider, + _bitcoin_core: (), + config: FaucetServiceConfig, + shutdown: ShutdownSender, + ) -> Self { FaucetService::new(btc_parachain, config, shutdown) } @@ -74,6 +94,7 @@ impl FaucetService { close_handle.close(); }); + log::info!("Running..."); let _ = future::join(block_listener, http_server).await; Ok(()) diff --git a/runtime/src/cli.rs b/runtime/src/cli.rs index a9a9a7c44..b792c4aaf 100644 --- a/runtime/src/cli.rs +++ b/runtime/src/cli.rs @@ -1,12 +1,8 @@ -use crate::{ - error::{Error, KeyLoadingError}, - ConnectionManagerConfig, RestartPolicy, -}; - +use crate::error::{Error, KeyLoadingError}; use clap::Clap; use sp_core::{sr25519::Pair, Pair as _}; use sp_keyring::AccountKeyring; -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, num::ParseIntError, time::Duration}; #[derive(Clap, Debug, Clone)] pub struct ProviderUserOpts { @@ -55,6 +51,10 @@ fn get_credentials_from_file(file_path: &str, keyname: &str) -> Result Result { + Ok(Duration::from_millis(u64::from_str_radix(src, 10)?)) +} + #[derive(Clap, Debug, Clone)] pub struct ConnectionOpts { /// Parachain websocket URL. @@ -62,12 +62,8 @@ pub struct ConnectionOpts { pub polka_btc_url: String, /// Timeout in milliseconds to wait for connection to btc-parachain. - #[clap(long, default_value = "60000")] - pub polka_btc_connection_timeout_ms: u64, - - /// What to do if the connection to the btc-parachain drops. - #[clap(long, default_value = "always")] - pub restart_policy: RestartPolicy, + #[clap(long, parse(try_from_str = parse_duration_ms), default_value = "60000")] + pub polka_btc_connection_timeout_ms: Duration, /// Maximum number of concurrent requests #[clap(long)] @@ -77,14 +73,3 @@ pub struct ConnectionOpts { #[clap(long)] pub max_notifs_per_subscription: Option, } - -impl From for ConnectionManagerConfig { - fn from(opts: ConnectionOpts) -> ConnectionManagerConfig { - ConnectionManagerConfig { - connection_timeout: Duration::from_millis(opts.polka_btc_connection_timeout_ms), - restart_policy: opts.restart_policy, - max_concurrent_requests: opts.max_concurrent_requests, - max_notifs_per_subscription: opts.max_notifs_per_subscription, - } - } -} diff --git a/runtime/src/conn.rs b/runtime/src/conn.rs index 8d72c4e92..ab200c5a3 100644 --- a/runtime/src/conn.rs +++ b/runtime/src/conn.rs @@ -1,40 +1,19 @@ -use crate::{error::JsonRpseeError, Error, PolkaBtcSigner}; -use async_trait::async_trait; -use futures::{future::Either, Future, FutureExt}; +use crate::{error::JsonRpseeError, Error}; use jsonrpsee_ws_client::{WsClient, WsConfig}; -use log::{info, trace}; -use std::{marker::PhantomData, str::FromStr, sync::Arc, time::Duration}; -use substrate_subxt::RpcClient; +use std::time::Duration; use tokio::time::{delay_for, timeout}; const RETRY_TIMEOUT: Duration = Duration::from_millis(1000); const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); -pub type ShutdownSender = tokio::sync::broadcast::Sender>; - -#[async_trait] -pub trait Provider { - async fn connect_provider(rpc_client: T, signer: PolkaBtcSigner) -> Result - where - Self: Sized, - T: Into + Send; -} - -#[async_trait] -pub trait Service { - async fn initialize(config: &C) -> Result<(), Error>; - fn new_service(provider: P, config: C, shutdown: ShutdownSender) -> Self; - async fn start(&self) -> Result<(), Error>; -} - -pub(crate) async fn new_websocket_client( +pub(crate) fn new_websocket_config( url: &str, max_concurrent_requests: Option, max_notifs_per_subscription: Option, -) -> Result { +) -> Result { let parsed_url = url::Url::parse(&url)?; - let path = parsed_url.path(); - let config = WsConfig { + let path = parsed_url.path().to_string(); + Ok(WsConfig { url, max_request_body_size: 10 * 1024 * 1024, request_timeout: None, @@ -43,27 +22,28 @@ pub(crate) async fn new_websocket_client( handshake_url: path.into(), max_concurrent_requests: max_concurrent_requests.unwrap_or(1024), max_notifs_per_subscription: max_notifs_per_subscription.unwrap_or(256), - }; + }) +} + +pub(crate) async fn new_websocket_client(config: WsConfig<'_>) -> Result { Ok(WsClient::new(config).await?) } pub(crate) async fn new_websocket_client_with_retry( - url: &str, - max_concurrent_requests: Option, - max_notifs_per_subscription: Option, + config: WsConfig<'_>, connection_timeout: Duration, ) -> Result { - info!("Connecting to the btc-parachain..."); + log::info!("Connecting to the btc-parachain..."); timeout(connection_timeout, async move { loop { - match new_websocket_client(url, max_concurrent_requests, max_notifs_per_subscription).await { + match new_websocket_client(config.clone()).await { Err(Error::JsonRpseeError(JsonRpseeError::TransportError(err))) => { - trace!("could not connect to parachain: {}", err); + log::trace!("could not connect to parachain: {}", err); delay_for(RETRY_TIMEOUT).await; continue; } Ok(rpc) => { - info!("Connected!"); + log::info!("Connected!"); return Ok(rpc); } Err(err) => return Err(err), @@ -72,114 +52,3 @@ pub(crate) async fn new_websocket_client_with_retry( }) .await? } - -#[derive(Clone, Debug)] -pub enum RestartPolicy { - Never, - Always, -} - -impl FromStr for RestartPolicy { - type Err = String; - fn from_str(code: &str) -> Result { - match code { - "never" => Ok(RestartPolicy::Never), - "always" => Ok(RestartPolicy::Always), - _ => Err("Could not parse input as RestartPolicy".to_string()), - } - } -} - -/// Connection settings for the service -pub struct ManagerConfig { - /// Fail to connect to server if elapsed - pub connection_timeout: Duration, - /// Whether to restart the client - pub restart_policy: RestartPolicy, - /// Maximum number of concurrent requests - pub max_concurrent_requests: Option, - /// Maximum notification capacity for each subscription - pub max_notifs_per_subscription: Option, -} - -pub struct Manager> { - url: String, - signer: PolkaBtcSigner, - service_config: C, - manager_config: ManagerConfig, - _marker: PhantomData<(P, S)>, -} - -impl> Manager { - pub fn new(url: String, signer: PolkaBtcSigner, service_config: C, manager_config: ManagerConfig) -> Self { - Self { - url, - signer, - service_config, - manager_config, - _marker: PhantomData::default(), - } - } - - pub async fn start(&self) -> Result<(), Error> { - loop { - let config = self.service_config.clone(); - let (shutdown_tx, _) = tokio::sync::broadcast::channel(16); - - S::initialize(&config).await?; - - // only open connection to parachain after bitcoind sync to prevent timeout - let ws_client = new_websocket_client_with_retry( - &self.url, - self.manager_config.max_concurrent_requests, - self.manager_config.max_notifs_per_subscription, - self.manager_config.connection_timeout, - ) - .await?; - let ws_client = Arc::new(ws_client); - let signer = self.signer.clone(); - - let provider = P::connect_provider(ws_client, signer).await?; - let service = S::new_service(provider, config, shutdown_tx); - service.start().await?; - - info!("Disconnected"); - match self.manager_config.restart_policy { - RestartPolicy::Never => return Err(Error::ClientShutdown), - RestartPolicy::Always => continue, - }; - } - } -} - -pub async fn wait_or_shutdown(shutdown_tx: ShutdownSender, future2: F) -where - F: Future>, -{ - let mut shutdown_rx = shutdown_tx.subscribe(); - - let future1 = shutdown_rx.recv().fuse(); - let future2 = future2.fuse(); - - futures::pin_mut!(future1); - futures::pin_mut!(future2); - - match futures::future::select(future1, future2).await { - Either::Left((_, _)) => { - trace!("Received shutdown signal"); - } - Either::Right((_, _)) => { - trace!("Sending shutdown signal"); - // TODO: shutdown signal should be error - let _ = shutdown_tx.send(Some(())); - } - }; -} - -pub async fn on_shutdown(shutdown_tx: ShutdownSender, future2: impl Future) { - let mut shutdown_rx = shutdown_tx.subscribe(); - let future1 = shutdown_rx.recv().fuse(); - - let _ = future1.await; - future2.await; -} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index ce42c760b..1e68f8b07 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -12,10 +12,6 @@ mod tests; #[cfg(feature = "testing-utils")] pub mod integration; -pub use conn::{ - on_shutdown, wait_or_shutdown, Manager as ConnectionManager, ManagerConfig as ConnectionManagerConfig, Provider, - RestartPolicy, Service, ShutdownSender, -}; pub use error::{Error, SubxtError}; pub use pallets::*; pub use rpc::{ diff --git a/runtime/src/rpc.rs b/runtime/src/rpc.rs index 17cfcf3aa..0c978acac 100644 --- a/runtime/src/rpc.rs +++ b/runtime/src/rpc.rs @@ -52,7 +52,8 @@ impl PolkaBtcProvider { } pub async fn from_url(url: &str, signer: PolkaBtcSigner) -> Result { - let ws_client = new_websocket_client(url, None, None).await?; + let ws_config = new_websocket_config(url, None, None)?; + let ws_client = new_websocket_client(ws_config).await?; Self::new(ws_client, signer).await } @@ -61,7 +62,18 @@ impl PolkaBtcProvider { signer: PolkaBtcSigner, connection_timeout: Duration, ) -> Result { - let ws_client = new_websocket_client_with_retry(url, None, None, connection_timeout).await?; + Self::from_url_and_config_with_retry(url, signer, None, None, connection_timeout).await + } + + pub async fn from_url_and_config_with_retry( + url: &str, + signer: PolkaBtcSigner, + max_concurrent_requests: Option, + max_notifs_per_subscription: Option, + connection_timeout: Duration, + ) -> Result { + let ws_config = new_websocket_config(url, max_concurrent_requests, max_notifs_per_subscription)?; + let ws_client = new_websocket_client_with_retry(ws_config, connection_timeout).await?; Self::new(ws_client, signer).await } @@ -236,17 +248,6 @@ impl PolkaBtcProvider { } } -#[async_trait] -impl Provider for PolkaBtcProvider { - async fn connect_provider(rpc_client: T, signer: PolkaBtcSigner) -> Result - where - Self: Sized, - T: Into + Send, - { - Self::new(rpc_client, signer).await - } -} - #[async_trait] pub trait UtilFuncs { /// Gets the current height of the parachain diff --git a/service/Cargo.toml b/service/Cargo.toml new file mode 100644 index 000000000..47c6ad832 --- /dev/null +++ b/service/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "service" +version = "0.6.0" +authors = ["Interlay "] +edition = "2018" + +[dependencies] +async-trait = "0.1.40" +futures = "0.3.5" +clap = "3.0.0-beta.2" +thiserror = "1.0" + +tokio = { version = "0.2.22", features = ["full"] } +hyper = { version = "0.13" } +hyper-tls = "0.4.3" + +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.2.12", features = ["registry", "env-filter", "fmt"] } +tracing-futures = { version = "0.2.5" } + +polkabtc-telemetry = { package = "polkabtc-telemetry-types", git = "https://github.com/interlay/polkabtc-telemetry", rev = "fb5fe36" } + +# Workspace dependencies +bitcoin = { path = "../bitcoin", features = ["cli"] } +runtime = { path = "../runtime" } + +# Substrate dependencies +sp-core = { git = "https://github.com/paritytech/substrate", branch = "rococo-v1" } diff --git a/service/src/cli.rs b/service/src/cli.rs new file mode 100644 index 000000000..84a062d92 --- /dev/null +++ b/service/src/cli.rs @@ -0,0 +1,66 @@ +use clap::Clap; +use std::str::FromStr; + +#[derive(Clone, Debug)] +pub enum RestartPolicy { + Never, + Always, +} + +impl FromStr for RestartPolicy { + type Err = String; + fn from_str(code: &str) -> Result { + match code { + "never" => Ok(RestartPolicy::Never), + "always" => Ok(RestartPolicy::Always), + _ => Err("Could not parse input as RestartPolicy".to_string()), + } + } +} + +#[derive(Clone, Debug)] +pub enum LoggingFormat { + Full, + Json, +} + +impl Default for LoggingFormat { + fn default() -> Self { + LoggingFormat::Full + } +} + +impl FromStr for LoggingFormat { + type Err = String; + fn from_str(code: &str) -> Result { + match code { + "full" => Ok(LoggingFormat::Full), + "json" => Ok(LoggingFormat::Json), + _ => Err("Could not parse input as LoggingFormat".to_string()), + } + } +} + +impl LoggingFormat { + pub fn init_subscriber(&self) { + match *self { + Self::Full => crate::trace::init_subscriber(), + Self::Json => crate::trace::init_json_subscriber(), + } + } +} + +#[derive(Clap, Debug, Clone)] +pub struct ServiceConfig { + /// Restart or stop on error. + #[clap(long, default_value = "always")] + pub restart_policy: RestartPolicy, + + /// Logging output format. + #[clap(long, default_value = "full")] + pub logging_format: LoggingFormat, + + /// Telemetry endpoint. + #[clap(long)] + pub telemetry_url: Option, +} diff --git a/service/src/error.rs b/service/src/error.rs new file mode 100644 index 000000000..12ad4ee2c --- /dev/null +++ b/service/src/error.rs @@ -0,0 +1,16 @@ +use hyper::{http::Error as HyperHttpError, Error as HyperError}; +use serde_json::Error as SerdeJsonError; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Received an invalid response")] + InvalidResponse, + + #[error("SerdeJsonError: {0}")] + SerdeJsonError(#[from] SerdeJsonError), + #[error("HyperError: {0}")] + HyperError(#[from] HyperError), + #[error("HyperHttpError: {0}")] + HyperHttpError(#[from] HyperHttpError), +} diff --git a/service/src/lib.rs b/service/src/lib.rs new file mode 100644 index 000000000..d9985331e --- /dev/null +++ b/service/src/lib.rs @@ -0,0 +1,150 @@ +use async_trait::async_trait; +use bitcoin::BitcoinCore; +use futures::{future::Either, Future, FutureExt}; +use runtime::{ + cli::ConnectionOpts as ParachainConfig, Error as RuntimeError, PolkaBtcProvider as BtcParachain, PolkaBtcSigner, +}; +use std::marker::PhantomData; + +mod cli; +mod error; +mod telemetry; +mod trace; + +use telemetry::TelemetryClient; + +pub use cli::{LoggingFormat, RestartPolicy, ServiceConfig}; +pub use error::Error; +pub use trace::init_subscriber; + +pub type ShutdownSender = tokio::sync::broadcast::Sender>; + +#[async_trait] +pub trait Service { + const NAME: &'static str; + const VERSION: &'static str; + + async fn initialize(_: &Bitcoin) -> Result<(), RuntimeError> { + Ok(()) + } + fn new_service(btc_parachain: BtcParachain, bitcoin: Bitcoin, config: Config, shutdown: ShutdownSender) -> Self; + async fn start(&self) -> Result<(), RuntimeError>; +} + +pub struct ConnectionManager> { + signer: PolkaBtcSigner, + bitcoin: Bitcoin, + parachain_config: ParachainConfig, + service_config: ServiceConfig, + config: Config, + _marker: PhantomData, +} + +impl> ConnectionManager { + pub fn new( + signer: PolkaBtcSigner, + bitcoin: BitcoinCore, + parachain_config: ParachainConfig, + service_config: ServiceConfig, + config: Config, + ) -> Self { + Self { + signer, + bitcoin, + parachain_config, + service_config, + config, + _marker: PhantomData::default(), + } + } +} + +impl> ConnectionManager<(), Config, S> { + pub fn new( + signer: PolkaBtcSigner, + parachain_config: ParachainConfig, + service_config: ServiceConfig, + config: Config, + ) -> Self { + Self { + signer, + bitcoin: (), + parachain_config, + service_config, + config, + _marker: PhantomData::default(), + } + } +} + +impl> + ConnectionManager +{ + pub async fn start(&self) -> Result<(), RuntimeError> { + if let Some(uri) = &self.service_config.telemetry_url { + // run telemetry client heartbeat + let telemetry_client = TelemetryClient::new(uri.clone(), self.signer.clone()); + tokio::spawn(async move { telemetry::do_update(&telemetry_client, S::NAME, S::VERSION).await }); + } + + loop { + let config = self.config.clone(); + let (shutdown_tx, _) = tokio::sync::broadcast::channel(16); + + let bitcoin_core = &self.bitcoin; + S::initialize(bitcoin_core).await?; + + // only open connection to parachain after bitcoind sync to prevent timeout + let signer = self.signer.clone(); + let btc_parachain = BtcParachain::from_url_and_config_with_retry( + &self.parachain_config.polka_btc_url, + signer, + self.parachain_config.max_concurrent_requests, + self.parachain_config.max_notifs_per_subscription, + self.parachain_config.polka_btc_connection_timeout_ms, + ) + .await?; + + let service = S::new_service(btc_parachain, bitcoin_core.clone(), config, shutdown_tx); + service.start().await?; + + tracing::info!("Disconnected"); + match self.service_config.restart_policy { + RestartPolicy::Never => return Err(RuntimeError::ClientShutdown), + RestartPolicy::Always => continue, + }; + } + } +} + +pub async fn wait_or_shutdown(shutdown_tx: ShutdownSender, future2: F) +where + F: Future>, +{ + let mut shutdown_rx = shutdown_tx.subscribe(); + + let future1 = shutdown_rx.recv().fuse(); + let future2 = future2.fuse(); + + futures::pin_mut!(future1); + futures::pin_mut!(future2); + + match futures::future::select(future1, future2).await { + Either::Left((_, _)) => { + tracing::trace!("Received shutdown signal"); + } + Either::Right((_, _)) => { + tracing::trace!("Sending shutdown signal"); + // TODO: shutdown signal should be error + let _ = shutdown_tx.send(Some(())); + } + }; +} + +pub async fn on_shutdown(shutdown_tx: ShutdownSender, future2: impl Future) { + let mut shutdown_rx = shutdown_tx.subscribe(); + let future1 = shutdown_rx.recv().fuse(); + + let _ = future1.await; + future2.await; +} diff --git a/service/src/telemetry.rs b/service/src/telemetry.rs new file mode 100644 index 000000000..9a11200ee --- /dev/null +++ b/service/src/telemetry.rs @@ -0,0 +1,64 @@ +use std::time::Duration; + +use crate::Error; +use hyper::{client::HttpConnector, Body, Client, Method, Request, StatusCode}; +use hyper_tls::HttpsConnector; +use polkabtc_telemetry::{ClientInfo, Message, Payload}; +use runtime::PolkaBtcSigner; +use sp_core::sr25519::Pair; +use tokio::time; + +const TELEMETRY_PERIOD: Duration = Duration::from_secs(3600); + +/// Wrapper over a HTTPS enabled Hyper client, with the +/// ability to sign outgoing messages. +pub(crate) struct TelemetryClient { + uri: String, + client: Client>, + pair: Pair, +} + +impl TelemetryClient { + pub(crate) fn new(uri: String, signer: PolkaBtcSigner) -> Self { + let https = HttpsConnector::new(); + let client = Client::builder().build::<_, Body>(https); + + let pair = signer.signer().clone(); + + Self { uri, client, pair } + } + + pub(crate) async fn update(&self, name: &str, version: &str) -> Result<(), Error> { + let payload = Payload::UpdateClient(ClientInfo { + name: name.to_string(), + version: version.to_string(), + }); + let message = Message::from_payload_and_signer(payload, &self.pair); + + let request = Request::builder() + .method(Method::POST) + .uri(&self.uri) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_string(&message)?))?; + let response = self.client.request(request).await?; + + if response.status() != StatusCode::OK { + Err(Error::InvalidResponse) + } else { + Ok(()) + } + } +} + +/// Run recurring update to telemetry service. +pub(crate) async fn do_update(telemetry_client: &TelemetryClient, name: &str, version: &str) { + let mut interval = time::interval(TELEMETRY_PERIOD); + + loop { + interval.tick().await; + tracing::info!("Updating telemetry"); + if let Err(err) = telemetry_client.update(name, version).await { + tracing::error!("Failed to update telemetry: {}", err); + } + } +} diff --git a/service/src/trace.rs b/service/src/trace.rs new file mode 100644 index 000000000..3c649b85d --- /dev/null +++ b/service/src/trace.rs @@ -0,0 +1,25 @@ +use tracing_subscriber::{fmt, layer::SubscriberExt, prelude::*, EnvFilter}; + +fn init_filter() -> EnvFilter { + EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info")) + .unwrap() +} + +pub fn init_json_subscriber() { + let fmt_layer = fmt::layer().json(); + + let _ = tracing_subscriber::registry() + .with(init_filter()) + .with(fmt_layer) + .try_init(); +} + +pub fn init_subscriber() { + let fmt_layer = fmt::layer(); + + let _ = tracing_subscriber::registry() + .with(init_filter()) + .with(fmt_layer) + .try_init(); +} diff --git a/staked-relayer/Cargo.toml b/staked-relayer/Cargo.toml index eadfe416d..d8eb370a6 100644 --- a/staked-relayer/Cargo.toml +++ b/staked-relayer/Cargo.toml @@ -26,6 +26,7 @@ jsonrpc-core-client = { version = "17.0.0", features = ["http", "tls"] } # Workspace dependencies bitcoin = { path = "../bitcoin", features = ["cli"] } runtime = { path = "../runtime" } +service = { path = "../service" } # Substrate dependencies sp-core = { git = "https://github.com/paritytech/substrate", branch = "rococo-v1" } diff --git a/staked-relayer/src/lib.rs b/staked-relayer/src/lib.rs index a6b1090c9..c68666720 100644 --- a/staked-relayer/src/lib.rs +++ b/staked-relayer/src/lib.rs @@ -19,17 +19,3 @@ pub mod service { vault::{listen_for_vaults_registered, listen_for_wallet_updates, report_vault_thefts}, }; } - -use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - -pub fn init_subscriber() { - let fmt_layer = fmt::layer(); - let filter_layer = EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new("info")) - .unwrap(); - - let _ = tracing_subscriber::registry() - .with(filter_layer) - .with(fmt_layer) - .try_init(); -} diff --git a/staked-relayer/src/main.rs b/staked-relayer/src/main.rs index 0e105d8e9..c99041599 100644 --- a/staked-relayer/src/main.rs +++ b/staked-relayer/src/main.rs @@ -1,14 +1,8 @@ -use staked_relayer::{system::*, Error}; - use clap::Clap; -use git_version::git_version; -use runtime::{substrate_subxt::PairSigner, ConnectionManager, PolkaBtcRuntime}; -use std::time::Duration; +use runtime::{substrate_subxt::PairSigner, PolkaBtcRuntime}; +use service::{ConnectionManager, ServiceConfig}; -const VERSION: &str = git_version!(args = ["--tags"]); -const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); -const NAME: &str = env!("CARGO_PKG_NAME"); -const ABOUT: &str = env!("CARGO_PKG_DESCRIPTION"); +use staked_relayer::{system::*, Error}; #[derive(Clap)] #[clap(name = NAME, version = VERSION, author = AUTHORS, about = ABOUT)] @@ -25,50 +19,18 @@ struct Opts { #[clap(flatten)] bitcoin: bitcoin::cli::BitcoinOpts, - /// Starting height for vault theft checks, if not defined - /// automatically start from the chain tip. - #[clap(long)] - bitcoin_theft_start_height: Option, - - /// Timeout in milliseconds to poll Bitcoin. - #[clap(long, default_value = "6000")] - bitcoin_poll_timeout_ms: u64, - - /// Starting height to relay block headers, if not defined - /// use the best height as reported by the relay module. - #[clap(long)] - bitcoin_relay_start_height: Option, - - /// Max batch size for combined block header submission. - #[clap(long, default_value = "16")] - max_batch_size: u32, - - /// Default deposit for all automated status proposals. - #[clap(long, default_value = "100")] - status_update_deposit: u128, - - /// Comma separated list of allowed origins. - #[clap(long, default_value = "*")] - rpc_cors_domain: String, - - /// Automatically register the relayer with the given stake (in Planck). - #[clap(long)] - auto_register_with_stake: Option, - - /// Automatically register the staked relayer with collateral received from the faucet and a newly generated - /// address. The parameter is the URL of the faucet - #[clap(long, conflicts_with("auto-register-with-stake"))] - auto_register_with_faucet_url: Option, + /// Settings specific to the relayer client. + #[clap(flatten)] + relayer: RelayerServiceConfig, - /// Number of confirmations a block needs to have before it is submitted. - #[clap(long, default_value = "0")] - required_btc_confirmations: u32, + /// General service settings. + #[clap(flatten)] + service: ServiceConfig, } async fn start() -> Result<(), Error> { - staked_relayer::init_subscriber(); - let opts: Opts = Opts::parse(); + opts.service.logging_format.init_subscriber(); let (key_pair, _) = opts.account_info.get_key_pair()?; let signer = PairSigner::::new(key_pair); @@ -76,21 +38,11 @@ async fn start() -> Result<(), Error> { let bitcoin_core = opts.bitcoin.new_client(None)?; ConnectionManager::<_, _, RelayerService>::new( - opts.parachain.polka_btc_url.clone(), signer.clone(), - RelayerServiceConfig { - bitcoin_core, - auto_register_with_stake: opts.auto_register_with_stake, - auto_register_with_faucet_url: opts.auto_register_with_faucet_url, - bitcoin_theft_start_height: opts.bitcoin_theft_start_height, - bitcoin_relay_start_height: opts.bitcoin_relay_start_height, - max_batch_size: opts.max_batch_size, - bitcoin_timeout: Duration::from_millis(opts.bitcoin_poll_timeout_ms), - required_btc_confirmations: opts.required_btc_confirmations, - status_update_deposit: opts.status_update_deposit, - rpc_cors_domain: opts.rpc_cors_domain, - }, - opts.parachain.into(), + bitcoin_core, + opts.parachain, + opts.service, + opts.relayer, ) .start() .await?; diff --git a/staked-relayer/src/system.rs b/staked-relayer/src/system.rs index 1696972cb..368c5ae3c 100644 --- a/staked-relayer/src/system.rs +++ b/staked-relayer/src/system.rs @@ -1,44 +1,88 @@ use crate::{relay::*, service::*, utils::*, Error, Vaults}; use async_trait::async_trait; use bitcoin::{BitcoinCore, BitcoinCoreApi}; +use clap::Clap; use futures::executor::block_on; +use git_version::git_version; use runtime::{ - pallets::sla::UpdateRelayerSLAEvent, wait_or_shutdown, Error as RuntimeError, PolkaBtcProvider, PolkaBtcRuntime, - Service, ShutdownSender, StakedRelayerPallet, UtilFuncs, VaultRegistryPallet, + cli::parse_duration_ms, pallets::sla::UpdateRelayerSLAEvent, Error as RuntimeError, PolkaBtcProvider, + PolkaBtcRuntime, StakedRelayerPallet, UtilFuncs, VaultRegistryPallet, }; +use service::{wait_or_shutdown, Service, ShutdownSender}; use std::{sync::Arc, time::Duration}; -#[derive(Clone)] +pub const VERSION: &str = git_version!(args = ["--tags"]); +pub const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); +pub const NAME: &str = env!("CARGO_PKG_NAME"); +pub const ABOUT: &str = env!("CARGO_PKG_DESCRIPTION"); + +#[derive(Clone, Clap)] pub struct RelayerServiceConfig { - /// the bitcoin RPC handle - pub bitcoin_core: BitcoinCore, - pub auto_register_with_stake: Option, - pub auto_register_with_faucet_url: Option, + /// Starting height for vault theft checks, if not defined + /// automatically start from the chain tip. + #[clap(long)] pub bitcoin_theft_start_height: Option, + + /// Timeout in milliseconds to poll Bitcoin. + #[clap(long, parse(try_from_str = parse_duration_ms), default_value = "6000")] + pub bitcoin_poll_timeout_ms: Duration, + + /// Starting height to relay block headers, if not defined + /// use the best height as reported by the relay module. + #[clap(long)] pub bitcoin_relay_start_height: Option, + + /// Max batch size for combined block header submission. + #[clap(long, default_value = "16")] pub max_batch_size: u32, - pub bitcoin_timeout: Duration, - pub required_btc_confirmations: u32, + + /// Default deposit for all automated status proposals. + #[clap(long, default_value = "100")] pub status_update_deposit: u128, + + /// Comma separated list of allowed origins. + #[clap(long, default_value = "*")] pub rpc_cors_domain: String, + + /// Automatically register the relayer with the given stake (in Planck). + #[clap(long)] + pub auto_register_with_stake: Option, + + /// Automatically register the staked relayer with collateral received from the faucet and a newly generated + /// address. The parameter is the URL of the faucet + #[clap(long, conflicts_with("auto-register-with-stake"))] + pub auto_register_with_faucet_url: Option, + + /// Number of confirmations a block needs to have before it is submitted. + #[clap(long, default_value = "0")] + pub required_btc_confirmations: u32, } pub struct RelayerService { btc_parachain: PolkaBtcProvider, + bitcoin_core: BitcoinCore, config: RelayerServiceConfig, shutdown: ShutdownSender, } #[async_trait] -impl Service for RelayerService { - async fn initialize(config: &RelayerServiceConfig) -> Result<(), RuntimeError> { - Self::connect_bitcoin(&config.bitcoin_core) +impl Service for RelayerService { + const NAME: &'static str = NAME; + const VERSION: &'static str = VERSION; + + async fn initialize(bitcoin_core: &BitcoinCore) -> Result<(), RuntimeError> { + Self::connect_bitcoin(bitcoin_core) .await .map_err(|err| RuntimeError::Other(err.to_string())) } - fn new_service(btc_parachain: PolkaBtcProvider, config: RelayerServiceConfig, shutdown: ShutdownSender) -> Self { - RelayerService::new(btc_parachain, config, shutdown) + fn new_service( + btc_parachain: PolkaBtcProvider, + bitcoin_core: BitcoinCore, + config: RelayerServiceConfig, + shutdown: ShutdownSender, + ) -> Self { + RelayerService::new(btc_parachain, bitcoin_core, config, shutdown) } async fn start(&self) -> Result<(), RuntimeError> { @@ -57,16 +101,22 @@ impl RelayerService { Ok(()) } - fn new(btc_parachain: PolkaBtcProvider, config: RelayerServiceConfig, shutdown: ShutdownSender) -> Self { + fn new( + btc_parachain: PolkaBtcProvider, + bitcoin_core: BitcoinCore, + config: RelayerServiceConfig, + shutdown: ShutdownSender, + ) -> Self { Self { btc_parachain, + bitcoin_core, config, shutdown, } } async fn run_service(&self) -> Result<(), Error> { - let bitcoin_core = self.config.bitcoin_core.clone(); + let bitcoin_core = self.bitcoin_core.clone(); if let Some(stake) = self.config.auto_register_with_stake { if !is_registered(&self.btc_parachain).await? { @@ -116,7 +166,7 @@ impl RelayerService { self.btc_parachain.clone(), bitcoin_theft_start_height, vaults.clone(), - self.config.bitcoin_timeout, + self.config.bitcoin_poll_timeout_ms, ), ); @@ -168,7 +218,7 @@ impl RelayerService { Config { start_height: self.config.bitcoin_relay_start_height, max_batch_size: self.config.max_batch_size, - timeout: Some(self.config.bitcoin_timeout), + timeout: Some(self.config.bitcoin_poll_timeout_ms), required_btc_confirmations: self.config.required_btc_confirmations, }, ), diff --git a/staked-relayer/tests/integration_tests.rs b/staked-relayer/tests/integration_tests.rs index 621572f09..fda9f295a 100644 --- a/staked-relayer/tests/integration_tests.rs +++ b/staked-relayer/tests/integration_tests.rs @@ -27,7 +27,7 @@ const TIMEOUT: Duration = Duration::from_secs(45); #[tokio::test(threaded_scheduler)] async fn test_report_vault_theft_succeeds() { - staked_relayer::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -101,7 +101,7 @@ async fn test_report_vault_theft_succeeds() { #[tokio::test(threaded_scheduler)] async fn test_register_deregister_succeeds() { - staked_relayer::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -124,7 +124,7 @@ async fn test_register_deregister_succeeds() { #[ignore] #[tokio::test(threaded_scheduler)] async fn test_vote_status_no_data_succeeds() { - staked_relayer::init_subscriber(); + service::init_subscriber(); let (ref client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; let root_provider = setup_provider(client.clone(), AccountKeyring::Alice).await; diff --git a/vault/Cargo.toml b/vault/Cargo.toml index 5d708e8b9..3848fb93f 100644 --- a/vault/Cargo.toml +++ b/vault/Cargo.toml @@ -32,6 +32,7 @@ jsonrpc-core-client = { version = "17.0.0", features = ["http", "tls"] } # Workspace dependencies bitcoin = { path = "../bitcoin", features = ["cli"] } runtime = { path = "../runtime" } +service = { path = "../service" } # Substrate dependencies sp-core = { git = "https://github.com/paritytech/substrate", branch = "rococo-v1" } diff --git a/vault/src/lib.rs b/vault/src/lib.rs index 793ab23a6..b64d6bc12 100644 --- a/vault/src/lib.rs +++ b/vault/src/lib.rs @@ -15,7 +15,6 @@ mod types; use runtime::{PolkaBtcProvider, VaultRegistryPallet}; use std::time::Duration; -use tracing_subscriber::{fmt, prelude::*, EnvFilter}; pub mod service { pub use crate::{ @@ -31,12 +30,7 @@ pub mod service { }, }; } -pub use crate::{ - cancellation::RequestEvent, - error::Error, - system::{VaultService, VaultServiceConfig}, - types::IssueRequests, -}; +pub use crate::{cancellation::RequestEvent, error::Error, system::*, types::IssueRequests}; pub(crate) async fn lock_additional_collateral(api: &PolkaBtcProvider, amount: u128) -> Result<(), Error> { let result = api.lock_additional_collateral(amount).await; @@ -50,15 +44,3 @@ pub const BITCOIN_MAX_RETRYING_TIME: Duration = Duration::from_secs(24 * 60 * 60 /// At startup we wait until a new block has arrived before we start event listeners. /// This constant defines the rate at which we check whether the chain height has increased. pub const CHAIN_HEIGHT_POLLING_INTERVAL: Duration = Duration::from_millis(500); - -pub fn init_subscriber() { - let fmt_layer = fmt::layer(); - let filter_layer = EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new("info")) - .unwrap(); - - let _ = tracing_subscriber::registry() - .with(filter_layer) - .with(fmt_layer) - .try_init(); -} diff --git a/vault/src/main.rs b/vault/src/main.rs index 6335b7d98..2a6d42642 100644 --- a/vault/src/main.rs +++ b/vault/src/main.rs @@ -1,14 +1,8 @@ use clap::Clap; -use git_version::git_version; -use runtime::{substrate_subxt::PairSigner, ConnectionManager, PolkaBtcRuntime}; -use std::time::Duration; +use runtime::{substrate_subxt::PairSigner, PolkaBtcRuntime}; +use service::{ConnectionManager, ServiceConfig}; -use vault::{Error, VaultService, VaultServiceConfig}; - -const VERSION: &str = git_version!(args = ["--tags"]); -const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); -const NAME: &str = env!("CARGO_PKG_NAME"); -const ABOUT: &str = env!("CARGO_PKG_DESCRIPTION"); +use vault::{Error, VaultService, VaultServiceConfig, ABOUT, AUTHORS, NAME, VERSION}; #[derive(Clap, Debug, Clone)] #[clap(name = NAME, version = VERSION, author = AUTHORS, about = ABOUT)] @@ -25,56 +19,18 @@ pub struct Opts { #[clap(flatten)] pub bitcoin: bitcoin::cli::BitcoinOpts, - /// Comma separated list of allowed origins. - #[clap(long, default_value = "*")] - pub rpc_cors_domain: String, - - /// Automatically register the vault with the given amount of collateral and a newly generated address. - #[clap(long)] - pub auto_register_with_collateral: Option, - - /// Automatically register the vault with the collateral received from the faucet and a newly generated address. - /// The parameter is the URL of the faucet - #[clap(long, conflicts_with("auto-register-with-collateral"))] - pub auto_register_with_faucet_url: Option, - - /// Opt out of auctioning under-collateralized vaults. - #[clap(long)] - pub no_auto_auction: bool, - - /// Opt out of participation in replace requests. - #[clap(long)] - pub no_auto_replace: bool, - - /// Don't check the collateralization rate at startup. - #[clap(long)] - pub no_startup_collateral_increase: bool, - - /// Don't try to execute issues. - #[clap(long)] - pub no_issue_execution: bool, - - /// Don't run the RPC API. - #[clap(long)] - pub no_api: bool, - - /// Maximum total collateral to keep the vault securely collateralized. - #[clap(long, default_value = "1000000")] - pub max_collateral: u128, - - /// Timeout in milliseconds to repeat collateralization checks. - #[clap(long, default_value = "5000")] - pub collateral_timeout_ms: u64, + /// Settings specific to the vault client. + #[clap(flatten)] + pub vault: VaultServiceConfig, - /// How many bitcoin confirmations to wait for. If not specified, the - /// parachain settings will be used (recommended). - #[clap(long)] - pub btc_confirmations: Option, + /// General service settings. + #[clap(flatten)] + pub service: ServiceConfig, } async fn start() -> Result<(), Error> { - vault::init_subscriber(); let opts: Opts = Opts::parse(); + opts.service.logging_format.init_subscriber(); tracing::info!("Command line arguments: {:?}", opts.clone()); @@ -84,22 +40,11 @@ async fn start() -> Result<(), Error> { let bitcoin_core = opts.bitcoin.new_client(Some(wallet_name.to_string()))?; ConnectionManager::<_, _, VaultService>::new( - opts.parachain.polka_btc_url.clone(), signer.clone(), - VaultServiceConfig { - bitcoin_core, - auto_register_with_collateral: opts.auto_register_with_collateral, - auto_register_with_faucet_url: opts.auto_register_with_faucet_url, - no_startup_collateral_increase: opts.no_startup_collateral_increase, - btc_confirmations: opts.btc_confirmations, - max_collateral: opts.max_collateral, - no_auto_replace: opts.no_auto_replace, - no_auto_auction: opts.no_auto_auction, - no_issue_execution: opts.no_issue_execution, - collateral_timeout: Duration::from_millis(opts.collateral_timeout_ms), - rpc_cors_domain: opts.rpc_cors_domain, - }, - opts.parachain.into(), + bitcoin_core, + opts.parachain, + opts.service, + opts.vault, ) .start() .await?; diff --git a/vault/src/system.rs b/vault/src/system.rs index f9f3debde..6bcf4977e 100644 --- a/vault/src/system.rs +++ b/vault/src/system.rs @@ -4,46 +4,96 @@ use crate::{ }; use async_trait::async_trait; use bitcoin::{BitcoinCore, BitcoinCoreApi}; +use clap::Clap; use futures::{channel::mpsc, SinkExt}; +use git_version::git_version; use runtime::{ - pallets::sla::UpdateVaultSLAEvent, wait_or_shutdown, AccountId, BtcRelayPallet, Error as RuntimeError, - PolkaBtcHeader, PolkaBtcProvider, PolkaBtcRuntime, Service, ShutdownSender, UtilFuncs, VaultRegistryPallet, + cli::parse_duration_ms, pallets::sla::UpdateVaultSLAEvent, AccountId, BtcRelayPallet, Error as RuntimeError, + PolkaBtcHeader, PolkaBtcProvider, PolkaBtcRuntime, UtilFuncs, VaultRegistryPallet, }; +use service::{wait_or_shutdown, Service, ShutdownSender}; use std::{sync::Arc, time::Duration}; use tokio::time::delay_for; -#[derive(Clone)] +pub const VERSION: &str = git_version!(args = ["--tags"]); +pub const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); +pub const NAME: &str = env!("CARGO_PKG_NAME"); +pub const ABOUT: &str = env!("CARGO_PKG_DESCRIPTION"); + +#[derive(Clap, Clone, Debug)] pub struct VaultServiceConfig { - /// the bitcoin RPC handle - pub bitcoin_core: BitcoinCore, + /// Comma separated list of allowed origins. + #[clap(long, default_value = "*")] + pub rpc_cors_domain: String, + + /// Automatically register the vault with the given amount of collateral and a newly generated address. + #[clap(long)] pub auto_register_with_collateral: Option, + + /// Automatically register the vault with the collateral received from the faucet and a newly generated address. + /// The parameter is the URL of the faucet + #[clap(long, conflicts_with("auto-register-with-collateral"))] pub auto_register_with_faucet_url: Option, - pub rpc_cors_domain: String, - pub no_startup_collateral_increase: bool, - pub btc_confirmations: Option, - pub max_collateral: u128, - pub no_auto_replace: bool, + + /// Opt out of auctioning under-collateralized vaults. + #[clap(long)] pub no_auto_auction: bool, + + /// Opt out of participation in replace requests. + #[clap(long)] + pub no_auto_replace: bool, + + /// Don't check the collateralization rate at startup. + #[clap(long)] + pub no_startup_collateral_increase: bool, + + /// Don't try to execute issues. + #[clap(long)] pub no_issue_execution: bool, - pub collateral_timeout: Duration, + + /// Don't run the RPC API. + #[clap(long)] + pub no_api: bool, + + /// Maximum total collateral to keep the vault securely collateralized. + #[clap(long, default_value = "1000000")] + pub max_collateral: u128, + + /// Timeout in milliseconds to repeat collateralization checks. + #[clap(long, parse(try_from_str = parse_duration_ms), default_value = "5000")] + pub collateral_timeout_ms: Duration, + + /// How many bitcoin confirmations to wait for. If not specified, the + /// parachain settings will be used (recommended). + #[clap(long)] + pub btc_confirmations: Option, } pub struct VaultService { btc_parachain: PolkaBtcProvider, + bitcoin_core: BitcoinCore, config: VaultServiceConfig, shutdown: ShutdownSender, } #[async_trait] -impl Service for VaultService { - async fn initialize(config: &VaultServiceConfig) -> Result<(), RuntimeError> { - Self::connect_bitcoin(&config.bitcoin_core) +impl Service for VaultService { + const NAME: &'static str = NAME; + const VERSION: &'static str = VERSION; + + async fn initialize(bitcoin_core: &BitcoinCore) -> Result<(), RuntimeError> { + Self::connect_bitcoin(bitcoin_core) .await .map_err(|err| RuntimeError::Other(err.to_string())) } - fn new_service(btc_parachain: PolkaBtcProvider, config: VaultServiceConfig, shutdown: ShutdownSender) -> Self { - VaultService::new(btc_parachain, config, shutdown) + fn new_service( + btc_parachain: PolkaBtcProvider, + bitcoin_core: BitcoinCore, + config: VaultServiceConfig, + shutdown: ShutdownSender, + ) -> Self { + VaultService::new(btc_parachain, bitcoin_core, config, shutdown) } async fn start(&self) -> Result<(), RuntimeError> { @@ -69,16 +119,22 @@ impl VaultService { Ok(()) } - fn new(btc_parachain: PolkaBtcProvider, config: VaultServiceConfig, shutdown: ShutdownSender) -> Self { + fn new( + btc_parachain: PolkaBtcProvider, + bitcoin_core: BitcoinCore, + config: VaultServiceConfig, + shutdown: ShutdownSender, + ) -> Self { Self { btc_parachain, + bitcoin_core, config, shutdown, } } async fn run_service(&self) -> Result<(), Error> { - let bitcoin_core = self.config.bitcoin_core.clone(); + let bitcoin_core = self.bitcoin_core.clone(); let vault_id = self.btc_parachain.get_account_id().clone(); @@ -268,7 +324,7 @@ impl VaultService { self.btc_parachain.clone(), bitcoin_core.clone(), replace_event_tx.clone(), - self.config.collateral_timeout, + self.config.collateral_timeout_ms, ), ); diff --git a/vault/tests/integration_tests.rs b/vault/tests/integration_tests.rs index 1f550acc8..94fa082c9 100644 --- a/vault/tests/integration_tests.rs +++ b/vault/tests/integration_tests.rs @@ -21,7 +21,7 @@ const TIMEOUT: Duration = Duration::from_secs(60); #[tokio::test(threaded_scheduler)] async fn test_redeem_succeeds() { - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -61,7 +61,7 @@ async fn test_redeem_succeeds() { #[tokio::test(threaded_scheduler)] async fn test_replace_succeeds() { - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -127,7 +127,7 @@ async fn test_replace_succeeds() { #[tokio::test(threaded_scheduler)] async fn test_maintain_collateral_succeeds() { - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -173,7 +173,7 @@ async fn test_maintain_collateral_succeeds() { #[tokio::test(threaded_scheduler)] async fn test_withdraw_replace_succeeds() { - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -239,7 +239,7 @@ async fn test_cancellation_succeeds() { // tests cancellation of issue, redeem and replace. // issue and replace cancellation is tested through the vault's cancellation service. // cancel_redeem is called manually - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; let root_provider = setup_provider(client.clone(), AccountKeyring::Alice).await; @@ -389,7 +389,7 @@ async fn test_auction_replace_succeeds() { // register two vaults. Issue with old_vault at capacity. Change exchange rate such that new_vault // will auction_replace. - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -475,7 +475,7 @@ async fn test_auction_replace_succeeds() { #[tokio::test(threaded_scheduler)] async fn test_refund_succeeds() { - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -544,7 +544,7 @@ async fn test_refund_succeeds() { #[tokio::test(threaded_scheduler)] async fn test_issue_overpayment_succeeds() { - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -617,7 +617,7 @@ async fn test_issue_overpayment_succeeds() { #[tokio::test(threaded_scheduler)] async fn test_automatic_issue_execution_succeeds() { - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await; @@ -682,7 +682,7 @@ async fn test_automatic_issue_execution_succeeds() { #[tokio::test(threaded_scheduler)] async fn test_execute_open_requests_succeeds() { - vault::init_subscriber(); + service::init_subscriber(); let (client, _tmp_dir) = default_provider_client(AccountKeyring::Alice).await;