From c4424e5b73a44f8673639c5c6c95a6a888e964f1 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 27 Jul 2020 14:58:14 +0300 Subject: [PATCH] Auto relay exchange transactions (#227) * auto relay exchange transactions * docker + auto-relay-tx * clippy * jsonrpsee in Cargo.lock ??? * fix tests compilation * Show sccache * mute clippy * move * Update relays/ethereum/src/exchange.rs Co-authored-by: Hernando Castano * finish comment * (bool, String) -> StringifiedMaybeConnectionError * Update deployments/rialto/docker-compose.yml Co-authored-by: Hernando Castano Co-authored-by: Denis S. Soldatov aka General-Beck Co-authored-by: Hernando Castano --- bridges/bin/node/runtime/src/lib.rs | 6 + bridges/modules/currency-exchange/src/lib.rs | 72 +- .../primitives/currency-exchange/Cargo.toml | 7 + .../primitives/currency-exchange/src/lib.rs | 16 +- bridges/relays/ethereum/Cargo.toml | 1 + bridges/relays/ethereum/src/cli.yml | 8 +- .../relays/ethereum/src/ethereum_client.rs | 17 + .../relays/ethereum/src/ethereum_exchange.rs | 220 ++++-- bridges/relays/ethereum/src/exchange.rs | 687 ++++++++++++++---- bridges/relays/ethereum/src/exchange_loop.rs | 267 +++++++ bridges/relays/ethereum/src/main.rs | 21 +- bridges/relays/ethereum/src/rpc.rs | 4 + bridges/relays/ethereum/src/rpc_errors.rs | 13 - .../relays/ethereum/src/substrate_client.rs | 21 +- bridges/relays/ethereum/src/sync_loop.rs | 13 +- .../relays/ethereum/src/sync_loop_tests.rs | 3 +- bridges/relays/ethereum/src/utils.rs | 16 + 17 files changed, 1163 insertions(+), 229 deletions(-) create mode 100644 bridges/relays/ethereum/src/exchange_loop.rs diff --git a/bridges/bin/node/runtime/src/lib.rs b/bridges/bin/node/runtime/src/lib.rs index d77b54764438..9f473fee9d8b 100644 --- a/bridges/bin/node/runtime/src/lib.rs +++ b/bridges/bin/node/runtime/src/lib.rs @@ -555,6 +555,12 @@ impl_runtime_apis! { } } + impl sp_currency_exchange::CurrencyExchangeApi for Runtime { + fn filter_transaction_proof(proof: exchange::EthereumTransactionInclusionProof) -> bool { + BridgeCurrencyExchange::filter_transaction_proof(&proof) + } + } + impl sp_transaction_pool::runtime_api::TaggedTransactionQueue for Runtime { fn validate_transaction( source: TransactionSource, diff --git a/bridges/modules/currency-exchange/src/lib.rs b/bridges/modules/currency-exchange/src/lib.rs index 12d8657cab2f..1c39e1aea932 100644 --- a/bridges/modules/currency-exchange/src/lib.rs +++ b/bridges/modules/currency-exchange/src/lib.rs @@ -105,36 +105,21 @@ decl_module! { ) -> DispatchResult { let submitter = frame_system::ensure_signed(origin)?; - // ensure that transaction is included in finalized block that we know of - let transaction = ::PeerBlockchain::verify_transaction_inclusion_proof( - &proof, - ).ok_or_else(|| Error::::UnfinalizedTransaction)?; - - // parse transaction - let transaction = ::PeerMaybeLockFundsTransaction::parse(&transaction) - .map_err(Error::::from)?; - let transfer_id = transaction.id; - ensure!( - !Transfers::::contains_key(&transfer_id), - Error::::AlreadyClaimed - ); - - // grant recipient - let recipient = T::RecipientsMap::map(transaction.recipient).map_err(Error::::from)?; - let amount = T::CurrencyConverter::convert(transaction.amount).map_err(Error::::from)?; + // verify and parse transaction proof + let deposit = prepare_deposit_details::(&proof)?; // make sure to update the mapping if we deposit successfully to avoid double spending, // i.e. whenever `deposit_into` is successful we MUST update `Transfers`. { // if any changes were made to the storage, we can't just return error here, because // otherwise the same proof may be imported again - let deposit_result = T::DepositInto::deposit_into(recipient, amount); + let deposit_result = T::DepositInto::deposit_into(deposit.recipient, deposit.amount); match deposit_result { Ok(_) => (), Err(ExchangeError::DepositPartiallyFailed) => (), Err(error) => return Err(Error::::from(error).into()), } - Transfers::::insert(&transfer_id, ()) + Transfers::::insert(&deposit.transfer_id, ()) } // reward submitter for providing valid message @@ -143,7 +128,7 @@ decl_module! { frame_support::debug::trace!( target: "runtime", "Completed currency exchange: {:?}", - transfer_id, + deposit.transfer_id, ); Ok(()) @@ -158,6 +143,18 @@ decl_storage! { } } +impl Module { + /// Returns true if currency exchange module is able to import given transaction proof in + /// its current state. + pub fn filter_transaction_proof(proof: &::TransactionInclusionProof) -> bool { + if prepare_deposit_details::(proof).is_err() { + return false; + } + + true + } +} + impl From for Error { fn from(error: ExchangeError) -> Self { match error { @@ -176,6 +173,41 @@ impl OnTransactionSubmitted for () { fn on_valid_transaction_submitted(_: AccountId) {} } +/// Exchange deposit details. +struct DepositDetails { + /// Transfer id. + pub transfer_id: ::Id, + /// Transfer recipient. + pub recipient: ::Recipient, + /// Transfer amount. + pub amount: ::TargetAmount, +} + +/// Verify and parse transaction proof, preparing everything required for importing +/// this transaction proof. +fn prepare_deposit_details( + proof: &<::PeerBlockchain as Blockchain>::TransactionInclusionProof, +) -> Result, Error> { + // ensure that transaction is included in finalized block that we know of + let transaction = ::PeerBlockchain::verify_transaction_inclusion_proof(proof) + .ok_or_else(|| Error::::UnfinalizedTransaction)?; + + // parse transaction + let transaction = ::PeerMaybeLockFundsTransaction::parse(&transaction).map_err(Error::::from)?; + let transfer_id = transaction.id; + ensure!(!Transfers::::contains_key(&transfer_id), Error::::AlreadyClaimed); + + // grant recipient + let recipient = T::RecipientsMap::map(transaction.recipient).map_err(Error::::from)?; + let amount = T::CurrencyConverter::convert(transaction.amount).map_err(Error::::from)?; + + Ok(DepositDetails { + transfer_id, + recipient, + amount, + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/bridges/primitives/currency-exchange/Cargo.toml b/bridges/primitives/currency-exchange/Cargo.toml index 4cdba7ec1398..198d4ad531ed 100644 --- a/bridges/primitives/currency-exchange/Cargo.toml +++ b/bridges/primitives/currency-exchange/Cargo.toml @@ -11,6 +11,12 @@ codec = { package = "parity-scale-codec", version = "1.3.1", default-features = # Substrate Based Dependencies +[dependencies.sp-api] +version = "2.0.0-rc4" +tag = 'v2.0.0-rc4' +default-features = false +git = "https://github.com/paritytech/substrate.git" + [dependencies.sp-std] version = "2.0.0-rc4" tag = 'v2.0.0-rc4' @@ -27,6 +33,7 @@ git = "https://github.com/paritytech/substrate.git" default = ["std"] std = [ "codec/std", + "sp-api/std", "sp-std/std", "frame-support/std", ] diff --git a/bridges/primitives/currency-exchange/src/lib.rs b/bridges/primitives/currency-exchange/src/lib.rs index 00836dee23bf..28a3afd08080 100644 --- a/bridges/primitives/currency-exchange/src/lib.rs +++ b/bridges/primitives/currency-exchange/src/lib.rs @@ -15,9 +15,14 @@ // along with Parity Bridges Common. If not, see . #![cfg_attr(not(feature = "std"), no_std)] +// RuntimeApi generated functions +#![allow(clippy::too_many_arguments)] +// Generated by `DecodeLimit::decode_with_depth_limit` +#![allow(clippy::unnecessary_mut_passed)] use codec::{Decode, Encode, EncodeLike}; -use frame_support::RuntimeDebug; +use frame_support::{Parameter, RuntimeDebug}; +use sp_api::decl_runtime_apis; use sp_std::marker::PhantomData; /// All errors that may happen during exchange. @@ -127,3 +132,12 @@ impl CurrencyConverter for IdentityCurrencyConverter { Ok(currency) } } + +decl_runtime_apis! { + /// API for exchange transactions submitters. + pub trait CurrencyExchangeApi { + /// Returns true if currency exchange module is able to import transaction proof in + /// its current state. + fn filter_transaction_proof(proof: Proof) -> bool; + } +} diff --git a/bridges/relays/ethereum/Cargo.toml b/bridges/relays/ethereum/Cargo.toml index 027d471e69e2..cd39d6ea4453 100644 --- a/bridges/relays/ethereum/Cargo.toml +++ b/bridges/relays/ethereum/Cargo.toml @@ -29,6 +29,7 @@ rustc-hex = "2.0.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.57" sysinfo = "0.15" +sp-currency-exchange = { path = "../../primitives/currency-exchange" } sp-bridge-eth-poa = { path = "../../primitives/ethereum-poa" } time = "0.2" web3 = "0.13" diff --git a/bridges/relays/ethereum/src/cli.yml b/bridges/relays/ethereum/src/cli.yml index 988af146a7c5..2026da39ef46 100644 --- a/bridges/relays/ethereum/src/cli.yml +++ b/bridges/relays/ethereum/src/cli.yml @@ -111,12 +111,18 @@ subcommands: args: - eth-host: *eth-host - eth-port: *eth-port + - eth-start-with-block: + long: eth-start-with-block + value_name: ETH_START_WITH_BLOCK + help: Auto-relay transactions starting with given block number. If not specified, starts with best finalized Ethereum block (known to Substrate node) transactions. + takes_value: true + conflicts_with: + - eth-tx-hash - eth-tx-hash: long: eth-tx-hash value_name: ETH_TX_HASH help: Hash of the lock funds transaction. takes_value: true - required: true - sub-host: *sub-host - sub-port: *sub-port - sub-signer: *sub-signer diff --git a/bridges/relays/ethereum/src/ethereum_client.rs b/bridges/relays/ethereum/src/ethereum_client.rs index e23f1e5c5a8e..aba5d1ca7431 100644 --- a/bridges/relays/ethereum/src/ethereum_client.rs +++ b/bridges/relays/ethereum/src/ethereum_client.rs @@ -129,6 +129,23 @@ impl EthereumRpc for EthereumRpcClient { } } + async fn header_by_number_with_transactions(&self, number: u64) -> Result { + let get_full_tx_objects = true; + let header = Ethereum::get_block_by_number_with_transactions(&self.client, number, get_full_tx_objects).await?; + + let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some(); + if !is_complete_header { + return Err(RpcError::Ethereum(EthereumNodeError::IncompleteHeader)); + } + + let is_complete_transactions = header.transactions.iter().all(|tx| tx.raw.is_some()); + if !is_complete_transactions { + return Err(RpcError::Ethereum(EthereumNodeError::IncompleteTransaction)); + } + + Ok(header) + } + async fn header_by_hash_with_transactions(&self, hash: H256) -> Result { let get_full_tx_objects = true; let header = Ethereum::get_block_by_hash_with_transactions(&self.client, hash, get_full_tx_objects).await?; diff --git a/bridges/relays/ethereum/src/ethereum_exchange.rs b/bridges/relays/ethereum/src/ethereum_exchange.rs index 95ded39f60cd..d27f3345e8a6 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange.rs @@ -18,11 +18,16 @@ use crate::ethereum_client::{EthereumConnectionParams, EthereumRpcClient}; use crate::ethereum_types::{ - EthereumHeaderId, Transaction as EthereumTransaction, TransactionHash as EthereumTransactionHash, H256, + EthereumHeaderId, HeaderWithTransactions as EthereumHeaderWithTransactions, Transaction as EthereumTransaction, + TransactionHash as EthereumTransactionHash, H256, }; -use crate::exchange::{relay_single_transaction_proof, SourceClient, TargetClient, TransactionProofPipeline}; +use crate::exchange::{ + relay_single_transaction_proof, SourceBlock, SourceClient, SourceTransaction, TargetClient, + TransactionProofPipeline, +}; +use crate::exchange_loop::{run as run_loop, InMemoryStorage}; use crate::rpc::{EthereumRpc, SubstrateRpc}; -use crate::rpc_errors::{EthereumNodeError, RpcError}; +use crate::rpc_errors::RpcError; use crate::substrate_client::{ SubmitEthereumExchangeTransactionProof, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams, }; @@ -30,6 +35,7 @@ use crate::sync_types::HeaderId; use async_trait::async_trait; use bridge_node_runtime::exchange::EthereumTransactionInclusionProof; +use sp_currency_exchange::MaybeLockFundsTransaction; use std::time::Duration; /// Interval at which we ask Ethereum node for updates. @@ -37,17 +43,26 @@ const ETHEREUM_TICK_INTERVAL: Duration = Duration::from_secs(10); /// Interval at which we ask Substrate node for updates. const SUBSTRATE_TICK_INTERVAL: Duration = Duration::from_secs(5); +/// Exchange relay mode. +#[derive(Debug)] +pub enum ExchangeRelayMode { + /// Relay single transaction and quit. + Single(EthereumTransactionHash), + /// Auto-relay transactions starting with given block. + Auto(Option), +} + /// PoA exchange transaction relay params. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct EthereumExchangeParams { /// Ethereum connection params. pub eth: EthereumConnectionParams, - /// Hash of the Ethereum transaction to relay. - pub eth_tx_hash: EthereumTransactionHash, /// Substrate connection params. pub sub: SubstrateConnectionParams, /// Substrate signing params. pub sub_sign: SubstrateSigningParams, + /// Relay working mode. + pub mode: ExchangeRelayMode, } /// Ethereum to Substrate exchange pipeline. @@ -57,13 +72,46 @@ impl TransactionProofPipeline for EthereumToSubstrateExchange { const SOURCE_NAME: &'static str = "Ethereum"; const TARGET_NAME: &'static str = "Substrate"; - type BlockHash = H256; - type BlockNumber = u64; - type TransactionHash = EthereumTransactionHash; - type Transaction = EthereumTransaction; + type Block = EthereumSourceBlock; type TransactionProof = EthereumTransactionInclusionProof; } +/// Ethereum source block. +struct EthereumSourceBlock(EthereumHeaderWithTransactions); + +impl SourceBlock for EthereumSourceBlock { + type Hash = H256; + type Number = u64; + type Transaction = EthereumSourceTransaction; + + fn id(&self) -> EthereumHeaderId { + HeaderId( + self.0.number.expect(crate::ethereum_types::HEADER_ID_PROOF).as_u64(), + self.0.hash.expect(crate::ethereum_types::HEADER_ID_PROOF), + ) + } + + fn transactions(&self) -> Vec { + self.0 + .transactions + .iter() + .cloned() + .map(EthereumSourceTransaction) + .collect() + } +} + +/// Ethereum source transaction. +struct EthereumSourceTransaction(EthereumTransaction); + +impl SourceTransaction for EthereumSourceTransaction { + type Hash = EthereumTransactionHash; + + fn hash(&self) -> Self::Hash { + self.0.hash + } +} + /// Ethereum node as transactions proof source. struct EthereumTransactionsSource { client: EthereumRpcClient, @@ -77,63 +125,60 @@ impl SourceClient for EthereumTransactionsSource { async_std::task::sleep(ETHEREUM_TICK_INTERVAL).await; } - async fn transaction( + async fn block_by_hash(&self, hash: H256) -> Result { + self.client + .header_by_hash_with_transactions(hash) + .await + .map(EthereumSourceBlock) + } + + async fn block_by_number(&self, number: u64) -> Result { + self.client + .header_by_number_with_transactions(number) + .await + .map(EthereumSourceBlock) + } + + async fn transaction_block( &self, hash: &EthereumTransactionHash, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let eth_tx = match self.client.transaction_by_hash(*hash).await? { Some(eth_tx) => eth_tx, None => return Ok(None), }; // we need transaction to be mined => check if it is included in the block - let eth_header_id = match (eth_tx.block_number, eth_tx.block_hash) { - (Some(block_number), Some(block_hash)) => HeaderId(block_number.as_u64(), block_hash), + let (eth_header_id, eth_tx_index) = match (eth_tx.block_number, eth_tx.block_hash, eth_tx.transaction_index) { + (Some(block_number), Some(block_hash), Some(transaction_index)) => ( + HeaderId(block_number.as_u64(), block_hash), + transaction_index.as_u64() as _, + ), _ => return Ok(None), }; - Ok(Some((eth_header_id, eth_tx))) + Ok(Some((eth_header_id, eth_tx_index))) } async fn transaction_proof( &self, - eth_header_id: &EthereumHeaderId, - eth_tx: EthereumTransaction, + block: &EthereumSourceBlock, + tx_index: usize, ) -> Result { const TRANSACTION_HAS_RAW_FIELD_PROOF: &str = "RPC level checks that transactions from Ethereum\ node are having `raw` field; qed"; + const BLOCK_HAS_HASH_FIELD_PROOF: &str = "RPC level checks that block has `hash` field; qed"; - let eth_header = self.client.header_by_hash_with_transactions(eth_header_id.1).await?; - let eth_relay_tx_hash = eth_tx.hash; - let mut eth_relay_tx = Some(eth_tx); - let mut eth_relay_tx_index = None; - let mut transaction_proof = Vec::with_capacity(eth_header.transactions.len()); - for (index, eth_tx) in eth_header.transactions.into_iter().enumerate() { - if eth_tx.hash != eth_relay_tx_hash { - let eth_raw_tx = eth_tx.raw.expect(TRANSACTION_HAS_RAW_FIELD_PROOF); - transaction_proof.push(eth_raw_tx.0); - } else { - let eth_raw_relay_tx = match eth_relay_tx.take() { - Some(eth_relay_tx) => eth_relay_tx.raw.expect(TRANSACTION_HAS_RAW_FIELD_PROOF), - None => { - return Err( - EthereumNodeError::DuplicateBlockTransaction(*eth_header_id, eth_relay_tx_hash).into(), - ) - } - }; - eth_relay_tx_index = Some(index as u64); - transaction_proof.push(eth_raw_relay_tx.0); - } - } + let transaction_proof = block + .0 + .transactions + .iter() + .map(|tx| tx.raw.clone().expect(TRANSACTION_HAS_RAW_FIELD_PROOF).0) + .collect(); Ok(EthereumTransactionInclusionProof { - block: eth_header_id.1, - index: eth_relay_tx_index.ok_or_else(|| { - RpcError::from(EthereumNodeError::BlockMissingTransaction( - *eth_header_id, - eth_relay_tx_hash, - )) - })?, + block: block.0.hash.expect(BLOCK_HAS_HASH_FIELD_PROOF), + index: tx_index as _, proof: transaction_proof, }) } @@ -170,15 +215,51 @@ impl TargetClient for SubstrateTransactionsTarget { Ok(id.0 <= best_finalized_ethereum_block.0) } + async fn best_finalized_header_id(&self) -> Result { + self.client.best_ethereum_finalized_block().await + } + + async fn filter_transaction_proof(&self, proof: &EthereumTransactionInclusionProof) -> Result { + // let's try to parse transaction locally + let parse_result = bridge_node_runtime::exchange::EthTransaction::parse(&proof.proof[proof.index as usize]); + if parse_result.is_err() { + return Ok(false); + } + + // seems that transaction is relayable - let's check if runtime is able to import it + // (we can't if e.g. header is pruned or there's some issue with tx data) + self.client.verify_exchange_transaction_proof(proof.clone()).await + } + async fn submit_transaction_proof(&self, proof: EthereumTransactionInclusionProof) -> Result<(), Self::Error> { let sign_params = self.sign_params.clone(); self.client.submit_exchange_transaction_proof(sign_params, proof).await } } -/// Relay exchange transaction proof to Substrate node. +impl Default for EthereumExchangeParams { + fn default() -> Self { + EthereumExchangeParams { + eth: Default::default(), + sub: Default::default(), + sub_sign: Default::default(), + mode: ExchangeRelayMode::Auto(None), + } + } +} + +/// Relay exchange transaction proof(s) to Substrate node. pub fn run(params: EthereumExchangeParams) { - let eth_tx_hash = params.eth_tx_hash; + match params.mode { + ExchangeRelayMode::Single(eth_tx_hash) => run_single_transaction_relay(params, eth_tx_hash), + ExchangeRelayMode::Auto(eth_start_with_block_number) => { + run_auto_transactions_relay_loop(params, eth_start_with_block_number) + } + }; +} + +/// Run single transaction proof relay and stop. +fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H256) { let mut local_pool = futures::executor::LocalPool::new(); let result = local_pool.run_until(async move { @@ -212,3 +293,46 @@ pub fn run(params: EthereumExchangeParams) { } } } + +/// Run auto-relay loop. +fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_with_block_number: Option) { + let do_run_loop = move || -> Result<(), String> { + let eth_client = EthereumRpcClient::new(params.eth); + let sub_client = async_std::task::block_on(SubstrateRpcClient::new(params.sub)) + .map_err(|err| format!("Error starting Substrate client: {:?}", err))?; + + let eth_start_with_block_number = match eth_start_with_block_number { + Some(eth_start_with_block_number) => eth_start_with_block_number, + None => { + async_std::task::block_on(sub_client.best_ethereum_finalized_block()) + .map_err(|err| { + format!( + "Error retrieving best finalized Ethereum block from Substrate node: {:?}", + err + ) + })? + .0 + } + }; + + run_loop( + InMemoryStorage::new(eth_start_with_block_number), + EthereumTransactionsSource { client: eth_client }, + SubstrateTransactionsTarget { + client: sub_client, + sign_params: params.sub_sign, + }, + futures::future::pending(), + ); + + Ok(()) + }; + + if let Err(err) = do_run_loop() { + log::error!( + target: "bridge", + "Error auto-relaying Ethereum transactions proofs to Substrate node: {}", + err, + ); + } +} diff --git a/bridges/relays/ethereum/src/exchange.rs b/bridges/relays/ethereum/src/exchange.rs index 352e53fe6cb3..4de47e597505 100644 --- a/bridges/relays/ethereum/src/exchange.rs +++ b/bridges/relays/ethereum/src/exchange.rs @@ -14,10 +14,15 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -//! Relaying proofs of exchange transactions. +//! Relaying proofs of exchange transaction. + +use crate::sync_types::MaybeConnectionError; use async_trait::async_trait; -use std::fmt::{Debug, Display}; +use std::{ + fmt::{Debug, Display}, + string::ToString, +}; /// Transaction proof pipeline. pub trait TransactionProofPipeline { @@ -26,50 +31,76 @@ pub trait TransactionProofPipeline { /// Name of the transaction proof target. const TARGET_NAME: &'static str; + /// Block type. + type Block: SourceBlock; + /// Transaction inclusion proof type. + type TransactionProof; +} + +/// Block that is participating in exchange. +pub trait SourceBlock { /// Block hash type. - type BlockHash: Display; + type Hash: Clone + Debug + Display; /// Block number type. - type BlockNumber: Display; + type Number: Debug + Display + Clone + Copy + std::cmp::Ord + std::ops::Add + num_traits::One; + /// Block transaction. + type Transaction: SourceTransaction; + + /// Return hash of the block. + fn id(&self) -> crate::sync_types::HeaderId; + /// Return block transactions iterator. + fn transactions(&self) -> Vec; +} + +/// Transaction that is participating in exchange. +pub trait SourceTransaction { /// Transaction hash type. - type TransactionHash: Display; - /// Transaction type. - type Transaction; - /// Transaction inclusion proof type. - type TransactionProof; + type Hash: Debug + Display; + + /// Return transaction hash. + fn hash(&self) -> Self::Hash; } +/// Block hash for given pipeline. +pub type BlockHashOf

= <

::Block as SourceBlock>::Hash; + +/// Block number for given pipeline. +pub type BlockNumberOf

= <

::Block as SourceBlock>::Number; + +/// Transaction hash for given pipeline. +pub type TransactionOf

= <

::Block as SourceBlock>::Transaction; + +/// Transaction hash for given pipeline. +pub type TransactionHashOf

= as SourceTransaction>::Hash; + /// Header id. -pub type HeaderId

= crate::sync_types::HeaderId< -

::BlockHash, -

::BlockNumber, ->; +pub type HeaderId

= crate::sync_types::HeaderId, BlockNumberOf

>; /// Source client API. #[async_trait] pub trait SourceClient { /// Error type. - type Error: Debug; + type Error: Debug + MaybeConnectionError; /// Sleep until exchange-related data is (probably) updated. async fn tick(&self); - /// Return **mined** transaction by its hash. May return `Ok(None)` if transaction is unknown to the source node. - async fn transaction( - &self, - hash: &P::TransactionHash, - ) -> Result, P::Transaction)>, Self::Error>; + /// Get block by hash. + async fn block_by_hash(&self, hash: BlockHashOf

) -> Result; + /// Get canonical block by number. + async fn block_by_number(&self, number: BlockNumberOf

) -> Result; + /// Return block + index where transaction has been **mined**. May return `Ok(None)` if transaction + /// is unknown to the source node. + async fn transaction_block(&self, hash: &TransactionHashOf

) + -> Result, usize)>, Self::Error>; /// Prepare transaction proof. - async fn transaction_proof( - &self, - header: &HeaderId

