diff --git a/vault/src/execution.rs b/vault/src/execution.rs index 4fa9332fd..eebdbdfac 100644 --- a/vault/src/execution.rs +++ b/vault/src/execution.rs @@ -2,7 +2,7 @@ use crate::{ error::Error, metrics::update_bitcoin_metrics, storage::TransactionStore, system::VaultData, VaultIdManager, }; use bitcoin::{ - BitcoinCoreApi, LockedTransaction, PartialAddress, TransactionExt, TransactionMetadata, + BitcoinCoreApi, LockedTransaction, PartialAddress, Transaction, TransactionExt, TransactionMetadata, Txid, BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL, }; use futures::{ @@ -172,6 +172,57 @@ impl Request { } } + async fn wait_and_execute< + B: BitcoinCoreApi + Clone + Send + Sync + 'static, + P: ReplacePallet + + RefundPallet + + BtcRelayPallet + + RedeemPallet + + SecurityPallet + + VaultRegistryPallet + + UtilFuncs + + Clone + + Send + + Sync, + >( + &self, + parachain_rpc: P, + btc_rpc: B, + txid: Txid, + num_confirmations: u32, + ) { + // Payment has been made, but it might not have been confirmed enough times yet + let tx_metadata = btc_rpc.wait_for_transaction_metadata(txid, num_confirmations).await; + + match tx_metadata { + Ok(tx_metadata) => { + // we have enough btc confirmations, now make sure they have been relayed before we continue + if let Err(e) = parachain_rpc + .wait_for_block_in_relay( + H256Le::from_bytes_le(&tx_metadata.block_hash.to_vec()), + Some(num_confirmations), + ) + .await + { + tracing::error!( + "Error while waiting for block inclusion for request #{}: {}", + self.hash, + e + ); + // continue; try to execute anyway + } + + match self.execute(parachain_rpc.clone(), tx_metadata).await { + Ok(_) => { + tracing::info!("Executed request #{:?}", self.hash); + } + Err(e) => tracing::error!("Failed to execute request #{}: {}", self.hash, e), + } + } + Err(e) => tracing::error!("Failed to confirm bitcoin transaction for request {}: {}", self.hash, e), + } + } + /// Makes the bitcoin transfer and executes the request pub async fn pay_and_execute< B: BitcoinCoreApi + Clone + Send + Sync + 'static, @@ -355,6 +406,7 @@ pub async fn execute_open_requests( shutdown_tx: ShutdownSender, parachain_rpc: InterBtcParachain, vault_id_manager: VaultIdManager, + read_only_btc_rpc: B, tx_store: Arc, num_confirmations: u32, payment_margin: Duration, @@ -413,6 +465,7 @@ where .map(|x| (x.hash, x)) .collect::>(); + // 1. check tx store for request txs for (hash, request) in open_requests.clone().into_iter() { // get the request this transaction corresponds to, if any if let Ok(tx) = tx_store.get_tx(&hash) { @@ -448,48 +501,68 @@ where // try sending but ignore the result as it may have already been processed let _ = btc_rpc.send_transaction(locked_tx).await; - // Payment has been made, but it might not have been confirmed enough times yet - let tx_metadata = btc_rpc - .clone() - .wait_for_transaction_metadata(txid, num_confirmations) + request + .wait_and_execute(parachain_rpc, btc_rpc, txid, num_confirmations) .await; + }); + } + } + + // find the height of bitcoin chain corresponding to the earliest btc_height + let btc_start_height = match open_requests + .iter() + .map(|(_, request)| request.btc_height.unwrap_or(u32::MAX)) + .min() + { + Some(x) => x, + None => return Ok(()), // the iterator is empty so we have nothing to do + }; - match tx_metadata { - Ok(tx_metadata) => { - // we have enough btc confirmations, now make sure they have been relayed before we continue - if let Err(e) = parachain_rpc - .wait_for_block_in_relay( - H256Le::from_bytes_le(&tx_metadata.block_hash.to_vec()), - Some(num_confirmations), - ) - .await - { - tracing::error!( - "Error while waiting for block inclusion for request #{}: {}", - request.hash, - e - ); - // continue; try to execute anyway - } - - match request.execute(parachain_rpc.clone(), tx_metadata).await { - Ok(_) => { - tracing::info!("Executed request #{:?}", request.hash); - } - Err(e) => tracing::error!("Failed to execute request #{}: {}", request.hash, e), - } + // 2. fallback to mempool / blocks to find payments (for backward compatibility) + // iterate through transactions in reverse order, starting from those in the mempool + let mut transaction_stream = bitcoin::reverse_stream_transactions(&read_only_btc_rpc, btc_start_height).await?; + while let Some(result) = transaction_stream.next().await { + let tx = match result { + Ok(x) => x, + Err(e) => { + tracing::warn!("Failed to process transaction: {}", e); + continue; + } + }; + + // get the request this transaction corresponds to, if any + if let Some(request) = get_request_for_btc_tx(&tx, &open_requests) { + open_requests.remove(&request.hash); + + tracing::info!( + "{:?} request #{:?} has valid bitcoin payment - processing...", + request.request_type, + request.hash + ); + // start a new task to (potentially) await confirmation and to execute on the parachain + // make copies of the variables we move into the task + let parachain_rpc = parachain_rpc.clone(); + let btc_rpc = vault_id_manager.clone(); + spawn_cancelable(shutdown_tx.subscribe(), async move { + let btc_rpc = match btc_rpc.get_bitcoin_rpc(&request.vault_id).await { + Some(x) => x, + None => { + tracing::error!( + "Failed to fetch bitcoin rpc for vault {}", + request.vault_id.pretty_print() + ); + return; // nothing we can do - bail } - Err(e) => tracing::error!( - "Failed to confirm bitcoin transaction for request {}: {}", - request.hash, - e - ), - } + }; + + request + .wait_and_execute(parachain_rpc, btc_rpc, tx.txid(), num_confirmations) + .await; }); } } - // All requests remaining in the hashmap did not have a bitcoin payment yet, so pay + // All requests remaining in the hashmap do not have a bitcoin payment yet, so pay // and execute all of these for (_, request) in open_requests { // there are potentially a large number of open requests - pay and execute each @@ -539,6 +612,19 @@ where Ok(()) } +/// Get the Request from the hashmap that the given Transaction satisfies, based +/// on the OP_RETURN and the amount of btc that is transfered to the address +fn get_request_for_btc_tx(tx: &Transaction, hash_map: &HashMap) -> Option { + let hash = tx.get_op_return()?; + let request = hash_map.get(&hash)?; + let paid_amount = tx.get_payment_amount_to(request.btc_address)?; + if paid_amount as u128 >= request.amount { + Some(request.clone()) + } else { + None + } +} + #[cfg(all(test, feature = "standalone-metadata"))] mod tests { use crate::metrics::PerCurrencyMetrics; @@ -547,7 +633,7 @@ mod tests { use async_trait::async_trait; use bitcoin::{ json, Amount, Block, BlockHash, BlockHeader, Error as BitcoinError, GetBlockResult, Network, PartialAddress, - PrivateKey, Transaction, TransactionMetadata, Txid, PUBLIC_KEY_SIZE, + PrivateKey, Transaction, TransactionMetadata, PUBLIC_KEY_SIZE, }; use runtime::{ AccountId, BlockNumber, BtcPublicKey, CurrencyId, Error as RuntimeError, ErrorCode, InterBtcRichBlockHeader, diff --git a/vault/src/system.rs b/vault/src/system.rs index 632a4b340..59c315187 100644 --- a/vault/src/system.rs +++ b/vault/src/system.rs @@ -531,6 +531,7 @@ impl VaultService { self.shutdown.clone(), self.btc_parachain.clone(), self.vault_id_manager.clone(), + self.btc_rpc_master_wallet.clone(), rocksdb.clone(), num_confirmations, self.config.payment_margin_minutes,