Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: separate faucet funding from validation #300

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 46 additions & 32 deletions faucet/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use runtime::{
InterBtcParachain, VaultRegistryPallet,
};
use serde::{Deserialize, Deserializer};
use std::{collections::HashMap, net::SocketAddr, time::Duration};
use std::{collections::HashMap, future::Future, net::SocketAddr, time::Duration};
use tokio::time::timeout;

const HEALTH_DURATION: Duration = Duration::from_millis(5000);
Expand All @@ -37,8 +37,11 @@ where
D: Deserializer<'de>,
{
use serde::de::Error;
String::deserialize(deserializer)
.and_then(|string| Vec::from_hex(&string[2..]).map_err(|err| Error::custom(err.to_string())))
let value = String::deserialize(deserializer)?;
let value = value
.get(2..)
.ok_or(Error::invalid_length(value.len(), &"Sequence of length > 2"))?;
Vec::from_hex(value).map_err(|err| Error::custom(err.to_string()))
}

fn parse_params<T: Decode>(params: Params) -> Result<T, Error> {
Expand Down Expand Up @@ -78,12 +81,12 @@ enum FundingRequestAccountType {
}

async fn _fund_account_raw(
parachain_rpc: &InterBtcParachain,
parachain_rpc: InterBtcParachain,
params: Params,
store: Store,
user_allowance: u128,
vault_allowance: u128,
) -> Result<(), Error> {
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
let req: FundAccountJsonRpcRequest = parse_params(params)?;
let mut allowances = HashMap::new();
allowances.insert(FundingRequestAccountType::User, user_allowance);
Expand Down Expand Up @@ -193,17 +196,17 @@ async fn ensure_funding_allowed(
}

async fn atomic_faucet_funding(
parachain_rpc: &InterBtcParachain,
parachain_rpc: InterBtcParachain,
kv: Bucket<'_, String, Json<FaucetRequest>>,
account_id: AccountId,
currency_id: CurrencyId,
allowances: HashMap<FundingRequestAccountType, u128>,
) -> Result<(), Error> {
) -> Result<impl Future<Output = Result<(), Error>> + '_, Error> {
let account_str = format!("{}-{}", account_id, currency_id.inner().symbol());
let last_request_json = kv.get(account_str.clone())?;
let account_type = get_account_type(parachain_rpc, account_id.clone()).await?;
let account_type = get_account_type(&parachain_rpc, account_id.clone()).await?;
ensure_funding_allowed(
parachain_rpc,
&parachain_rpc,
account_id.clone(),
currency_id,
last_request_json,
Expand All @@ -225,33 +228,40 @@ async fn atomic_faucet_funding(
amount
);

let mut transfers = vec![parachain_rpc.transfer_to(&account_id, amount, currency_id)];
if currency_id != parachain_rpc.native_currency_id {
transfers.push(parachain_rpc.transfer_to(&account_id, amount, parachain_rpc.native_currency_id));
}
Ok(async move {
let mut transfers = vec![parachain_rpc.transfer_to(&account_id, amount, currency_id)];
if currency_id != parachain_rpc.native_currency_id {
transfers.push(parachain_rpc.transfer_to(&account_id, amount, parachain_rpc.native_currency_id));
}

let result = futures::future::join_all(transfers).await;
let result = futures::future::join_all(transfers).await;
if let Some(err) = result.into_iter().find_map(|x| x.err()) {
return Err(err.into());
}

if let Some(err) = result.into_iter().find_map(|x| x.err()) {
return Err(err.into());
}
log::info!("Finished funding {}", account_id);

// Replace the previous (expired) claim datetime with the datetime of the current claim, only update
// this after successfully transferring funds to ensure that this can be called again on error
update_kv_store(&kv, account_str, Utc::now().to_rfc2822(), account_type.clone())?;
Ok(())
// Replace the previous (expired) claim datetime with the datetime of the current claim, only update
// this after successfully transferring funds to ensure that this can be called again on error
update_kv_store(&kv, account_str, Utc::now().to_rfc2822(), account_type.clone())?;
Ok(())
})
}

async fn fund_account(
parachain_rpc: &InterBtcParachain,
parachain_rpc: InterBtcParachain,
req: FundAccountJsonRpcRequest,
store: Store,
allowances: HashMap<FundingRequestAccountType, u128>,
) -> Result<(), Error> {
let parachain_rpc = parachain_rpc.clone();
let kv = open_kv_store(store)?;
atomic_faucet_funding(&parachain_rpc, kv, req.account_id.clone(), req.currency_id, allowances).await?;
Ok(())
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
atomic_faucet_funding(
parachain_rpc,
open_kv_store(store)?,
req.account_id.clone(),
req.currency_id,
allowances,
)
.await
}

pub async fn start_http(
Expand Down Expand Up @@ -281,12 +291,16 @@ pub async fn start_http(
let parachain_rpc = parachain_rpc.clone();
let store = store.clone();
async move {
let result =
_fund_account_raw(&parachain_rpc.clone(), params, store, user_allowance, vault_allowance).await;
if let Err(ref err) = result {
log::debug!("Failed to fund account: {}", err);
match _fund_account_raw(parachain_rpc, params, store, user_allowance, vault_allowance).await {
Ok(task) => {
tokio::spawn(task);
handle_resp(Ok(()))
}
Err(err) => {
log::debug!("Failed to fund account: {}", err);
handle_resp::<()>(Err(err))
}
}
handle_resp(result)
}
});
};
Expand Down