Skip to content

Commit

Permalink
Removes unused load_transactions_from_future_blocks and factors out…
Browse files Browse the repository at this point in the history
… code for sending transactions to its own fn
  • Loading branch information
arya2 committed Nov 22, 2024
1 parent 0ecf779 commit cecbb65
Showing 1 changed file with 120 additions and 126 deletions.
246 changes: 120 additions & 126 deletions zebrad/tests/common/lightwalletd/send_transaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use color_eyre::eyre::Result;

use zebra_chain::{
block::Block,
parameters::Network::{self, *},
parameters::Network::*,
serialization::ZcashSerialize,
transaction::{self, Transaction},
};
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY;
use zebra_test::prelude::TestChild;
use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY;

use crate::common::{
Expand All @@ -36,11 +37,14 @@ use crate::common::{
lightwalletd::{
can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc,
sync::wait_for_zebrad_and_lightwalletd_sync,
wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude},
wallet_grpc::{
self, compact_tx_streamer_client::CompactTxStreamerClient, connect_to_lightwalletd,
Empty, Exclude,
},
},
regtest::MiningRpcMethods,
sync::LARGE_CHECKPOINT_TIMEOUT,
test_type::TestType::{self, *},
test_type::TestType::*,
};

/// The maximum number of transactions we want to send in the test.
Expand Down Expand Up @@ -178,150 +182,140 @@ pub async fn run() -> Result<()> {
.into_inner();

for block in blocks {
// Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call:
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L482>
//
// So we need to wait much longer than that here.
let sleep_until_lwd_last_mempool_refresh =
tokio::time::sleep(std::time::Duration::from_secs(4));

let transactions: Vec<_> = block
.transactions
.iter()
.filter(|tx| !tx.is_coinbase())
.collect();
zebrad = send_transactions_from_block(zebrad, &mut rpc_client, &zebrad_rpc_client, block)
.await?;
}

let transaction_hashes: Vec<transaction::Hash> =
transactions.iter().map(|tx| tx.hash()).collect();
Ok(())
}

tracing::info!(
transaction_count = ?transactions.len(),
?transaction_hashes,
"connected gRPC client to lightwalletd, sending transactions...",
);
/// Sends non-coinbase transactions from a block to the mempool, verifies that the transactions
/// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate.
///
/// Returns the zebrad test child that's handling the RPC requests.
#[tracing::instrument]
async fn send_transactions_from_block(
mut zebrad: TestChild<tempfile::TempDir>,
rpc_client: &mut CompactTxStreamerClient<tonic::transport::Channel>,
zebrad_rpc_client: &RpcRequestClient,
block: Block,
) -> Result<TestChild<tempfile::TempDir>> {
// Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call:
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L482>
//
// So we need to wait much longer than that here.
let sleep_until_lwd_last_mempool_refresh =
tokio::time::sleep(std::time::Duration::from_secs(4));

let transactions: Vec<_> = block
.transactions
.iter()
.filter(|tx| !tx.is_coinbase())
.collect();

let mut has_tx_with_shielded_elements = false;
for transaction in transactions {
let transaction_hash = transaction.hash();
let transaction_hashes: Vec<transaction::Hash> =
transactions.iter().map(|tx| tx.hash()).collect();

// See <https://github.com/zcash/lightwalletd/blob/master/parser/transaction.go#L367>
has_tx_with_shielded_elements |= transaction.version() >= 4
&& (transaction.has_shielded_inputs() || transaction.has_shielded_outputs());
tracing::info!(
transaction_count = ?transactions.len(),
?transaction_hashes,
"connected gRPC client to lightwalletd, sending transactions...",
);

let expected_response = wallet_grpc::SendResponse {
error_code: 0,
error_message: format!("\"{transaction_hash}\""),
};
let mut has_tx_with_shielded_elements = false;
for transaction in transactions {
let transaction_hash = transaction.hash();

tracing::info!(?transaction_hash, "sending transaction...");
// See <https://github.com/zcash/lightwalletd/blob/master/parser/transaction.go#L367>
has_tx_with_shielded_elements |= transaction.version() >= 4
&& (transaction.has_shielded_inputs() || transaction.has_shielded_outputs());

let request = prepare_send_transaction_request(transaction.clone());
let expected_response = wallet_grpc::SendResponse {
error_code: 0,
error_message: format!("\"{transaction_hash}\""),
};

let response = rpc_client.send_transaction(request).await?.into_inner();
tracing::info!(?transaction_hash, "sending transaction...");

assert_eq!(response, expected_response);
}
let request = prepare_send_transaction_request(transaction.clone());

// Check if some transaction is sent to mempool,
// Fails if there are only coinbase transactions in the first 50 future blocks
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
let response = rpc_client.send_transaction(request).await?.into_inner();

// Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements()
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L537>
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
sleep_until_lwd_last_mempool_refresh.await;
assert_eq!(response, expected_response);
}

tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
let mut transactions_stream = rpc_client
.get_mempool_tx(Exclude { txid: vec![] })
.await?
.into_inner();

// Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead.
// If that happens, we skip the rest of the test.
tracing::info!("checking if lightwalletd has queried the mempool...");

// We need a short timeout here, because sometimes this message is not logged.
zebrad = zebrad.with_timeout(Duration::from_secs(60));
let tx_log =
zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds");
// Reset the failed timeout and give the rest of the test enough time to finish.
#[allow(unused_assignments)]
{
zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT);
}

