diff --git a/faucet/src/http.rs b/faucet/src/http.rs index 66992f0b2..6a40ca07a 100644 --- a/faucet/src/http.rs +++ b/faucet/src/http.rs @@ -12,7 +12,7 @@ use runtime::{ InterBtcParachain, VaultRegistryPallet, }; use serde::{Deserialize, Deserializer}; -use std::{collections::HashMap, net::SocketAddr, time::Duration}; +use std::{collections::HashMap, future::Future, net::SocketAddr, time::Duration}; use tokio::time::timeout; const HEALTH_DURATION: Duration = Duration::from_millis(5000); @@ -37,8 +37,11 @@ where D: Deserializer<'de>, { use serde::de::Error; - String::deserialize(deserializer) - .and_then(|string| Vec::from_hex(&string[2..]).map_err(|err| Error::custom(err.to_string()))) + let value = String::deserialize(deserializer)?; + let value = value + .get(2..) + .ok_or(Error::invalid_length(value.len(), &"Sequence of length > 2"))?; + Vec::from_hex(value).map_err(|err| Error::custom(err.to_string())) } fn parse_params(params: Params) -> Result { @@ -78,12 +81,12 @@ enum FundingRequestAccountType { } async fn _fund_account_raw( - parachain_rpc: &InterBtcParachain, + parachain_rpc: InterBtcParachain, params: Params, store: Store, user_allowance: u128, vault_allowance: u128, -) -> Result<(), Error> { +) -> Result>, Error> { let req: FundAccountJsonRpcRequest = parse_params(params)?; let mut allowances = HashMap::new(); allowances.insert(FundingRequestAccountType::User, user_allowance); @@ -193,17 +196,17 @@ async fn ensure_funding_allowed( } async fn atomic_faucet_funding( - parachain_rpc: &InterBtcParachain, + parachain_rpc: InterBtcParachain, kv: Bucket<'_, String, Json>, account_id: AccountId, currency_id: CurrencyId, allowances: HashMap, -) -> Result<(), Error> { +) -> Result> + '_, Error> { let account_str = format!("{}-{}", account_id, currency_id.inner().symbol()); let last_request_json = kv.get(account_str.clone())?; - let account_type = get_account_type(parachain_rpc, account_id.clone()).await?; + let account_type = get_account_type(¶chain_rpc, account_id.clone()).await?; ensure_funding_allowed( - parachain_rpc, + ¶chain_rpc, account_id.clone(), currency_id, last_request_json, @@ -225,33 +228,40 @@ async fn atomic_faucet_funding( amount ); - let mut transfers = vec![parachain_rpc.transfer_to(&account_id, amount, currency_id)]; - if currency_id != parachain_rpc.native_currency_id { - transfers.push(parachain_rpc.transfer_to(&account_id, amount, parachain_rpc.native_currency_id)); - } + Ok(async move { + let mut transfers = vec![parachain_rpc.transfer_to(&account_id, amount, currency_id)]; + if currency_id != parachain_rpc.native_currency_id { + transfers.push(parachain_rpc.transfer_to(&account_id, amount, parachain_rpc.native_currency_id)); + } - let result = futures::future::join_all(transfers).await; + let result = futures::future::join_all(transfers).await; + if let Some(err) = result.into_iter().find_map(|x| x.err()) { + return Err(err.into()); + } - if let Some(err) = result.into_iter().find_map(|x| x.err()) { - return Err(err.into()); - } + log::info!("Finished funding {}", account_id); - // Replace the previous (expired) claim datetime with the datetime of the current claim, only update - // this after successfully transferring funds to ensure that this can be called again on error - update_kv_store(&kv, account_str, Utc::now().to_rfc2822(), account_type.clone())?; - Ok(()) + // Replace the previous (expired) claim datetime with the datetime of the current claim, only update + // this after successfully transferring funds to ensure that this can be called again on error + update_kv_store(&kv, account_str, Utc::now().to_rfc2822(), account_type.clone())?; + Ok(()) + }) } async fn fund_account( - parachain_rpc: &InterBtcParachain, + parachain_rpc: InterBtcParachain, req: FundAccountJsonRpcRequest, store: Store, allowances: HashMap, -) -> Result<(), Error> { - let parachain_rpc = parachain_rpc.clone(); - let kv = open_kv_store(store)?; - atomic_faucet_funding(¶chain_rpc, kv, req.account_id.clone(), req.currency_id, allowances).await?; - Ok(()) +) -> Result>, Error> { + atomic_faucet_funding( + parachain_rpc, + open_kv_store(store)?, + req.account_id.clone(), + req.currency_id, + allowances, + ) + .await } pub async fn start_http( @@ -281,12 +291,16 @@ pub async fn start_http( let parachain_rpc = parachain_rpc.clone(); let store = store.clone(); async move { - let result = - _fund_account_raw(¶chain_rpc.clone(), params, store, user_allowance, vault_allowance).await; - if let Err(ref err) = result { - log::debug!("Failed to fund account: {}", err); + match _fund_account_raw(parachain_rpc, params, store, user_allowance, vault_allowance).await { + Ok(task) => { + tokio::spawn(task); + handle_resp(Ok(())) + } + Err(err) => { + log::debug!("Failed to fund account: {}", err); + handle_resp::<()>(Err(err)) + } } - handle_resp(result) } }); };