, - transaction: P::Transaction, - ) -> Result; + async fn transaction_proof(&self, block: &P::Block, tx_index: usize) -> Result; } /// Target client API. #[async_trait] pub trait TargetClient { /// Error type. - type Error: Debug; + type Error: Debug + MaybeConnectionError; /// Sleep until exchange-related data is (probably) updated. async fn tick(&self); @@ -77,44 +108,209 @@ pub trait TargetClient { async fn is_header_known(&self, id: &HeaderId

) -> Result; /// Returns `Ok(true)` if header is finalized by the target node. async fn is_header_finalized(&self, id: &HeaderId

) -> Result; + /// Returns best finalized header id. + async fn best_finalized_header_id(&self) -> Result, Self::Error>; + /// Returns `Ok(true)` if transaction proof is need to be relayed. + async fn filter_transaction_proof(&self, proof: &P::TransactionProof) -> Result; /// Submits transaction proof to the target node. async fn submit_transaction_proof(&self, proof: P::TransactionProof) -> Result<(), Self::Error>; } +/// Block transaction statistics. +#[derive(Debug, Default)] +#[cfg_attr(test, derive(PartialEq))] +pub struct RelayedBlockTransactions { + /// Total number of transactions processed (either relayed or ignored) so far. + pub processed: usize, + /// Total number of transactions successfully relayed so far. + pub relayed: usize, + /// Total number of transactions that we have failed to relay so far. + pub failed: usize, +} + +/// Stringified error that may be either connection-related or not. +enum StringifiedMaybeConnectionError { + /// The error is connection-related error. + Connection(String), + /// The error is connection-unrelated error. + NonConnection(String), +} + +/// Relay all suitable transactions from single block. +/// +/// If connection error occurs, returns Err with number of successfully processed transactions. +/// If some other error occurs, it is ignored and other transactions are processed. +/// +/// All transaction-level traces are written by this function. This function is not tracing +/// any information about block. +pub async fn relay_block_transactions( + source_client: &impl SourceClient