if tx_log.is_err() {
tracing::info!(
"lightwalletd didn't query the mempool, skipping mempool contents checks"
);
return Ok(());
}

tracing::info!("checking the mempool contains some of the sent transactions...");
let mut counter = 0;
while let Some(tx) = transactions_stream.message().await? {
let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length");
let hash = transaction::Hash::from_bytes_in_display_order(&hash);

assert!(
transaction_hashes.contains(&hash),
"unexpected transaction {hash:?}\n\
in isolated mempool: {tx:?}",
);
// Check if some transaction is sent to mempool,
// Fails if there are only coinbase transactions in the first 50 future blocks
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;

counter += 1;
}
// Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements()
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L537>
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
sleep_until_lwd_last_mempool_refresh.await;

// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
//
// TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and,
// only check if a transaction from the first block has shielded elements
assert!(
!has_tx_with_shielded_elements || counter >= 1,
"failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd"
);
tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
let mut transactions_stream = rpc_client
.get_mempool_tx(Exclude { txid: vec![] })
.await?
.into_inner();

// TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool.
tracing::info!("calling GetMempoolStream gRPC to fetch transactions...");
let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner();
// Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead.
// If that happens, we skip the rest of the test.
tracing::info!("checking if lightwalletd has queried the mempool...");

// We need a short timeout here, because sometimes this message is not logged.
zebrad = zebrad.with_timeout(Duration::from_secs(60));
let tx_log =
zebrad.expect_stdout_line_matches("answered mempool request .*req.*=.*TransactionIds");
// Reset the failed timeout and give the rest of the test enough time to finish.
#[allow(unused_assignments)]
{
zebrad = zebrad.with_timeout(LARGE_CHECKPOINT_TIMEOUT);
}

let mut _counter = 0;
while let Some(_tx) = transaction_stream.message().await? {
// TODO: check tx.data or tx.height here?
_counter += 1;
}
if tx_log.is_err() {
tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks");
return Ok(zebrad);
}

tracing::info!("checking the mempool contains some of the sent transactions...");
let mut counter = 0;
while let Some(tx) = transactions_stream.message().await? {
let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length");
let hash = transaction::Hash::from_bytes_in_display_order(&hash);

assert!(
transaction_hashes.contains(&hash),
"unexpected transaction {hash:?}\n\
in isolated mempool: {tx:?}",
);

zebrad_rpc_client.submit_block(block).await?;
counter += 1;
}

Ok(())
}
// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
//
// TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and,
// only check if a transaction from the first block has shielded elements
assert!(
!has_tx_with_shielded_elements || counter >= 1,
"failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd"
);

/// Loads transactions from a few block(s) after the chain tip of the cached state.
///
/// Returns a list of non-coinbase transactions from blocks that have not been finalized to disk
/// in the `ZEBRA_CACHED_STATE_DIR`.
///
/// ## Panics
///
/// If the provided `test_type` doesn't need an rpc server and cached state
#[tracing::instrument]
async fn load_transactions_from_future_blocks(
network: Network,
test_type: TestType,
test_name: &str,
) -> Result<Vec<Arc<Transaction>>> {
let transactions = get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS)
.await?
.into_iter()
.flat_map(|block| block.transactions)
.filter(|transaction| !transaction.is_coinbase())
.take(max_sent_transactions())
.collect();
// TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool.
tracing::info!("calling GetMempoolStream gRPC to fetch transactions...");
let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner();

let mut _counter = 0;
while let Some(_tx) = transaction_stream.message().await? {
// TODO: check tx.data or tx.height here?
_counter += 1;
}

zebrad_rpc_client.submit_block(block).await?;

Ok(transactions)
Ok(zebrad)
}

/// Prepare a request to send to lightwalletd that contains a transaction to be sent.
Expand Down

0 comments on commit cecbb65

Please sign in to comment.