diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 5ad08ede06d..c329f8a292e 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -22,12 +22,13 @@ use color_eyre::eyre::Result; use zebra_chain::{ block::Block, - parameters::Network::{self, *}, + parameters::Network::*, serialization::ZcashSerialize, transaction::{self, Transaction}, }; use zebra_node_services::rpc_client::RpcRequestClient; use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY; +use zebra_test::prelude::TestChild; use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY; use crate::common::{ @@ -36,11 +37,14 @@ use crate::common::{ lightwalletd::{ can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc, sync::wait_for_zebrad_and_lightwalletd_sync, - wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude}, + wallet_grpc::{ + self, compact_tx_streamer_client::CompactTxStreamerClient, connect_to_lightwalletd, + Empty, Exclude, + }, }, regtest::MiningRpcMethods, sync::LARGE_CHECKPOINT_TIMEOUT, - test_type::TestType::{self, *}, + test_type::TestType::*, }; /// The maximum number of transactions we want to send in the test. @@ -178,150 +182,140 @@ pub async fn run() -> Result<()> { .into_inner(); for block in blocks { - // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: - // - // - // So we need to wait much longer than that here. - let sleep_until_lwd_last_mempool_refresh = - tokio::time::sleep(std::time::Duration::from_secs(4)); - - let transactions: Vec<_> = block - .transactions - .iter() - .filter(|tx| !tx.is_coinbase()) - .collect(); + zebrad = send_transactions_from_block(zebrad, &mut rpc_client, &zebrad_rpc_client, block) + .await?; + } - let transaction_hashes: Vec = - transactions.iter().map(|tx| tx.hash()).collect(); + Ok(()) +} - tracing::info!( - transaction_count = ?transactions.len(), - ?transaction_hashes, - "connected gRPC client to lightwalletd, sending transactions...", - ); +/// Sends non-coinbase transactions from a block to the mempool, verifies that the transactions +/// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate. +/// +/// Returns the zebrad test child that's handling the RPC requests. +#[tracing::instrument] +async fn send_transactions_from_block( + mut zebrad: TestChild, + rpc_client: &mut CompactTxStreamerClient, + zebrad_rpc_client: &RpcRequestClient, + block: Block, +) -> Result> { + // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: + // + // + // So we need to wait much longer than that here. + let sleep_until_lwd_last_mempool_refresh = + tokio::time::sleep(std::time::Duration::from_secs(4)); + + let transactions: Vec<_> = block + .transactions + .iter() + .filter(|tx| !tx.is_coinbase()) + .collect(); - let mut has_tx_with_shielded_elements = false; - for transaction in transactions { - let transaction_hash = transaction.hash(); + let transaction_hashes: Vec = + transactions.iter().map(|tx| tx.hash()).collect(); - // See - has_tx_with_shielded_elements |= transaction.version() >= 4 - && (transaction.has_shielded_inputs() || transaction.has_shielded_outputs()); + tracing::info!( + transaction_count = ?transactions.len(), + ?transaction_hashes, + "connected gRPC client to lightwalletd, sending transactions...", + ); - let expected_response = wallet_grpc::SendResponse { - error_code: 0, - error_message: format!("\"{transaction_hash}\""), - }; + let mut has_tx_with_shielded_elements = false; + for transaction in transactions { + let transaction_hash = transaction.hash(); - tracing::info!(?transaction_hash, "sending transaction..."); + // See + has_tx_with_shielded_elements |= transaction.version() >= 4 + && (transaction.has_shielded_inputs() || transaction.has_shielded_outputs()); - let request = prepare_send_transaction_request(transaction.clone()); + let expected_response = wallet_grpc::SendResponse { + error_code: 0, + error_message: format!("\"{transaction_hash}\""), + }; - let response = rpc_client.send_transaction(request).await?.into_inner(); + tracing::info!(?transaction_hash, "sending transaction..."); - assert_eq!(response, expected_response); - } + let request = prepare_send_transaction_request(transaction.clone()); - // Check if some transaction is sent to mempool, - // Fails if there are only coinbase transactions in the first 50 future blocks - tracing::info!("waiting for mempool to verify some transactions..."); - zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; + let response = rpc_client.send_transaction(request).await?.into_inner(); - // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() - // - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - sleep_until_lwd_last_mempool_refresh.await; + assert_eq!(response, expected_response); + } - tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); - let mut transactions_stream = rpc_client - .get_mempool_tx(Exclude { txid: vec![] }) - .await? - .into_inner(); - - // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. - // If that happens, we skip the rest of the test. - tracing::info!("checking if lightwalletd has queried the mempool..."); - - // We need a short timeout here, because sometimes this message is not logged. - zebrad = zebrad.with_timeout(Duration::from_secs(60)); - let tx_log = - zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds"); - // Reset the failed timeout and give the rest of the test enough time to finish. - #[allow(unused_assignments)] - { - zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT); - } - - if tx_log.is_err() { - tracing::info!( - "lightwalletd didn't query the mempool, skipping mempool contents checks" - ); - return Ok(()); - } - - tracing::info!("checking the mempool contains some of the sent transactions..."); - let mut counter = 0; - while let Some(tx) = transactions_stream.message().await? { - let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); - let hash = transaction::Hash::from_bytes_in_display_order(&hash); - - assert!( - transaction_hashes.contains(&hash), - "unexpected transaction {hash:?}\n\ - in isolated mempool: {tx:?}", - ); + // Check if some transaction is sent to mempool, + // Fails if there are only coinbase transactions in the first 50 future blocks + tracing::info!("waiting for mempool to verify some transactions..."); + zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; - counter += 1; - } + // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() + // + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + sleep_until_lwd_last_mempool_refresh.await; - // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. - // - // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, - // only check if a transaction from the first block has shielded elements - assert!( - !has_tx_with_shielded_elements || counter >= 1, - "failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd" - ); + tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); + let mut transactions_stream = rpc_client + .get_mempool_tx(Exclude { txid: vec![] }) + .await? + .into_inner(); - // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. - tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); - let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); + // Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead. + // If that happens, we skip the rest of the test. + tracing::info!("checking if lightwalletd has queried the mempool..."); + + // We need a short timeout here, because sometimes this message is not logged. + zebrad = zebrad.with_timeout(Duration::from_secs(60)); + let tx_log = + zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds"); + // Reset the failed timeout and give the rest of the test enough time to finish. + #[allow(unused_assignments)] + { + zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT); + } - let mut _counter = 0; - while let Some(_tx) = transaction_stream.message().await? { - // TODO: check tx.data or tx.height here? - _counter += 1; - } + if tx_log.is_err() { + tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks"); + return Ok(zebrad); + } + + tracing::info!("checking the mempool contains some of the sent transactions..."); + let mut counter = 0; + while let Some(tx) = transactions_stream.message().await? { + let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); + let hash = transaction::Hash::from_bytes_in_display_order(&hash); + + assert!( + transaction_hashes.contains(&hash), + "unexpected transaction {hash:?}\n\ + in isolated mempool: {tx:?}", + ); - zebrad_rpc_client.submit_block(block).await?; + counter += 1; } - Ok(()) -} + // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. + // + // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, + // only check if a transaction from the first block has shielded elements + assert!( + !has_tx_with_shielded_elements || counter >= 1, + "failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd" + ); -/// Loads transactions from a few block(s) after the chain tip of the cached state. -/// -/// Returns a list of non-coinbase transactions from blocks that have not been finalized to disk -/// in the `ZEBRA_CACHED_STATE_DIR`. -/// -/// ## Panics -/// -/// If the provided `test_type` doesn't need an rpc server and cached state -#[tracing::instrument] -async fn load_transactions_from_future_blocks( - network: Network, - test_type: TestType, - test_name: &str, -) -> Result>> { - let transactions = get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS) - .await? - .into_iter() - .flat_map(|block| block.transactions) - .filter(|transaction| !transaction.is_coinbase()) - .take(max_sent_transactions()) - .collect(); + // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. + tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); + let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); + + let mut _counter = 0; + while let Some(_tx) = transaction_stream.message().await? { + // TODO: check tx.data or tx.height here? + _counter += 1; + } + + zebrad_rpc_client.submit_block(block).await?; - Ok(transactions) + Ok(zebrad) } /// Prepare a request to send to lightwalletd that contains a transaction to be sent.