, + target_client: &impl TargetClient

, + source_block: &P::Block, + mut relayed_transactions: RelayedBlockTransactions, +) -> Result { + let transactions_to_process = source_block + .transactions() + .into_iter() + .enumerate() + .skip(relayed_transactions.processed); + for (source_tx_index, source_tx) in transactions_to_process { + let result = async { + let source_tx_id = format!("{}/{}", source_block.id().1, source_tx_index); + let source_tx_proof = + prepare_transaction_proof(source_client, &source_tx_id, source_block, source_tx_index).await?; + + let needs_to_be_relayed = + target_client + .filter_transaction_proof(&source_tx_proof) + .await + .map_err(|err| { + StringifiedMaybeConnectionError::new( + err.is_connection_error(), + format!("Transaction filtering has failed with {:?}", err), + ) + })?; + + if !needs_to_be_relayed { + return Ok(false); + } + + relay_ready_transaction_proof(target_client, &source_tx_id, source_tx_proof) + .await + .map(|_| true) + } + .await; + + // We have two options here: + // 1) retry with the same transaction later; + // 2) report error and proceed with next transaction. + // + // Option#1 may seems better, but: + // 1) we do not track if transaction is mined (without an error) by the target node; + // 2) error could be irrecoverable (e.g. when block is already pruned by bridge module or tx + // has invalid format) && we'll end up in infinite loop of retrying the same transaction proof. + // + // So we're going with option#2 here (the only exception are connection errors). + match result { + Ok(false) => { + relayed_transactions.processed += 1; + } + Ok(true) => { + log::info!( + target: "bridge", + "{} transaction {} proof has been successfully submitted to {} node", + P::SOURCE_NAME, + source_tx.hash(), + P::TARGET_NAME, + ); + + relayed_transactions.processed += 1; + relayed_transactions.relayed += 1; + } + Err(err) => { + log::error!( + target: "bridge", + "Error relaying {} transaction {} proof to {} node: {}. {}", + P::SOURCE_NAME, + source_tx.hash(), + P::TARGET_NAME, + err.to_string(), + if err.is_connection_error() { + "Going to retry after delay..." + } else { + "You may need to submit proof of this transaction manually" + }, + ); + + if err.is_connection_error() { + return Err(relayed_transactions); + } + + relayed_transactions.processed += 1; + relayed_transactions.failed += 1; + } + } + } + + Ok(relayed_transactions) +} + /// Relay single transaction proof. pub async fn relay_single_transaction_proof( source_client: &impl SourceClient

, target_client: &impl TargetClient

, - source_tx_hash: P::TransactionHash, + source_tx_hash: TransactionHashOf

, ) -> Result<(), String> { // wait for transaction and header on source node - let (source_header_id, source_tx) = wait_transaction_mined(source_client, &source_tx_hash).await?; - let transaction_proof = source_client - .transaction_proof(&source_header_id, source_tx) - .await - .map_err(|err| { - format!( - "Error building transaction {} proof on {} node: {:?}", - source_tx_hash, - P::SOURCE_NAME, - err, - ) - })?; + let (source_header_id, source_tx_index) = wait_transaction_mined(source_client, &source_tx_hash).await?; + let source_block = source_client.block_by_hash(source_header_id.1.clone()).await; + let source_block = source_block.map_err(|err| { + format!( + "Error retrieving block {} from {} node: {:?}", + source_header_id.1, + P::SOURCE_NAME, + err, + ) + })?; // wait for transaction and header on target node wait_header_imported(target_client, &source_header_id).await?; wait_header_finalized(target_client, &source_header_id).await?; - // and finally - submit transaction proof to target node + // and finally - prepare and submit transaction proof to target node + let source_tx_id = format!("{}", source_tx_hash); + relay_ready_transaction_proof( + target_client, + &source_tx_id, + prepare_transaction_proof(source_client, &source_tx_id, &source_block, source_tx_index) + .await + .map_err(|err| err.to_string())?, + ) + .await + .map_err(|err| err.to_string()) +} + +/// Prepare transaction proof. +async fn prepare_transaction_proof( + source_client: &impl SourceClient

, + source_tx_id: &str, + source_block: &P::Block, + source_tx_index: usize, +) -> Result { + source_client + .transaction_proof(source_block, source_tx_index) + .await + .map_err(|err| { + StringifiedMaybeConnectionError::new( + err.is_connection_error(), + format!( + "Error building transaction {} proof on {} node: {:?}", + source_tx_id, + P::SOURCE_NAME, + err, + ), + ) + }) +} + +/// Relay prepared proof of transaction. +async fn relay_ready_transaction_proof( + target_client: &impl TargetClient

, + source_tx_id: &str, + source_tx_proof: P::TransactionProof, +) -> Result<(), StringifiedMaybeConnectionError> { target_client - .submit_transaction_proof(transaction_proof) + .submit_transaction_proof(source_tx_proof) .await .map_err(|err| { - format!( - "Error submitting transaction {} proof to {} node: {:?}", - source_tx_hash, - P::TARGET_NAME, - err, + StringifiedMaybeConnectionError::new( + err.is_connection_error(), + format!( + "Error submitting transaction {} proof to {} node: {:?}", + source_tx_id, + P::TARGET_NAME, + err, + ), ) }) } @@ -122,10 +318,10 @@ pub async fn relay_single_transaction_proof( /// Wait until transaction is mined by source node. async fn wait_transaction_mined( source_client: &impl SourceClient

, - source_tx_hash: &P::TransactionHash, -) -> Result<(HeaderId

, P::Transaction), String> { + source_tx_hash: &TransactionHashOf

, +) -> Result<(HeaderId

, usize), String> { loop { - let source_header_and_tx = source_client.transaction(&source_tx_hash).await.map_err(|err| { + let source_header_and_tx = source_client.transaction_block(&source_tx_hash).await.map_err(|err| { format!( "Error retrieving transaction {} from {} node: {:?}", source_tx_hash, @@ -245,68 +441,144 @@ async fn wait_header_finalized( } } +impl StringifiedMaybeConnectionError { + fn new(is_connection_error: bool, error: String) -> Self { + if is_connection_error { + StringifiedMaybeConnectionError::Connection(error) + } else { + StringifiedMaybeConnectionError::NonConnection(error) + } + } +} + +impl MaybeConnectionError for StringifiedMaybeConnectionError { + fn is_connection_error(&self) -> bool { + match *self { + StringifiedMaybeConnectionError::Connection(_) => true, + StringifiedMaybeConnectionError::NonConnection(_) => false, + } + } +} + +impl ToString for StringifiedMaybeConnectionError { + fn to_string(&self) -> String { + match *self { + StringifiedMaybeConnectionError::Connection(ref err) => err.clone(), + StringifiedMaybeConnectionError::NonConnection(ref err) => err.clone(), + } + } +} + #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; use crate::sync_types::HeaderId; use parking_lot::Mutex; + use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + }; + + pub fn test_block_id() -> TestHeaderId { + HeaderId(1, 1) + } - fn test_header_id() -> TestHeaderId { - HeaderId(100, 100) + pub fn test_next_block_id() -> TestHeaderId { + HeaderId(2, 2) } - fn test_transaction_hash() -> TestTransactionHash { - 200 + pub fn test_transaction_hash(tx_index: u64) -> TestTransactionHash { + 200 + tx_index } - fn test_transaction() -> TestTransaction { - 300 + pub fn test_transaction(tx_index: u64) -> TestTransaction { + TestTransaction(test_transaction_hash(tx_index)) } - fn test_transaction_proof() -> TestTransactionProof { - 400 + pub fn test_block() -> TestBlock { + TestBlock(test_block_id(), vec![test_transaction(0)]) } - type TestError = u64; - type TestBlockNumber = u64; - type TestBlockHash = u64; - type TestTransactionHash = u64; - type TestTransaction = u64; - type TestTransactionProof = u64; - type TestHeaderId = HeaderId; + pub fn test_next_block() -> TestBlock { + TestBlock(test_next_block_id(), vec![test_transaction(1)]) + } + + pub type TestBlockNumber = u64; + pub type TestBlockHash = u64; + pub type TestTransactionHash = u64; + pub type TestHeaderId = HeaderId; - struct TestTransactionProofPipeline; + #[derive(Debug, Clone, PartialEq)] + pub struct TestError(pub bool); + + impl MaybeConnectionError for TestError { + fn is_connection_error(&self) -> bool { + self.0 + } + } + + pub struct TestTransactionProofPipeline; impl TransactionProofPipeline for TestTransactionProofPipeline { const SOURCE_NAME: &'static str = "TestSource"; const TARGET_NAME: &'static str = "TestTarget"; - type BlockHash = TestBlockHash; - type BlockNumber = TestBlockNumber; - type TransactionHash = TestTransactionHash; - type Transaction = TestTransaction; + type Block = TestBlock; type TransactionProof = TestTransactionProof; } - struct TestTransactionsSource { - on_tick: Box, - data: Mutex, + #[derive(Debug, Clone)] + pub struct TestBlock(pub TestHeaderId, pub Vec); + + impl SourceBlock for TestBlock { + type Hash = TestBlockHash; + type Number = TestBlockNumber; + type Transaction = TestTransaction; + + fn id(&self) -> TestHeaderId { + self.0 + } + + fn transactions(&self) -> Vec { + self.1.clone() + } } - struct TestTransactionsSourceData { - transaction: Result, TestError>, - transaction_proof: Result, + #[derive(Debug, Clone)] + pub struct TestTransaction(pub TestTransactionHash); + + impl SourceTransaction for TestTransaction { + type Hash = TestTransactionHash; + + fn hash(&self) -> Self::Hash { + self.0 + } + } + + #[derive(Debug, Clone, PartialEq)] + pub struct TestTransactionProof(pub TestTransactionHash); + + pub struct TestTransactionsSource { + pub on_tick: Box, + pub data: Arc>, + } + + pub struct TestTransactionsSourceData { + pub block: Result, + pub transaction_block: Result, TestError>, + pub proofs_to_fail: HashMap, } impl TestTransactionsSource { - fn new(on_tick: Box) -> Self { + pub fn new(on_tick: Box) -> Self { Self { on_tick, - data: Mutex::new(TestTransactionsSourceData { - transaction: Ok(Some((test_header_id(), test_transaction()))), - transaction_proof: Ok(test_transaction_proof()), - }), + data: Arc::new(Mutex::new(TestTransactionsSourceData { + block: Ok(test_block()), + transaction_block: Ok(Some((test_block_id(), 0))), + proofs_to_fail: HashMap::new(), + })), } } } @@ -319,42 +591,53 @@ mod tests { (self.on_tick)(&mut *self.data.lock()) } - async fn transaction( - &self, - _: &TestTransactionHash, - ) -> Result, TestError> { - self.data.lock().transaction + async fn block_by_hash(&self, _: TestBlockHash) -> Result { + self.data.lock().block.clone() + } + + async fn block_by_number(&self, _: TestBlockNumber) -> Result { + self.data.lock().block.clone() } - async fn transaction_proof( - &self, - _: &TestHeaderId, - _: TestTransaction, - ) -> Result { - self.data.lock().transaction_proof + async fn transaction_block(&self, _: &TestTransactionHash) -> Result, TestError> { + self.data.lock().transaction_block.clone() + } + + async fn transaction_proof(&self, block: &TestBlock, index: usize) -> Result { + let tx_hash = block.1[index].hash(); + let proof_error = self.data.lock().proofs_to_fail.get(&tx_hash).cloned(); + if let Some(err) = proof_error { + return Err(err); + } + + Ok(TestTransactionProof(tx_hash)) } } - struct TestTransactionsTarget { - on_tick: Box, - data: Mutex, + pub struct TestTransactionsTarget { + pub on_tick: Box, + pub data: Arc>, } - struct TestTransactionsTargetData { - is_header_known: Result, - is_header_finalized: Result, - submitted_proofs: Vec, + pub struct TestTransactionsTargetData { + pub is_header_known: Result, + pub is_header_finalized: Result, + pub best_finalized_header_id: Result, + pub transactions_to_accept: HashSet, + pub submitted_proofs: Vec, } impl TestTransactionsTarget { - fn new(on_tick: Box) -> Self { + pub fn new(on_tick: Box) -> Self { Self { on_tick, - data: Mutex::new(TestTransactionsTargetData { + data: Arc::new(Mutex::new(TestTransactionsTargetData { is_header_known: Ok(true), is_header_finalized: Ok(true), + best_finalized_header_id: Ok(test_block_id()), + transactions_to_accept: vec![test_transaction_hash(0)].into_iter().collect(), submitted_proofs: Vec::new(), - }), + })), } } } @@ -368,11 +651,19 @@ mod tests { } async fn is_header_known(&self, _: &TestHeaderId) -> Result { - self.data.lock().is_header_known + self.data.lock().is_header_known.clone() } async fn is_header_finalized(&self, _: &TestHeaderId) -> Result { - self.data.lock().is_header_finalized + self.data.lock().is_header_finalized.clone() + } + + async fn best_finalized_header_id(&self) -> Result { + self.data.lock().best_finalized_header_id.clone() + } + + async fn filter_transaction_proof(&self, proof: &TestTransactionProof) -> Result { + Ok(self.data.lock().transactions_to_accept.contains(&proof.0)) } async fn submit_transaction_proof(&self, proof: TestTransactionProof) -> Result<(), TestError> { @@ -381,23 +672,22 @@ mod tests { } } - fn ensure_success(source: TestTransactionsSource, target: TestTransactionsTarget) { + fn ensure_relay_single_success(source: &TestTransactionsSource, target: &TestTransactionsTarget) { assert_eq!( - async_std::task::block_on(relay_single_transaction_proof( - &source, - &target, - test_transaction_hash(), - )), + async_std::task::block_on(relay_single_transaction_proof(source, target, test_transaction_hash(0),)), Ok(()), ); - assert_eq!(target.data.lock().submitted_proofs, vec![test_transaction_proof()],); + assert_eq!( + target.data.lock().submitted_proofs, + vec![TestTransactionProof(test_transaction_hash(0))], + ); } - fn ensure_failure(source: TestTransactionsSource, target: TestTransactionsTarget) { + fn ensure_relay_single_failure(source: TestTransactionsSource, target: TestTransactionsTarget) { assert!(async_std::task::block_on(relay_single_transaction_proof( &source, &target, - test_transaction_hash(), + test_transaction_hash(0), )) .is_err(),); assert!(target.data.lock().submitted_proofs.is_empty()); @@ -407,21 +697,21 @@ mod tests { fn ready_transaction_proof_relayed_immediately() { let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - ensure_success(source, target) + ensure_relay_single_success(&source, &target) } #[test] fn relay_transaction_proof_waits_for_transaction_to_be_mined() { let source = TestTransactionsSource::new(Box::new(|source_data| { - assert_eq!(source_data.transaction, Ok(None)); - source_data.transaction = Ok(Some((test_header_id(), test_transaction()))); + assert_eq!(source_data.transaction_block, Ok(None)); + source_data.transaction_block = Ok(Some((test_block_id(), 0))); })); let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); // transaction is not yet mined, but will be available after first wait (tick) - source.data.lock().transaction = Ok(None); + source.data.lock().transaction_block = Ok(None); - ensure_success(source, target) + ensure_relay_single_success(&source, &target) } #[test] @@ -429,9 +719,9 @@ mod tests { let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - source.data.lock().transaction = Err(0); + source.data.lock().transaction_block = Err(TestError(false)); - ensure_failure(source, target) + ensure_relay_single_failure(source, target) } #[test] @@ -439,9 +729,13 @@ mod tests { let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - source.data.lock().transaction_proof = Err(0); + source + .data + .lock() + .proofs_to_fail + .insert(test_transaction_hash(0), TestError(false)); - ensure_failure(source, target) + ensure_relay_single_failure(source, target) } #[test] @@ -455,7 +749,7 @@ mod tests { // header is not yet imported, but will be available after first wait (tick) target.data.lock().is_header_known = Ok(false); - ensure_success(source, target) + ensure_relay_single_success(&source, &target) } #[test] @@ -463,9 +757,9 @@ mod tests { let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - target.data.lock().is_header_known = Err(0); + target.data.lock().is_header_known = Err(TestError(false)); - ensure_failure(source, target) + ensure_relay_single_failure(source, target) } #[test] @@ -479,7 +773,7 @@ mod tests { // header is not yet finalized, but will be available after first wait (tick) target.data.lock().is_header_finalized = Ok(false); - ensure_success(source, target) + ensure_relay_single_success(&source, &target) } #[test] @@ -487,8 +781,147 @@ mod tests { let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - target.data.lock().is_header_finalized = Err(0); + target.data.lock().is_header_finalized = Err(TestError(false)); + + ensure_relay_single_failure(source, target) + } + + #[test] + fn relay_transaction_proof_fails_when_target_node_rejects_proof() { + let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); + let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); + + target + .data + .lock() + .transactions_to_accept + .remove(&test_transaction_hash(0)); + + ensure_relay_single_success(&source, &target) + } + + fn test_relay_block_transactions( + source: &TestTransactionsSource, + target: &TestTransactionsTarget, + pre_relayed: RelayedBlockTransactions, + ) -> Result { + async_std::task::block_on(relay_block_transactions( + source, + target, + &TestBlock( + test_block_id(), + vec![test_transaction(0), test_transaction(1), test_transaction(2)], + ), + pre_relayed, + )) + } + + #[test] + fn relay_block_transactions_process_all_transactions() { + let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); + let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); + + // let's only accept tx#1 + target + .data + .lock() + .transactions_to_accept + .remove(&test_transaction_hash(0)); + target + .data + .lock() + .transactions_to_accept + .insert(test_transaction_hash(1)); + + let relayed_transactions = test_relay_block_transactions(&source, &target, Default::default()); + assert_eq!( + relayed_transactions, + Ok(RelayedBlockTransactions { + processed: 3, + relayed: 1, + failed: 0, + }), + ); + assert_eq!( + target.data.lock().submitted_proofs, + vec![TestTransactionProof(test_transaction_hash(1))], + ); + } + + #[test] + fn relay_block_transactions_ignores_transaction_failure() { + let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); + let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); - ensure_failure(source, target) + // let's reject proof for tx#0 + source + .data + .lock() + .proofs_to_fail + .insert(test_transaction_hash(0), TestError(false)); + + let relayed_transactions = test_relay_block_transactions(&source, &target, Default::default()); + assert_eq!( + relayed_transactions, + Ok(RelayedBlockTransactions { + processed: 3, + relayed: 0, + failed: 1, + }), + ); + assert_eq!(target.data.lock().submitted_proofs, vec![],); + } + + #[test] + fn relay_block_transactions_fails_on_connection_error() { + let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed"))); + let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed"))); + + // fail with connection error when preparing proof for tx#1 + source + .data + .lock() + .proofs_to_fail + .insert(test_transaction_hash(1), TestError(true)); + + let relayed_transactions = test_relay_block_transactions(&source, &target, Default::default()); + assert_eq!( + relayed_transactions, + Err(RelayedBlockTransactions { + processed: 1, + relayed: 1, + failed: 0, + }), + ); + assert_eq!( + target.data.lock().submitted_proofs, + vec![TestTransactionProof(test_transaction_hash(0))], + ); + + // now do not fail on tx#2 + source.data.lock().proofs_to_fail.clear(); + // and also relay tx#3 + target + .data + .lock() + .transactions_to_accept + .insert(test_transaction_hash(2)); + + let relayed_transactions = test_relay_block_transactions(&source, &target, relayed_transactions.unwrap_err()); + assert_eq!( + relayed_transactions, + Ok(RelayedBlockTransactions { + processed: 3, + relayed: 2, + failed: 0, + }), + ); + assert_eq!( + target.data.lock().submitted_proofs, + vec![ + TestTransactionProof(test_transaction_hash(0)), + TestTransactionProof(test_transaction_hash(2)) + ], + ); } } diff --git a/bridges/relays/ethereum/src/exchange_loop.rs b/bridges/relays/ethereum/src/exchange_loop.rs new file mode 100644 index 000000000000..678211d7c139 --- /dev/null +++ b/bridges/relays/ethereum/src/exchange_loop.rs @@ -0,0 +1,267 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Relaying proofs of exchange transactions. + +use crate::exchange::{ + relay_block_transactions, BlockNumberOf, RelayedBlockTransactions, SourceClient, TargetClient, + TransactionProofPipeline, +}; +use crate::utils::retry_backoff; + +use backoff::backoff::Backoff; +use futures::{future::FutureExt, select}; +use num_traits::One; +use std::{future::Future, time::Duration}; + +/// Delay after connection-related error happened before we'll try +/// reconnection again. +const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); + +/// Transactions proofs relay state. +#[derive(Debug)] +pub struct TransactionProofsRelayState { + /// Number of last header we have processed so far. + pub best_processed_header_number: BlockNumber, +} + +/// Transactions proofs relay storage. +pub trait TransactionProofsRelayStorage { + /// Associated block number. + type BlockNumber; + + /// Get relay state. + fn state(&self) -> TransactionProofsRelayState; + /// Update relay state. + fn set_state(&mut self, state: &TransactionProofsRelayState); +} + +/// In-memory storage for auto-relay loop. +#[derive(Debug)] +pub struct InMemoryStorage { + best_processed_header_number: BlockNumber, +} + +impl InMemoryStorage { + /// Created new in-memory storage with given best processed block number. + pub fn new(best_processed_header_number: BlockNumber) -> Self { + InMemoryStorage { + best_processed_header_number, + } + } +} + +impl TransactionProofsRelayStorage for InMemoryStorage { + type BlockNumber = BlockNumber; + + fn state(&self) -> TransactionProofsRelayState { + TransactionProofsRelayState { + best_processed_header_number: self.best_processed_header_number, + } + } + + fn set_state(&mut self, state: &TransactionProofsRelayState) { + self.best_processed_header_number = state.best_processed_header_number; + } +} + +/// Run proofs synchronization. +pub fn run( + mut storage: impl TransactionProofsRelayStorage>, + source_client: impl SourceClient

, + target_client: impl TargetClient

, + exit_signal: impl Future, +) { + let mut local_pool = futures::executor::LocalPool::new(); + + local_pool.run_until(async move { + let mut retry_backoff = retry_backoff(); + let mut state = storage.state(); + let mut current_finalized_block = None; + + let exit_signal = exit_signal.fuse(); + + futures::pin_mut!(exit_signal); + + loop { + let iteration_result = run_loop_iteration( + &mut storage, + &source_client, + &target_client, + &mut state, + &mut current_finalized_block, + ) + .await; + + match iteration_result { + Ok(_) => { + retry_backoff.reset(); + + select! { + _ = source_client.tick().fuse() => {}, + _ = exit_signal => return, + } + } + Err(_) => { + let retry_timeout = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY); + + select! { + _ = async_std::task::sleep(retry_timeout).fuse() => {}, + _ = exit_signal => return, + } + } + } + } + }); +} + +/// Run exchange loop until we need to break. +async fn run_loop_iteration( + storage: &mut impl TransactionProofsRelayStorage>, + source_client: &impl SourceClient

, + target_client: &impl TargetClient

, + state: &mut TransactionProofsRelayState>, + current_finalized_block: &mut Option<(P::Block, RelayedBlockTransactions)>, +) -> Result<(), ()> { + let best_finalized_header_id = match target_client.best_finalized_header_id().await { + Ok(best_finalized_header_id) => { + log::trace!( + target: "bridge", + "Got best finalized {} block from {} node: {:?}", + P::SOURCE_NAME, + P::TARGET_NAME, + best_finalized_header_id, + ); + + best_finalized_header_id + } + Err(err) => { + log::error!( + target: "bridge", + "Failed to retrieve best {} header id from {} node: {:?}. Going to retry...", + P::SOURCE_NAME, + P::TARGET_NAME, + err, + ); + + return Err(()); + } + }; + + loop { + // if we already have some finalized block body, try to relay its transactions + if let Some((block, relayed_transactions)) = current_finalized_block.take() { + let result = relay_block_transactions(source_client, target_client, &block, relayed_transactions).await; + + match result { + Ok(relayed_transactions) => { + log::info!( + target: "bridge", + "Relay has processed {} block #{}. Total/Relayed/Failed transactions: {}/{}/{}", + P::SOURCE_NAME, + state.best_processed_header_number, + relayed_transactions.processed, + relayed_transactions.relayed, + relayed_transactions.failed, + ); + + state.best_processed_header_number = state.best_processed_header_number + One::one(); + storage.set_state(state); + + // we have just updated state => proceed to next block retrieval + } + Err(relayed_transactions) => { + *current_finalized_block = Some((block, relayed_transactions)); + return Err(()); + } + } + } + + // we may need to retrieve finalized block body from source node + if best_finalized_header_id.0 > state.best_processed_header_number { + let next_block_number = state.best_processed_header_number + One::one(); + let result = source_client.block_by_number(next_block_number).await; + + match result { + Ok(block) => { + *current_finalized_block = Some((block, RelayedBlockTransactions::default())); + + // we have received new finalized block => go back to relay its transactions + continue; + } + Err(err) => { + log::error!( + target: "bridge", + "Failed to retrieve canonical block #{} from {} node: {:?}. Going to retry...", + next_block_number, + P::SOURCE_NAME, + err, + ); + + return Err(()); + } + } + } + + // there are no any transactions we need to relay => wait for new data + return Ok(()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::exchange::tests::{ + test_next_block, test_next_block_id, test_transaction_hash, TestTransactionProof, TestTransactionsSource, + TestTransactionsTarget, + }; + use futures::{future::FutureExt, stream::StreamExt}; + + #[test] + fn exchange_loop_is_able_to_relay_proofs() { + let storage = InMemoryStorage { + best_processed_header_number: 0, + }; + let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no target ticks allowed"))); + let target_data = target.data.clone(); + let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); + + let source = TestTransactionsSource::new(Box::new(move |data| { + let transaction1_relayed = target_data + .lock() + .submitted_proofs + .contains(&TestTransactionProof(test_transaction_hash(0))); + let transaction2_relayed = target_data + .lock() + .submitted_proofs + .contains(&TestTransactionProof(test_transaction_hash(1))); + match (transaction1_relayed, transaction2_relayed) { + (true, true) => exit_sender.unbounded_send(()).unwrap(), + (true, false) => { + data.block = Ok(test_next_block()); + target_data.lock().best_finalized_header_id = Ok(test_next_block_id()); + target_data + .lock() + .transactions_to_accept + .insert(test_transaction_hash(1)); + } + _ => (), + } + })); + + run(storage, source, target, exit_receiver.into_future().map(|(_, _)| ())); + } +} diff --git a/bridges/relays/ethereum/src/main.rs b/bridges/relays/ethereum/src/main.rs index 4894c7efb27e..403ff101f863 100644 --- a/bridges/relays/ethereum/src/main.rs +++ b/bridges/relays/ethereum/src/main.rs @@ -22,6 +22,7 @@ mod ethereum_exchange; mod ethereum_sync_loop; mod ethereum_types; mod exchange; +mod exchange_loop; mod headers; mod metrics; mod rpc; @@ -260,11 +261,21 @@ fn ethereum_exchange_params(matches: &clap::ArgMatches) -> Result ethereum_exchange::ExchangeRelayMode::Single( + eth_tx_hash + .parse() + .map_err(|e| format!("Failed to parse eth-tx-hash: {}", e))?, + ), + None => ethereum_exchange::ExchangeRelayMode::Auto(match matches.value_of("eth-start-with-block") { + Some(eth_start_with_block) => Some( + eth_start_with_block + .parse() + .map_err(|e| format!("Failed to parse eth-start-with-block: {}", e))?, + ), + None => None, + }), + }; Ok(params) } diff --git a/bridges/relays/ethereum/src/rpc.rs b/bridges/relays/ethereum/src/rpc.rs index 412add5f8efd..5fc73ed02eae 100644 --- a/bridges/relays/ethereum/src/rpc.rs +++ b/bridges/relays/ethereum/src/rpc.rs @@ -50,6 +50,8 @@ jsonrpsee::rpc_api! { fn get_block_by_number(block_number: U64, full_tx_objs: bool) -> EthereumHeader; #[rpc(method = "eth_getBlockByHash", positional_params)] fn get_block_by_hash(hash: H256, full_tx_objs: bool) -> EthereumHeader; + #[rpc(method = "eth_getBlockByNumber", positional_params)] + fn get_block_by_number_with_transactions(number: U64, full_tx_objs: bool) -> EthereumHeaderWithTransactions; #[rpc(method = "eth_getBlockByHash", positional_params)] fn get_block_by_hash_with_transactions(hash: H256, full_tx_objs: bool) -> EthereumHeaderWithTransactions; #[rpc(method = "eth_getTransactionByHash", positional_params)] @@ -91,6 +93,8 @@ pub trait EthereumRpc { async fn header_by_number(&self, block_number: u64) -> Result; /// Retrieve block header by its hash from Ethereum node. async fn header_by_hash(&self, hash: H256) -> Result; + /// Retrieve block header and its transactions by its number from Ethereum node. + async fn header_by_number_with_transactions(&self, block_number: u64) -> Result; /// Retrieve block header and its transactions by its hash from Ethereum node. async fn header_by_hash_with_transactions(&self, hash: H256) -> Result; /// Retrieve transaction by its hash from Ethereum node. diff --git a/bridges/relays/ethereum/src/rpc_errors.rs b/bridges/relays/ethereum/src/rpc_errors.rs index 4645d4e4dcf8..9739237520b0 100644 --- a/bridges/relays/ethereum/src/rpc_errors.rs +++ b/bridges/relays/ethereum/src/rpc_errors.rs @@ -14,7 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::ethereum_types::{EthereumHeaderId, TransactionHash as EthereumTransactionHash}; use crate::sync_types::MaybeConnectionError; use jsonrpsee::client::RequestError; @@ -103,10 +102,6 @@ pub enum EthereumNodeError { /// An invalid Substrate block number was received from /// an Ethereum node. InvalidSubstrateBlockNumber, - /// Block includes the same transaction more than once. - DuplicateBlockTransaction(EthereumHeaderId, EthereumTransactionHash), - /// Block is missing transaction we believe is a part of this block. - BlockMissingTransaction(EthereumHeaderId, EthereumTransactionHash), } impl ToString for EthereumNodeError { @@ -122,14 +117,6 @@ impl ToString for EthereumNodeError { } Self::IncompleteTransaction => "Incomplete Ethereum Transaction (missing required field - raw)".to_string(), Self::InvalidSubstrateBlockNumber => "Received an invalid Substrate block from Ethereum Node".to_string(), - Self::DuplicateBlockTransaction(header_id, tx_hash) => format!( - "Ethereum block {}/{} includes Ethereum transaction {} more than once", - header_id.0, header_id.1, tx_hash, - ), - Self::BlockMissingTransaction(header_id, tx_hash) => format!( - "Ethereum block {}/{} is missing Ethereum transaction {} which we believe is a part of this block", - header_id.0, header_id.1, tx_hash, - ), } } } diff --git a/bridges/relays/ethereum/src/substrate_client.rs b/bridges/relays/ethereum/src/substrate_client.rs index e0c59457b012..32f219d4ea33 100644 --- a/bridges/relays/ethereum/src/substrate_client.rs +++ b/bridges/relays/ethereum/src/substrate_client.rs @@ -38,6 +38,7 @@ const ETH_API_IMPORT_REQUIRES_RECEIPTS: &str = "EthereumHeadersApi_is_import_req const ETH_API_IS_KNOWN_BLOCK: &str = "EthereumHeadersApi_is_known_block"; const ETH_API_BEST_BLOCK: &str = "EthereumHeadersApi_best_block"; const ETH_API_BEST_FINALIZED_BLOCK: &str = "EthereumHeadersApi_finalized_block"; +const EXCH_API_FILTER_TRANSACTION_PROOF: &str = "CurrencyExchangeApi_filter_transaction_proof"; const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; type Result = std::result::Result; @@ -145,7 +146,7 @@ impl SubstrateRpc for SubstrateRpcClient { async fn best_ethereum_finalized_block(&self) -> Result { let call = ETH_API_BEST_FINALIZED_BLOCK.to_string(); - let data = Bytes("0x".into()); + let data = Bytes(Vec::new()); let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; let decoded_response: (u64, sp_bridge_eth_poa::H256) = Decode::decode(&mut &encoded_response.0[..])?; @@ -298,6 +299,11 @@ impl SubmitEthereumHeaders for SubstrateRpcClient { /// calls. #[async_trait] pub trait SubmitEthereumExchangeTransactionProof: SubstrateRpc { + /// Pre-verify Ethereum exchange transaction proof. + async fn verify_exchange_transaction_proof( + &self, + proof: bridge_node_runtime::exchange::EthereumTransactionInclusionProof, + ) -> Result; /// Submits Ethereum exchange transaction proof to Substrate runtime. async fn submit_exchange_transaction_proof( &self, @@ -308,6 +314,19 @@ pub trait SubmitEthereumExchangeTransactionProof: SubstrateRpc { #[async_trait] impl SubmitEthereumExchangeTransactionProof for SubstrateRpcClient { + async fn verify_exchange_transaction_proof( + &self, + proof: bridge_node_runtime::exchange::EthereumTransactionInclusionProof, + ) -> Result { + let call = EXCH_API_FILTER_TRANSACTION_PROOF.to_string(); + let data = Bytes(proof.encode()); + + let encoded_response = Substrate::state_call(&self.client, call, data, None).await?; + let is_allowed: bool = Decode::decode(&mut &encoded_response.0[..])?; + + Ok(is_allowed) + } + async fn submit_exchange_transaction_proof( &self, params: SubstrateSigningParams, diff --git a/bridges/relays/ethereum/src/sync_loop.rs b/bridges/relays/ethereum/src/sync_loop.rs index be64e9fa504d..b2c5d2a5ee39 100644 --- a/bridges/relays/ethereum/src/sync_loop.rs +++ b/bridges/relays/ethereum/src/sync_loop.rs @@ -20,6 +20,7 @@ use crate::sync_loop_metrics::SyncLoopMetrics; use crate::sync_types::{ HeaderIdOf, HeaderStatus, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader, SubmittedHeaders, }; +use crate::utils::retry_backoff; use async_trait::async_trait; use backoff::{backoff::Backoff, ExponentialBackoff}; @@ -48,9 +49,6 @@ const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60); /// Delay after connection-related error happened before we'll try /// reconnection again. const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); -/// Max delay after connection-unrelated error happened before we'll try the -/// same request again. -const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60); /// Source client trait. #[async_trait] @@ -569,15 +567,6 @@ fn interval(timeout: Duration) -> impl futures::Stream { }) } -/// Exponential backoff for connection-unrelated errors retries. -pub(crate) fn retry_backoff() -> ExponentialBackoff { - let mut backoff = ExponentialBackoff::default(); - // we do not want relayer to stop - backoff.max_elapsed_time = None; - backoff.max_interval = MAX_BACKOFF_INTERVAL; - backoff -} - /// Process result of the future from a client. /// /// Returns whether or not the client we're interacting with is online. In this context diff --git a/bridges/relays/ethereum/src/sync_loop_tests.rs b/bridges/relays/ethereum/src/sync_loop_tests.rs index b49d4ce5e530..7b65f3d54a8d 100644 --- a/bridges/relays/ethereum/src/sync_loop_tests.rs +++ b/bridges/relays/ethereum/src/sync_loop_tests.rs @@ -16,10 +16,11 @@ #![cfg(test)] -use crate::sync_loop::{process_future_result, retry_backoff, run, SourceClient, TargetClient}; +use crate::sync_loop::{process_future_result, run, SourceClient, TargetClient}; use crate::sync_types::{ HeaderId, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader, SourceHeader, SubmittedHeaders, }; +use crate::utils::retry_backoff; use async_trait::async_trait; use backoff::backoff::Backoff; diff --git a/bridges/relays/ethereum/src/utils.rs b/bridges/relays/ethereum/src/utils.rs index 4f022c80477d..e769e1078277 100644 --- a/bridges/relays/ethereum/src/utils.rs +++ b/bridges/relays/ethereum/src/utils.rs @@ -14,6 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +use backoff::ExponentialBackoff; +use std::time::Duration; + +/// Max delay after connection-unrelated error happened before we'll try the +/// same request again. +const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60); + /// Macro that returns (client, Err(error)) tuple from function if result is Err(error). #[macro_export] macro_rules! bail_on_error { @@ -35,3 +42,12 @@ macro_rules! bail_on_arg_error { } }; } + +/// Exponential backoff for connection-unrelated errors retries. +pub fn retry_backoff() -> ExponentialBackoff { + let mut backoff = ExponentialBackoff::default(); + // we do not want relayer to stop + backoff.max_elapsed_time = None; + backoff.max_interval = MAX_BACKOFF_INTERVAL; + backoff +}