Skip to content

Commit

Permalink
Merge pull request #506 from interlay/nakul/fix_merge_service_crate
Browse files Browse the repository at this point in the history
fix: merge service crate
  • Loading branch information
gregdhill authored Aug 3, 2023
2 parents 890cb73 + 8830613 commit cc8ed9c
Show file tree
Hide file tree
Showing 25 changed files with 298 additions and 316 deletions.
188 changes: 82 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ members = [
"vault",
"bitcoin",
"faucet",
"service",
"runner"
]

Expand Down
2 changes: 1 addition & 1 deletion faucet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ async-trait = "0.1.40"
futures = "0.3.5"
git-version = "0.3.4"
lazy_static = "1.4.0"
tracing = { version = "0.1", features = ["log"] }

reqwest = { version = "0.11.11", features = ["json"] }
url = "2.2.2"

# Workspace dependencies
runtime = { path = "../runtime" }
service = { path = "../service" }

[dev-dependencies]
serial_test = "0.9.0"
Expand Down
1 change: 1 addition & 0 deletions faucet/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod error;
mod http;
mod service;

use clap::Parser;
use error::Error;
Expand Down
48 changes: 48 additions & 0 deletions faucet/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use futures::{future::Either, Future, FutureExt};
pub use runtime::{ShutdownReceiver, ShutdownSender};

pub enum TerminationStatus<Res> {
Cancelled,
Completed(Res),
}

pub async fn on_shutdown(shutdown_tx: ShutdownSender, future2: impl Future) {
let mut shutdown_rx = shutdown_tx.subscribe();
let future1 = shutdown_rx.recv().fuse();

let _ = future1.await;
future2.await;
}

async fn run_cancelable<F, Res>(mut shutdown_rx: ShutdownReceiver, future2: F) -> TerminationStatus<Res>
where
F: Future<Output = Res>,
{
let future1 = shutdown_rx.recv().fuse();
let future2 = future2.fuse();

futures::pin_mut!(future1);
futures::pin_mut!(future2);

match futures::future::select(future1, future2).await {
Either::Left((_, _)) => TerminationStatus::Cancelled,
Either::Right((res, _)) => TerminationStatus::Completed(res),
}
}

pub async fn wait_or_shutdown<F, E>(shutdown_tx: ShutdownSender, future2: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
{
match run_cancelable(shutdown_tx.subscribe(), future2).await {
TerminationStatus::Cancelled => {
tracing::trace!("Received shutdown signal");
Ok(())
}
TerminationStatus::Completed(res) => {
tracing::trace!("Sending shutdown signal");
let _ = shutdown_tx.send(());
res
}
}
}
30 changes: 0 additions & 30 deletions service/Cargo.toml

This file was deleted.

38 changes: 0 additions & 38 deletions service/src/error.rs

This file was deleted.

7 changes: 5 additions & 2 deletions vault/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ uses-bitcoind = [] # run tests relying on bitcoind regtest node

[dependencies]
thiserror = "1.0"
backoff = { version = "0.3.0", features = ["tokio"] }
clap = { version = "4.0.17", features = ["derive"]}
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1.9", features = ["sync"] }
tokio-metrics = { version = "0.1.0", default-features = false }
serde = "1.0.136"
hyper = { version = "0.14.27" }
hyper-tls = "0.5.0"
warp = "0.3.2"
serde = { version = "1.0.136", features = ["derive"] }
parity-scale-codec = "3.0.0"
hex = "0.4.2"
futures = "0.3.5"
Expand Down Expand Up @@ -45,7 +49,6 @@ jsonrpc-core-client = { version = "18.0.0", features = ["http", "tls"] }
# Workspace dependencies
bitcoin = { path = "../bitcoin", features = ["cli"] }
runtime = { path = "../runtime" }
service = { path = "../service" }
faucet-rpc = { package = "faucet", path = "../faucet" }

# Substrate dependencies
Expand Down
5 changes: 3 additions & 2 deletions service/src/cli.rs → vault/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::trace;
use clap::Parser;
use std::str::FromStr;

Expand Down Expand Up @@ -44,8 +45,8 @@ impl FromStr for LoggingFormat {
impl LoggingFormat {
pub fn init_subscriber(&self) {
match *self {
Self::Full => crate::trace::init_subscriber(),
Self::Json => crate::trace::init_json_subscriber(),
Self::Full => trace::init_subscriber(),
Self::Json => trace::init_json_subscriber(),
}
}
}
Expand Down
40 changes: 15 additions & 25 deletions service/src/lib.rs → vault/src/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
pub use crate::{
cli::{LoggingFormat, MonitoringConfig, RestartPolicy, ServiceConfig},
trace::init_subscriber,
Error,
};
use async_trait::async_trait;
use backoff::Error as BackoffError;
use bitcoin::{cli::BitcoinOpts as BitcoinConfig, BitcoinCoreApi, Error as BitcoinError};
use futures::{future::Either, Future, FutureExt};
use governor::{Quota, RateLimiter};
Expand All @@ -7,22 +13,14 @@ use runtime::{
cli::ConnectionOpts as ParachainConfig, CurrencyId, InterBtcParachain as BtcParachain, InterBtcSigner, PrettyPrint,
RuntimeCurrencyInfo, VaultId,
};
use std::{fmt, sync::Arc, time::Duration};

mod cli;
mod error;
mod trace;

pub use cli::{LoggingFormat, MonitoringConfig, RestartPolicy, ServiceConfig};
pub use error::Error;
pub use runtime::{ShutdownReceiver, ShutdownSender};
pub use trace::init_subscriber;
use std::{sync::Arc, time::Duration};
pub use warp;

pub type DynBitcoinCoreApi = Arc<dyn BitcoinCoreApi + Send + Sync>;

#[async_trait]
pub trait Service<Config, InnerError> {
pub trait Service<Config> {
const NAME: &'static str;
const VERSION: &'static str;

Expand All @@ -36,7 +34,7 @@ pub trait Service<Config, InnerError> {
constructor: Box<dyn Fn(VaultId) -> Result<DynBitcoinCoreApi, BitcoinError> + Send + Sync>,
keyname: String,
) -> Self;
async fn start(&self) -> Result<(), Error<InnerError>>;
async fn start(&self) -> Result<(), BackoffError<Error>>;
}

pub struct ConnectionManager<Config: Clone, F: Fn()> {
Expand Down Expand Up @@ -77,9 +75,7 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
}
}

pub async fn start<S: Service<Config, InnerError>, InnerError: fmt::Display>(
&self,
) -> Result<(), Error<InnerError>> {
pub async fn start<S: Service<Config>>(&self) -> Result<(), Error> {
loop {
tracing::info!("Version: {}", S::VERSION);
tracing::info!("AccountId: {}", self.signer.account_id.pretty_print());
Expand Down Expand Up @@ -138,18 +134,20 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
Box::new(constructor),
self.db_path.clone(),
);

match service.start().await {
Err(err @ Error::Abort(_)) => {
Err(err @ backoff::Error::Permanent(_)) => {
tracing::warn!("Disconnected: {}", err);
return Err(err);
return Err(err.into());
}
Err(err) => {
tracing::warn!("Disconnected: {}", err);
}
_ => {
tracing::warn!("Disconnected");
}
}
};

// propagate shutdown signal from main tasks
let _ = shutdown_tx.send(());

Expand Down Expand Up @@ -223,11 +221,3 @@ where
{
tokio::spawn(run_cancelable(shutdown_rx, future));
}

pub async fn on_shutdown(shutdown_tx: ShutdownSender, future2: impl Future) {
let mut shutdown_rx = shutdown_tx.subscribe();
let future1 = shutdown_rx.recv().fuse();

let _ = future1.await;
future2.await;
}
29 changes: 24 additions & 5 deletions vault/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::string::FromUtf8Error;

use bitcoin::Error as BitcoinError;
use jsonrpc_core_client::RpcError;
use parity_scale_codec::Error as CodecError;
use rocksdb::Error as RocksDbError;
use runtime::Error as RuntimeError;
use serde_json::Error as SerdeJsonError;
use std::{io::Error as IoError, num::ParseIntError, string::FromUtf8Error};
use thiserror::Error;
use tokio::task::JoinError as TokioJoinError;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -44,10 +44,29 @@ pub enum Error {
FromUtf8Error(#[from] FromUtf8Error),
#[error("BroadcastStreamRecvError: {0}")]
BroadcastStreamRecvError(#[from] BroadcastStreamRecvError),
#[error("Client has shutdown")]
ClientShutdown,
#[error("OsString parsing error")]
OsStringError,
#[error("File already exists")]
FileAlreadyExists,
#[error("There is a services already running on the system, with pid {0}")]
ServiceAlreadyRunning(u32),
#[error("Process with pid {0} not found")]
ProcessNotFound(String),
#[error("ParseIntError: {0}")]
ParseIntError(#[from] ParseIntError),
#[error("TokioError: {0}")]
TokioError(#[from] TokioJoinError),
#[error("System I/O error: {0}")]
IoError(#[from] IoError),
}

impl From<Error> for service::Error<Error> {
fn from(err: Error) -> Self {
Self::Retry(err)
impl From<backoff::Error<Error>> for Error {
fn from(err: backoff::Error<Error>) -> Self {
match err {
backoff::Error::Permanent(err) => err,
backoff::Error::Transient(err) => err,
}
}
}
11 changes: 8 additions & 3 deletions vault/src/execution.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use crate::{error::Error, metrics::update_bitcoin_metrics, system::VaultData, VaultIdManager, YIELD_RATE};
use crate::{
error::Error,
metrics::update_bitcoin_metrics,
service::{spawn_cancelable, DynBitcoinCoreApi, ShutdownSender},
system::VaultData,
VaultIdManager, YIELD_RATE,
};
use bitcoin::{
Error as BitcoinError, Hash, SatPerVbyte, Transaction, TransactionExt, TransactionMetadata, Txid,
BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL,
Expand All @@ -11,7 +17,6 @@ use runtime::{
RedeemRequestStatus, ReplacePallet, ReplaceRequestStatus, SecurityPallet, UtilFuncs, VaultId, VaultRegistryPallet,
H256,
};
use service::{spawn_cancelable, DynBitcoinCoreApi, Error as ServiceError, ShutdownSender};
use std::{collections::HashMap, convert::TryInto, time::Duration};
use tokio::time::sleep;
use tokio_stream::wrappers::BroadcastStream;
Expand Down Expand Up @@ -476,7 +481,7 @@ pub async fn execute_open_requests(
num_confirmations: u32,
payment_margin: Duration,
auto_rbf: bool,
) -> Result<(), ServiceError<Error>> {
) -> Result<(), Error> {
let parachain_rpc = &parachain_rpc;
let vault_id = parachain_rpc.get_account_id().clone();

Expand Down
Loading

0 comments on commit cc8ed9c

Please sign in to comment.