From 8a0e1955c309a69055a5c22c571daeacb35b9fbe Mon Sep 17 00:00:00 2001 From: Samuel Onoja Date: Fri, 29 Mar 2024 14:59:05 +0100 Subject: [PATCH 1/2] feat(zcoin): tx_history support for WASM target (#2077) `z_coin_tx_history` should now work in wasm --- mm2src/coins/my_tx_history_v2.rs | 1 - mm2src/coins/z_coin.rs | 178 +++----------- .../z_coin/storage/walletdb/wasm/storage.rs | 2 +- mm2src/coins/z_coin/z_coin_errors.rs | 35 +-- mm2src/coins/z_coin/z_tx_history.rs | 222 ++++++++++++++++++ .../mm2_db/src/indexed_db/indexed_cursor.rs | 8 + .../mm2_main/src/rpc/dispatcher/dispatcher.rs | 2 +- mm2src/mm2_main/src/wasm_tests.rs | 3 +- 8 files changed, 290 insertions(+), 161 deletions(-) create mode 100644 mm2src/coins/z_coin/z_tx_history.rs diff --git a/mm2src/coins/my_tx_history_v2.rs b/mm2src/coins/my_tx_history_v2.rs index 97c5a5ca8f..6158829c66 100644 --- a/mm2src/coins/my_tx_history_v2.rs +++ b/mm2src/coins/my_tx_history_v2.rs @@ -514,7 +514,6 @@ where }) } -#[cfg(not(target_arch = "wasm32"))] pub async fn z_coin_tx_history_rpc( ctx: MmArc, request: MyTxHistoryRequestV2, diff --git a/mm2src/coins/z_coin.rs b/mm2src/coins/z_coin.rs index 3ec97cd558..4e125b0f3a 100644 --- a/mm2src/coins/z_coin.rs +++ b/mm2src/coins/z_coin.rs @@ -1,5 +1,4 @@ use crate::coin_errors::{MyAddressError, ValidatePaymentResult}; -#[cfg(not(target_arch = "wasm32"))] use crate::my_tx_history_v2::{MyTxHistoryErrorV2, MyTxHistoryRequestV2, MyTxHistoryResponseV2}; use crate::rpc_command::init_withdraw::{InitWithdrawCoin, WithdrawInProgressStatus, WithdrawTaskHandleShared}; use crate::utxo::rpc_clients::{ElectrumRpcRequest, UnspentInfo, UtxoRpcClientEnum, UtxoRpcError, UtxoRpcFut, @@ -7,33 +6,35 @@ use crate::utxo::rpc_clients::{ElectrumRpcRequest, UnspentInfo, UtxoRpcClientEnu use crate::utxo::utxo_builder::UtxoCoinBuildError; use crate::utxo::utxo_builder::{UtxoCoinBuilder, UtxoCoinBuilderCommonOps, UtxoFieldsWithGlobalHDBuilder, UtxoFieldsWithHardwareWalletBuilder, UtxoFieldsWithIguanaSecretBuilder}; +use crate::utxo::utxo_common::{addresses_from_script, big_decimal_from_sat}; use crate::utxo::utxo_common::{big_decimal_from_sat_unsigned, payment_script}; use crate::utxo::{sat_from_big_decimal, utxo_common, ActualTxFee, AdditionalTxData, AddrFromStrError, Address, BroadcastTxErr, FeePolicy, GetUtxoListOps, HistoryUtxoTx, HistoryUtxoTxMap, MatureUnspentList, RecentlySpentOutPointsGuard, UtxoActivationParams, UtxoAddressFormat, UtxoArc, UtxoCoinFields, UtxoCommonOps, UtxoRpcMode, UtxoTxBroadcastOps, UtxoTxGenerationOps, VerboseTransactionFrom}; use crate::utxo::{UnsupportedAddr, UtxoFeeDetails}; -use crate::TxFeeDetails; +use crate::z_coin::storage::{BlockDbImpl, WalletDbShared}; +use crate::z_coin::z_tx_history::{fetch_tx_history_from_db, ZCoinTxHistoryItem}; use crate::{BalanceError, BalanceFut, CheckIfMyPaymentSentArgs, CoinBalance, CoinFutSpawner, ConfirmPaymentInput, DexFee, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MakerSwapTakerCoin, MarketCoinOps, MmCoin, - MmCoinEnum, NegotiateSwapContractAddrErr, PaymentInstructionArgs, PaymentInstructions, + MmCoinEnum, NegotiateSwapContractAddrErr, NumConversError, PaymentInstructionArgs, PaymentInstructions, PaymentInstructionsErr, PrivKeyActivationPolicy, PrivKeyBuildPolicy, PrivKeyPolicyNotAllowed, RawTransactionFut, RawTransactionRequest, RawTransactionResult, RefundError, RefundPaymentArgs, RefundResult, SearchForSwapTxSpendInput, SendMakerPaymentSpendPreimageInput, SendPaymentArgs, SignRawTransactionRequest, SignatureError, SignatureResult, SpendPaymentArgs, SwapOps, TakerSwapMakerCoin, - TradeFee, TradePreimageFut, TradePreimageResult, TradePreimageValue, TransactionEnum, TransactionFut, - TransactionResult, TxMarshalingErr, UnexpectedDerivationMethod, ValidateAddressResult, ValidateFeeArgs, - ValidateInstructionsErr, ValidateOtherPubKeyErr, ValidatePaymentError, ValidatePaymentFut, - ValidatePaymentInput, ValidateWatcherSpendInput, VerificationError, VerificationResult, - WaitForHTLCTxSpendArgs, WatcherOps, WatcherReward, WatcherRewardError, WatcherSearchForSwapTxSpendInput, - WatcherValidatePaymentInput, WatcherValidateTakerFeeInput, WithdrawFut, WithdrawRequest}; -use crate::{NumConversError, TransactionDetails}; -use crate::{Transaction, WithdrawError}; + TradeFee, TradePreimageFut, TradePreimageResult, TradePreimageValue, Transaction, TransactionDetails, + TransactionEnum, TransactionFut, TransactionResult, TxFeeDetails, TxMarshalingErr, + UnexpectedDerivationMethod, ValidateAddressResult, ValidateFeeArgs, ValidateInstructionsErr, + ValidateOtherPubKeyErr, ValidatePaymentError, ValidatePaymentFut, ValidatePaymentInput, + ValidateWatcherSpendInput, VerificationError, VerificationResult, WaitForHTLCTxSpendArgs, WatcherOps, + WatcherReward, WatcherRewardError, WatcherSearchForSwapTxSpendInput, WatcherValidatePaymentInput, + WatcherValidateTakerFeeInput, WithdrawError, WithdrawFut, WithdrawRequest}; use async_trait::async_trait; use bitcrypto::dhash256; use chain::constants::SEQUENCE_FINAL; use chain::{Transaction as UtxoTx, TransactionOutput}; +use common::calc_total_pages; use common::executor::{AbortableSystem, AbortedError}; use common::{log, one_thousand_u32}; use crypto::privkey::{key_pair_from_secret, secp_privkey_from_hash}; @@ -85,15 +86,11 @@ use z_rpc::init_light_client; pub use z_rpc::{FirstSyncBlock, SyncStatus}; cfg_native!( - use crate::utxo::utxo_common::{addresses_from_script, big_decimal_from_sat}; - use common::{async_blocking, sha256_digest, calc_total_pages, PagingOptionsEnum}; - use db_common::sqlite::offset_by_id; - use db_common::sqlite::rusqlite::{Error as SqlError, Row}; - use db_common::sqlite::sql_builder::{name, SqlBuilder, SqlName}; + use common::{async_blocking, sha256_digest}; use zcash_client_sqlite::error::SqliteClientError as ZcashClientError; use zcash_client_sqlite::wallet::get_balance; use zcash_proofs::default_params_folder; - use z_rpc::{init_native_client}; + use z_rpc::init_native_client; ); cfg_wasm32!( @@ -105,13 +102,13 @@ cfg_wasm32!( ); #[allow(unused)] mod z_coin_errors; -use crate::z_coin::storage::{BlockDbImpl, WalletDbShared}; pub use z_coin_errors::*; pub mod storage; #[cfg(all(test, feature = "zhtlc-native-tests"))] mod z_coin_native_tests; #[cfg(target_arch = "wasm32")] mod z_params; +mod z_tx_history; /// `ZP2SHSpendError` compatible `TransactionErr` handling macro. macro_rules! try_ztx_s { @@ -138,7 +135,6 @@ cfg_native!( const SAPLING_OUTPUT_NAME: &str = "sapling-output.params"; const SAPLING_SPEND_NAME: &str = "sapling-spend.params"; const BLOCKS_TABLE: &str = "blocks"; - const TRANSACTIONS_TABLE: &str = "transactions"; const SAPLING_SPEND_EXPECTED_HASH: &str = "8e48ffd23abb3a5fd9c5589204f32d9c31285a04b78096ba40a79b75677efc13"; const SAPLING_OUTPUT_EXPECTED_HASH: &str = "2f0ebbcbb9bb0bcffe95a397e7eba89c29eb4dde6191c339db88570e3f3fb0e4"; ); @@ -184,6 +180,8 @@ impl Parameters for ZcoinConsensusParams { NetworkUpgrade::Blossom => self.blossom_activation_height.map(BlockHeight::from), NetworkUpgrade::Heartwood => self.heartwood_activation_height.map(BlockHeight::from), NetworkUpgrade::Canopy => self.canopy_activation_height.map(BlockHeight::from), + #[cfg(feature = "zfuture")] + NetworkUpgrade::ZFuture => unimplemented!(), } } @@ -240,39 +238,6 @@ pub struct ZOutput { pub memo: Option, } -#[cfg(not(target_arch = "wasm32"))] -struct ZCoinSqlTxHistoryItem { - tx_hash: Vec, - internal_id: i64, - height: i64, - timestamp: i64, - received_amount: i64, - spent_amount: i64, -} - -#[cfg(not(target_arch = "wasm32"))] -impl ZCoinSqlTxHistoryItem { - fn try_from_sql_row(row: &Row<'_>) -> Result { - let mut tx_hash: Vec = row.get(0)?; - tx_hash.reverse(); - Ok(ZCoinSqlTxHistoryItem { - tx_hash, - internal_id: row.get(1)?, - height: row.get(2)?, - timestamp: row.get(3)?, - received_amount: row.get(4)?, - spent_amount: row.get(5)?, - }) - } -} - -#[cfg(not(target_arch = "wasm32"))] -struct SqlTxHistoryRes { - transactions: Vec, - total_tx_count: u32, - skipped: usize, -} - #[derive(Serialize)] pub struct ZcoinTxDetails { /// Transaction hash in hexadecimal format @@ -531,73 +496,6 @@ impl ZCoin { Ok(tx) } - #[cfg(not(target_arch = "wasm32"))] - async fn tx_history_from_sql( - &self, - limit: usize, - paging_options: PagingOptionsEnum, - ) -> Result> { - let wallet_db = self.z_fields.light_wallet_db.clone(); - async_blocking(move || { - let db_guard = wallet_db.db.inner(); - let db_guard = db_guard.lock().unwrap(); - let conn = db_guard.sql_conn(); - - let total_sql = SqlBuilder::select_from(TRANSACTIONS_TABLE) - .field("COUNT(id_tx)") - .sql() - .expect("valid SQL"); - let total_tx_count = conn.query_row(&total_sql, [], |row| row.get(0))?; - - let mut sql_builder = SqlBuilder::select_from(name!(TRANSACTIONS_TABLE; "txes")); - sql_builder - .field("txes.txid") - .field("txes.id_tx as internal_id") - .field("txes.block as block"); - - let offset = match paging_options { - PagingOptionsEnum::PageNumber(page) => (page.get() - 1) * limit, - PagingOptionsEnum::FromId(id) => { - offset_by_id(conn, &sql_builder, [id], "id_tx", "block DESC, id_tx ASC", "id_tx = ?1")? - .ok_or(SqlTxHistoryError::FromIdDoesNotExist(id))? - }, - }; - - let sql = sql_builder - .field("blocks.time") - .field("COALESCE(rn.received_amount, 0)") - .field("COALESCE(sn.sent_amount, 0)") - .left() - .join("(SELECT tx, SUM(value) as received_amount FROM received_notes GROUP BY tx) as rn") - .on("txes.id_tx = rn.tx") - // detecting spent amount by "spent" field in received_notes table - .join("(SELECT spent, SUM(value) as sent_amount FROM received_notes GROUP BY spent) as sn") - .on("txes.id_tx = sn.spent") - .join(BLOCKS_TABLE) - .on("txes.block = blocks.height") - .group_by("internal_id") - .order_by("block", true) - .order_by("internal_id", false) - .offset(offset) - .limit(limit) - .sql() - .expect("valid query"); - - let sql_items = conn - .prepare(&sql)? - .query_map([], ZCoinSqlTxHistoryItem::try_from_sql_row)? - .collect::, _>>()?; - - Ok(SqlTxHistoryRes { - transactions: sql_items, - total_tx_count, - skipped: offset, - }) - }) - .await - } - - #[cfg(not(target_arch = "wasm32"))] async fn z_transactions_from_cache_or_rpc( &self, hashes: HashSet, @@ -613,23 +511,22 @@ impl ZCoin { .map_to_mm(|e| UtxoRpcError::InvalidResponse(e.to_string())) } - #[cfg(not(target_arch = "wasm32"))] - fn tx_details_from_sql_item( + fn tx_details_from_db_item( &self, - sql_item: ZCoinSqlTxHistoryItem, + tx_item: ZCoinTxHistoryItem, transactions: &mut HashMap, prev_transactions: &HashMap, current_block: u64, ) -> Result> { let mut from = HashSet::new(); - let mut confirmations = current_block as i64 - sql_item.height + 1; + let mut confirmations = current_block as i64 - tx_item.height + 1; if confirmations < 0 { confirmations = 0; } let mut transparent_input_amount = Amount::zero(); - let hash = H256Json::from(sql_item.tx_hash.as_slice()); + let hash = H256Json::from(tx_item.tx_hash.as_slice()); let z_tx = transactions.remove(&hash).or_mm_err(|| NoInfoAboutTx(hash))?; for input in z_tx.vin.iter() { let mut hash = H256Json::from(*input.prevout.hash()); @@ -657,11 +554,11 @@ impl ZCoin { } let fee_amount = z_tx.value_balance + transparent_input_amount - transparent_output_amount; - if sql_item.spent_amount > 0 { + if tx_item.spent_amount > 0 { from.insert(self.my_z_address_encoded()); } - if sql_item.received_amount > 0 { + if tx_item.received_amount > 0 { to.insert(self.my_z_address_encoded()); } @@ -691,35 +588,32 @@ impl ZCoin { } } - let spent_by_me = big_decimal_from_sat(sql_item.spent_amount, self.decimals()); - let received_by_me = big_decimal_from_sat(sql_item.received_amount, self.decimals()); + let spent_by_me = big_decimal_from_sat(tx_item.spent_amount, self.decimals()); + let received_by_me = big_decimal_from_sat(tx_item.received_amount, self.decimals()); Ok(ZcoinTxDetails { - tx_hash: hex::encode(sql_item.tx_hash), + tx_hash: hex::encode(tx_item.tx_hash), from, to, my_balance_change: &received_by_me - &spent_by_me, spent_by_me, received_by_me, - block_height: sql_item.height, + block_height: tx_item.height, confirmations, - timestamp: sql_item.timestamp, + timestamp: tx_item.timestamp, transaction_fee: big_decimal_from_sat(fee_amount.into(), self.decimals()), coin: self.ticker().into(), - internal_id: sql_item.internal_id, + internal_id: tx_item.internal_id, }) } - #[cfg(not(target_arch = "wasm32"))] pub async fn tx_history( &self, request: MyTxHistoryRequestV2, ) -> Result, MmError> { let current_block = self.utxo_rpc_client().get_block_count().compat().await?; - let sql_result = self - .tx_history_from_sql(request.limit, request.paging_options.clone()) - .await?; + let req_result = fetch_tx_history_from_db(self, request.limit, request.paging_options.clone()).await?; - let hashes_for_verbose = sql_result + let hashes_for_verbose = req_result .transactions .iter() .map(|item| H256Json::from(item.tx_hash.as_slice())) @@ -738,11 +632,11 @@ impl ZCoin { .collect(); let prev_transactions = self.z_transactions_from_cache_or_rpc(prev_tx_hashes).await?; - let transactions = sql_result + let transactions = req_result .transactions .into_iter() .map(|sql_item| { - self.tx_details_from_sql_item(sql_item, &mut transactions, &prev_transactions, current_block) + self.tx_details_from_db_item(sql_item, &mut transactions, &prev_transactions, current_block) }) .collect::>()?; @@ -754,9 +648,9 @@ impl ZCoin { // Zcoin is activated only after the state is synced sync_status: HistorySyncState::Finished, limit: request.limit, - skipped: sql_result.skipped, - total: sql_result.total_tx_count as usize, - total_pages: calc_total_pages(sql_result.total_tx_count as usize, request.limit), + skipped: req_result.skipped, + total: req_result.total_tx_count as usize, + total_pages: calc_total_pages(req_result.total_tx_count as usize, request.limit), paging_options: request.paging_options, }) } diff --git a/mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs b/mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs index e55b9e64d0..bf99dec6ca 100644 --- a/mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs +++ b/mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs @@ -146,7 +146,7 @@ impl<'a> WalletIndexedDb { Ok(db) } - async fn lock_db(&self) -> ZcoinStorageRes> { + pub(crate) async fn lock_db(&self) -> ZcoinStorageRes> { self.db .get_or_initialize() .await diff --git a/mm2src/coins/z_coin/z_coin_errors.rs b/mm2src/coins/z_coin/z_coin_errors.rs index f556435f57..5b5992baae 100644 --- a/mm2src/coins/z_coin/z_coin_errors.rs +++ b/mm2src/coins/z_coin/z_coin_errors.rs @@ -272,27 +272,32 @@ impl From for ZCoinBuildError { fn from(err: ZcoinClientInitError) -> Self { ZCoinBuildError::RpcClientInitErr(err) } } -#[cfg(not(target_arch = "wasm32"))] -pub(super) enum SqlTxHistoryError { +#[derive(Debug, Display)] +pub(crate) enum ZTxHistoryError { + #[cfg(not(target_arch = "wasm32"))] Sql(SqliteError), + #[cfg(target_arch = "wasm32")] + IndexedDbError(String), FromIdDoesNotExist(i64), } -#[cfg(not(target_arch = "wasm32"))] -impl From for SqlTxHistoryError { - fn from(err: SqliteError) -> Self { SqlTxHistoryError::Sql(err) } +impl From for MyTxHistoryErrorV2 { + fn from(err: ZTxHistoryError) -> Self { MyTxHistoryErrorV2::StorageError(err.to_string()) } } #[cfg(not(target_arch = "wasm32"))] -impl From for MyTxHistoryErrorV2 { - fn from(err: SqlTxHistoryError) -> Self { - match err { - SqlTxHistoryError::Sql(sql) => MyTxHistoryErrorV2::StorageError(sql.to_string()), - SqlTxHistoryError::FromIdDoesNotExist(id) => { - MyTxHistoryErrorV2::StorageError(format!("from_id {} does not exist", id)) - }, - } - } +impl From for ZTxHistoryError { + fn from(err: SqliteError) -> Self { ZTxHistoryError::Sql(err) } +} + +#[cfg(target_arch = "wasm32")] +impl From for ZTxHistoryError { + fn from(err: DbTransactionError) -> Self { ZTxHistoryError::IndexedDbError(err.to_string()) } +} + +#[cfg(target_arch = "wasm32")] +impl From for ZTxHistoryError { + fn from(err: CursorError) -> Self { ZTxHistoryError::IndexedDbError(err.to_string()) } } pub(super) struct NoInfoAboutTx(pub(super) H256Json); @@ -316,6 +321,7 @@ pub enum ZCoinBalanceError { impl From for ZCoinBalanceError { fn from(value: ZcoinStorageError) -> Self { ZCoinBalanceError::BalanceError(value.to_string()) } } + /// The `ValidateBlocksError` enum encapsulates different types of errors that may occur /// during the validation and scanning process of zcoin blocks. #[derive(Debug, Display)] @@ -342,6 +348,7 @@ pub enum ValidateBlocksError { impl From for ZcoinStorageError { fn from(value: ValidateBlocksError) -> Self { Self::ValidateBlocksError(value) } } + impl From> for ValidateBlocksError { fn from(value: MmError) -> Self { Self::ZcoinStorageError(value.to_string()) } } diff --git a/mm2src/coins/z_coin/z_tx_history.rs b/mm2src/coins/z_coin/z_tx_history.rs new file mode 100644 index 0000000000..26b7f9c8ce --- /dev/null +++ b/mm2src/coins/z_coin/z_tx_history.rs @@ -0,0 +1,222 @@ +use crate::z_coin::{ZCoin, ZTxHistoryError}; +use common::PagingOptionsEnum; +use mm2_err_handle::prelude::MmError; + +cfg_wasm32!( + use crate::z_coin::storage::wasm::tables::{WalletDbBlocksTable, WalletDbReceivedNotesTable, WalletDbTransactionsTable}; + use crate::MarketCoinOps; + use mm2_number::BigInt; + use num_traits::ToPrimitive; +); + +cfg_native!( + use crate::z_coin::BLOCKS_TABLE; + use db_common::sqlite::sql_builder::{name, SqlBuilder, SqlName}; + use db_common::sqlite::rusqlite::Error as SqliteError; + use db_common::sqlite::rusqlite::Row; + use db_common::sqlite::offset_by_id; + use common::async_blocking; +); + +#[cfg(not(target_arch = "wasm32"))] +const TRANSACTIONS_TABLE: &str = "transactions"; + +pub(crate) struct ZCoinTxHistoryItem { + pub(crate) tx_hash: Vec, + pub(crate) internal_id: i64, + pub(crate) height: i64, + pub(crate) timestamp: i64, + pub(crate) received_amount: i64, + pub(crate) spent_amount: i64, +} + +pub(crate) struct ZTxHistoryRes { + pub(crate) total_tx_count: u32, + pub(crate) transactions: Vec, + pub(crate) skipped: usize, +} + +/// Fetch transaction history from the database. +#[cfg(target_arch = "wasm32")] +pub(crate) async fn fetch_tx_history_from_db( + z: &ZCoin, + limit: usize, + paging_options: PagingOptionsEnum, +) -> Result> { + let wallet_db = z.z_fields.light_wallet_db.clone(); + let wallet_db = wallet_db.db.lock_db().await.unwrap(); + let db_transaction = wallet_db.get_inner().transaction().await?; + let tx_table = db_transaction.table::().await?; + let total_tx_count = tx_table.count_all().await? as u32; + let offset = match paging_options { + PagingOptionsEnum::PageNumber(page_number) => ((page_number.get() - 1) * limit) as i64, + PagingOptionsEnum::FromId(tx_id) => { + if tx_id > total_tx_count as i64 { + return MmError::err(ZTxHistoryError::FromIdDoesNotExist(tx_id)); + } + (total_tx_count as i64 - tx_id) + 1 + }, + }; + + // Fetch transactions + let txs = tx_table + .cursor_builder() + .only("ticker", z.ticker())? + .offset(offset as u32) + .limit(limit) + .reverse() + .open_cursor("ticker") + .await? + .collect() + .await?; + + // Fetch received notes + let rn_table = db_transaction.table::().await?; + let received_notes = rn_table + .cursor_builder() + .only("ticker", z.ticker())? + .open_cursor("ticker") + .await? + .collect() + .await?; + + // Fetch blocks + let blocks_table = db_transaction.table::().await?; + let blocks = blocks_table + .cursor_builder() + .only("ticker", z.ticker())? + .open_cursor("ticker") + .await? + .collect() + .await?; + + // Process transactions and construct tx_details + let mut tx_details = vec![]; + for (tx_id, tx) in txs { + if let Some((_, WalletDbBlocksTable { height, time, .. })) = blocks + .iter() + .find(|(_, block)| tx.block.map(|b| b == block.height).unwrap_or_default()) + { + let internal_id = tx_id; + let mut received_amount = 0; + let mut spent_amount = 0; + + for (_, note) in &received_notes { + if internal_id == note.tx { + received_amount += note.value.to_u64().ok_or_else(|| { + ZTxHistoryError::IndexedDbError("Number is too large to fit in a u64".to_string()) + })? as i64; + } + + // detecting spent amount by "spent" field in received_notes table + if let Some(spent) = ¬e.spent { + if &BigInt::from(internal_id) == spent { + spent_amount += note.value.to_u64().ok_or_else(|| { + ZTxHistoryError::IndexedDbError("Number is too large to fit in a u64".to_string()) + })? as i64; + } + } + } + + let mut tx_hash = tx.txid; + tx_hash.reverse(); + + tx_details.push(ZCoinTxHistoryItem { + tx_hash, + internal_id: internal_id as i64, + height: *height as i64, + timestamp: *time as i64, + received_amount, + spent_amount, + }); + } + } + + Ok(ZTxHistoryRes { + transactions: tx_details, + total_tx_count, + skipped: offset as usize, + }) +} + +#[cfg(not(target_arch = "wasm32"))] +impl ZCoinTxHistoryItem { + fn try_from_sql_row(row: &Row<'_>) -> Result { + let mut tx_hash: Vec = row.get(0)?; + tx_hash.reverse(); + Ok(ZCoinTxHistoryItem { + tx_hash, + internal_id: row.get(1)?, + height: row.get(2)?, + timestamp: row.get(3)?, + received_amount: row.get(4)?, + spent_amount: row.get(5)?, + }) + } +} + +#[cfg(not(target_arch = "wasm32"))] +pub(crate) async fn fetch_tx_history_from_db( + z: &ZCoin, + limit: usize, + paging_options: PagingOptionsEnum, +) -> Result> { + let wallet_db = z.z_fields.light_wallet_db.clone(); + async_blocking(move || { + let db_guard = wallet_db.db.inner(); + let db_guard = db_guard.lock().unwrap(); + let conn = db_guard.sql_conn(); + + let total_sql = SqlBuilder::select_from(TRANSACTIONS_TABLE) + .field("COUNT(id_tx)") + .sql() + .expect("valid SQL"); + let total_tx_count = conn.query_row(&total_sql, [], |row| row.get(0))?; + + let mut sql_builder = SqlBuilder::select_from(name!(TRANSACTIONS_TABLE; "txes")); + sql_builder + .field("txes.txid") + .field("txes.id_tx as internal_id") + .field("txes.block as block"); + + let offset = match paging_options { + PagingOptionsEnum::PageNumber(page) => (page.get() - 1) * limit, + PagingOptionsEnum::FromId(id) => { + offset_by_id(conn, &sql_builder, [id], "id_tx", "block DESC, id_tx ASC", "id_tx = ?1")? + .ok_or(ZTxHistoryError::FromIdDoesNotExist(id))? + }, + }; + + let sql = sql_builder + .field("blocks.time") + .field("COALESCE(rn.received_amount, 0)") + .field("COALESCE(sn.sent_amount, 0)") + .left() + .join("(SELECT tx, SUM(value) as received_amount FROM received_notes GROUP BY tx) as rn") + .on("txes.id_tx = rn.tx") + // detecting spent amount by "spent" field in received_notes table + .join("(SELECT spent, SUM(value) as sent_amount FROM received_notes GROUP BY spent) as sn") + .on("txes.id_tx = sn.spent") + .join(BLOCKS_TABLE) + .on("txes.block = blocks.height") + .group_by("internal_id") + .order_by("block", true) + .order_by("internal_id", false) + .offset(offset) + .limit(limit) + .sql() + .expect("valid query"); + + let sql_items = conn + .prepare(&sql)? + .query_map([], ZCoinTxHistoryItem::try_from_sql_row)? + .collect::, _>>()?; + + Ok(ZTxHistoryRes { + transactions: sql_items, + total_tx_count, + skipped: offset, + }) + }) + .await +} diff --git a/mm2src/mm2_db/src/indexed_db/indexed_cursor.rs b/mm2src/mm2_db/src/indexed_db/indexed_cursor.rs index 0f1ad8f50e..74b7c3e89b 100644 --- a/mm2src/mm2_db/src/indexed_db/indexed_cursor.rs +++ b/mm2src/mm2_db/src/indexed_db/indexed_cursor.rs @@ -145,11 +145,19 @@ impl<'transaction, 'reference, Table: TableSignature> CursorBuilder<'transaction pub fn where_first(self) -> CursorBuilder<'transaction, 'reference, Table> { self.where_(|_| Ok(true)) } pub fn limit(mut self, limit: usize) -> CursorBuilder<'transaction, 'reference, Table> { + if limit < 1 { + return self; + }; + self.filters_ext.limit = Some(limit); self } pub fn offset(mut self, offset: u32) -> CursorBuilder<'transaction, 'reference, Table> { + if offset < 1 { + return self; + }; + self.filters_ext.offset = Some(offset); self } diff --git a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs index 1307807c13..9af6dc7b4b 100644 --- a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs +++ b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs @@ -206,6 +206,7 @@ async fn dispatcher_v2(request: MmRpcRequest, ctx: MmArc) -> DispatcherResult handle_mmrpc(ctx, request, ibc_chains).await, "ibc_transfer_channels" => handle_mmrpc(ctx, request, ibc_transfer_channels).await, "withdraw_nft" => handle_mmrpc(ctx, request, withdraw_nft).await, + "z_coin_tx_history" => handle_mmrpc(ctx, request, coins::my_tx_history_v2::z_coin_tx_history_rpc).await, #[cfg(not(target_arch = "wasm32"))] native_only_methods => match native_only_methods { #[cfg(all(feature = "enable-solana", not(target_os = "ios"), not(target_os = "android")))] @@ -214,7 +215,6 @@ async fn dispatcher_v2(request: MmRpcRequest, ctx: MmArc) -> DispatcherResult handle_mmrpc(ctx, request, enable_token::).await, - "z_coin_tx_history" => handle_mmrpc(ctx, request, coins::my_tx_history_v2::z_coin_tx_history_rpc).await, _ => MmError::err(DispatcherError::NoSuchMethod), }, #[cfg(target_arch = "wasm32")] diff --git a/mm2src/mm2_main/src/wasm_tests.rs b/mm2src/mm2_main/src/wasm_tests.rs index 5899f98b42..f565757b0c 100644 --- a/mm2src/mm2_main/src/wasm_tests.rs +++ b/mm2src/mm2_main/src/wasm_tests.rs @@ -125,7 +125,7 @@ async fn trade_base_rel_electrum( for (base, rel) in pairs.iter() { log!("Get {}/{} orderbook", base, rel); let rc = mm_bob - .rpc(&json! ({ + .rpc(&json!({ "userpass": mm_bob.userpass, "method": "orderbook", "base": base, @@ -249,7 +249,6 @@ async fn trade_v2_test_rick_and_morty() { #[wasm_bindgen_test] async fn activate_z_coin_light() { - register_wasm_log(); let coins = json!([pirate_conf()]); let conf = Mm2TestConf::seednode(PIRATE_TEST_BALANCE_SEED, &coins); From a81a67f8f4cb5ccf2f1bd9a87a9067fd3c242bc1 Mon Sep 17 00:00:00 2001 From: Samuel Onoja Date: Fri, 29 Mar 2024 21:05:12 +0100 Subject: [PATCH 2/2] feat(zcoin): balance event streaming (#2076) This commit implements balance event streaming for zcoin for Native and WASM targets. After each update to the wallet database with a new block, a check for transactions within the block is done. If transactions are detected, the latest balance is sent through the streaming channel. --- mm2src/coins/z_coin.rs | 82 ++++++++----- mm2src/coins/z_coin/storage.rs | 12 +- .../storage/blockdb/blockdb_idb_storage.rs | 16 ++- .../storage/blockdb/blockdb_sql_storage.rs | 14 ++- .../{ => storage}/z_params/indexeddb.rs | 0 .../z_coin/{ => storage}/z_params/mod.rs | 0 mm2src/coins/z_coin/z_balance_streaming.rs | 110 ++++++++++++++++++ mm2src/coins/z_coin/z_coin_errors.rs | 1 + mm2src/coins/z_coin/z_rpc.rs | 13 ++- 9 files changed, 208 insertions(+), 40 deletions(-) rename mm2src/coins/z_coin/{ => storage}/z_params/indexeddb.rs (100%) rename mm2src/coins/z_coin/{ => storage}/z_params/mod.rs (100%) create mode 100644 mm2src/coins/z_coin/z_balance_streaming.rs diff --git a/mm2src/coins/z_coin.rs b/mm2src/coins/z_coin.rs index 4e125b0f3a..0b55cbf41d 100644 --- a/mm2src/coins/z_coin.rs +++ b/mm2src/coins/z_coin.rs @@ -1,3 +1,12 @@ +pub mod storage; +mod z_balance_streaming; +mod z_coin_errors; +#[cfg(all(test, feature = "zhtlc-native-tests"))] +mod z_coin_native_tests; +mod z_htlc; +mod z_rpc; +mod z_tx_history; + use crate::coin_errors::{MyAddressError, ValidatePaymentResult}; use crate::my_tx_history_v2::{MyTxHistoryErrorV2, MyTxHistoryRequestV2, MyTxHistoryResponseV2}; use crate::rpc_command::init_withdraw::{InitWithdrawCoin, WithdrawInProgressStatus, WithdrawTaskHandleShared}; @@ -14,6 +23,7 @@ use crate::utxo::{sat_from_big_decimal, utxo_common, ActualTxFee, AdditionalTxDa UtxoCommonOps, UtxoRpcMode, UtxoTxBroadcastOps, UtxoTxGenerationOps, VerboseTransactionFrom}; use crate::utxo::{UnsupportedAddr, UtxoFeeDetails}; use crate::z_coin::storage::{BlockDbImpl, WalletDbShared}; +use crate::z_coin::z_balance_streaming::ZBalanceEventHandler; use crate::z_coin::z_tx_history::{fetch_tx_history_from_db, ZCoinTxHistoryItem}; use crate::{BalanceError, BalanceFut, CheckIfMyPaymentSentArgs, CoinBalance, CoinFutSpawner, ConfirmPaymentInput, DexFee, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MakerSwapTakerCoin, MarketCoinOps, MmCoin, @@ -48,6 +58,7 @@ use keys::hash::H256; use keys::{KeyPair, Message, Public}; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; +use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus}; use mm2_number::{BigDecimal, MmNumber}; #[cfg(test)] use mocktopus::macros::*; use primitives::bytes::Bytes; @@ -60,8 +71,10 @@ use std::convert::TryInto; use std::iter; use std::path::PathBuf; use std::sync::Arc; -#[cfg(target_arch = "wasm32")] -use z_coin_errors::ZCoinBalanceError; +pub use z_coin_errors::*; +use z_htlc::{z_p2sh_spend, z_send_dex_fee, z_send_htlc}; +use z_rpc::init_light_client; +pub use z_rpc::{FirstSyncBlock, SyncStatus}; use z_rpc::{SaplingSyncConnector, SaplingSyncGuard}; use zcash_client_backend::encoding::{decode_payment_address, encode_extended_spending_key, encode_payment_address}; use zcash_client_backend::wallet::{AccountId, SpendableNote}; @@ -78,13 +91,6 @@ use zcash_primitives::{constants::mainnet as z_mainnet_constants, sapling::Payme zip32::ExtendedFullViewingKey, zip32::ExtendedSpendingKey}; use zcash_proofs::prover::LocalTxProver; -mod z_htlc; -use z_htlc::{z_p2sh_spend, z_send_dex_fee, z_send_htlc}; - -mod z_rpc; -use z_rpc::init_light_client; -pub use z_rpc::{FirstSyncBlock, SyncStatus}; - cfg_native!( use common::{async_blocking, sha256_digest}; use zcash_client_sqlite::error::SqliteClientError as ZcashClientError; @@ -94,22 +100,14 @@ cfg_native!( ); cfg_wasm32!( - use crate::z_coin::z_params::ZcashParamsWasmImpl; + use crate::z_coin::storage::ZcashParamsWasmImpl; use common::executor::AbortOnDropHandle; use futures::channel::oneshot; use rand::rngs::OsRng; use zcash_primitives::transaction::builder::TransactionMetadata; + use z_coin_errors::ZCoinBalanceError; ); -#[allow(unused)] mod z_coin_errors; -pub use z_coin_errors::*; - -pub mod storage; -#[cfg(all(test, feature = "zhtlc-native-tests"))] -mod z_coin_native_tests; -#[cfg(target_arch = "wasm32")] mod z_params; -mod z_tx_history; - /// `ZP2SHSpendError` compatible `TransactionErr` handling macro. macro_rules! try_ztx_s { ($e: expr) => { @@ -209,6 +207,7 @@ pub struct ZCoinFields { light_wallet_db: WalletDbShared, consensus_params: ZcoinConsensusParams, sync_state_connector: AsyncMutex, + z_balance_event_handler: Option, } impl Transaction for ZTransaction { @@ -654,6 +653,17 @@ impl ZCoin { paging_options: request.paging_options, }) } + + async fn spawn_balance_stream_if_enabled(&self, ctx: &MmArc) -> Result<(), String> { + let coin = self.clone(); + if let Some(stream_config) = &ctx.event_stream_configuration { + if let EventInitStatus::Failed(err) = EventBehaviour::spawn_if_active(coin, stream_config).await { + return ERR!("Failed spawning zcoin balance event with error: {}", err); + } + } + + Ok(()) + } } impl AsRef for ZCoin { @@ -875,10 +885,24 @@ impl<'a> UtxoCoinBuilder for ZCoinBuilder<'a> { ); let blocks_db = self.init_blocks_db().await?; + let (z_balance_event_sender, z_balance_event_handler) = if self.ctx.event_stream_configuration.is_some() { + let (sender, receiver) = futures::channel::mpsc::unbounded(); + (Some(sender), Some(Arc::new(AsyncMutex::new(receiver)))) + } else { + (None, None) + }; + let (sync_state_connector, light_wallet_db) = match &self.z_coin_params.mode { #[cfg(not(target_arch = "wasm32"))] ZcoinRpcMode::Native => { - init_native_client(&self, self.native_client()?, blocks_db, &z_spending_key).await? + init_native_client( + &self, + self.native_client()?, + blocks_db, + &z_spending_key, + z_balance_event_sender, + ) + .await? }, ZcoinRpcMode::Light { light_wallet_d_servers, @@ -893,11 +917,13 @@ impl<'a> UtxoCoinBuilder for ZCoinBuilder<'a> { sync_params, skip_sync_params.unwrap_or_default(), &z_spending_key, + z_balance_event_sender, ) .await? }, }; - let z_fields = ZCoinFields { + + let z_fields = Arc::new(ZCoinFields { dex_fee_addr, my_z_addr, my_z_addr_encoded, @@ -907,12 +933,16 @@ impl<'a> UtxoCoinBuilder for ZCoinBuilder<'a> { light_wallet_db, consensus_params: self.protocol_info.consensus_params, sync_state_connector, - }; + z_balance_event_handler, + }); - Ok(ZCoin { - utxo_arc, - z_fields: Arc::new(z_fields), - }) + let zcoin = ZCoin { utxo_arc, z_fields }; + zcoin + .spawn_balance_stream_if_enabled(self.ctx) + .await + .map_to_mm(ZCoinBuildError::FailedSpawningBalanceEvents)?; + + Ok(zcoin) } } diff --git a/mm2src/coins/z_coin/storage.rs b/mm2src/coins/z_coin/storage.rs index 5b1f3f1f00..08e478f27a 100644 --- a/mm2src/coins/z_coin/storage.rs +++ b/mm2src/coins/z_coin/storage.rs @@ -4,8 +4,13 @@ pub mod blockdb; pub use blockdb::*; pub mod walletdb; +#[cfg(target_arch = "wasm32")] mod z_params; +#[cfg(target_arch = "wasm32")] +pub(crate) use z_params::ZcashParamsWasmImpl; + pub use walletdb::*; +use crate::z_coin::z_balance_streaming::ZBalanceEventSender; use mm2_err_handle::mm_error::MmResult; #[cfg(target_arch = "wasm32")] use walletdb::wasm::storage::DataConnStmtCacheWasm; @@ -55,7 +60,7 @@ pub struct CompactBlockRow { #[derive(Clone)] pub enum BlockProcessingMode { Validate, - Scan(DataConnStmtCacheWrapper), + Scan(DataConnStmtCacheWrapper, Option), } /// Checks that the scanned blocks in the data database, when combined with the recent @@ -114,7 +119,7 @@ pub async fn scan_cached_block( params: &ZcoinConsensusParams, block: &CompactBlock, last_height: &mut BlockHeight, -) -> Result<(), ValidateBlocksError> { +) -> Result { let mut data_guard = data.inner().clone(); // Fetch the ExtendedFullViewingKeys we are tracking let extfvks = data_guard.get_extended_full_viewing_keys().await?; @@ -201,5 +206,6 @@ pub async fn scan_cached_block( *last_height = current_height; - Ok(()) + // If there are any transactions in the block, return the transaction count + Ok(txs.len()) } diff --git a/mm2src/coins/z_coin/storage/blockdb/blockdb_idb_storage.rs b/mm2src/coins/z_coin/storage/blockdb/blockdb_idb_storage.rs index 112fb67400..cccf8cc0a9 100644 --- a/mm2src/coins/z_coin/storage/blockdb/blockdb_idb_storage.rs +++ b/mm2src/coins/z_coin/storage/blockdb/blockdb_idb_storage.rs @@ -3,6 +3,7 @@ use crate::z_coin::storage::{scan_cached_block, validate_chain, BlockDbImpl, Blo use crate::z_coin::z_coin_errors::ZcoinStorageError; use async_trait::async_trait; +use futures_util::SinkExt; use mm2_core::mm_ctx::MmArc; use mm2_db::indexed_db::{BeBigUint, ConstructibleDb, DbIdentifier, DbInstance, DbLocked, DbUpgrader, IndexedDb, IndexedDbBuilder, InitDbResult, MultiIndex, OnUpgradeResult, TableSignature}; @@ -123,7 +124,7 @@ impl BlockDbImpl { } /// Asynchronously rewinds the storage to a specified block height, effectively - /// removing data beyond the specified height from the storage. + /// removing data beyond the specified height from the storage. pub async fn rewind_to_height(&self, height: BlockHeight) -> ZcoinStorageRes { let locked_db = self.lock_db().await?; let db_transaction = locked_db.get_inner().transaction().await?; @@ -224,7 +225,7 @@ impl BlockDbImpl { BlockProcessingMode::Validate => validate_from .map(|(height, _)| height) .unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1), - BlockProcessingMode::Scan(data) => data.inner().block_height_extrema().await.map(|opt| { + BlockProcessingMode::Scan(data, _) => data.inner().block_height_extrema().await.map(|opt| { opt.map(|(_, max)| max) .unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1) })?, @@ -250,8 +251,15 @@ impl BlockDbImpl { BlockProcessingMode::Validate => { validate_chain(block, &mut prev_height, &mut prev_hash).await?; }, - BlockProcessingMode::Scan(data) => { - scan_cached_block(data, ¶ms, &block, &mut from_height).await?; + BlockProcessingMode::Scan(data, z_balance_change_sender) => { + let tx_size = scan_cached_block(data, ¶ms, &block, &mut from_height).await?; + // If there is/are transactions present in the current scanned block(s), + // we trigger a `Triggered` event to update the balance change. + if tx_size > 0 { + if let Some(mut sender) = z_balance_change_sender.clone() { + sender.send(()).await.expect("No receiver is available/dropped"); + }; + }; }, } } diff --git a/mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs b/mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs index 74c790bf89..e8ce70d6dd 100644 --- a/mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs +++ b/mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs @@ -6,6 +6,7 @@ use crate::z_coin::ZcoinConsensusParams; use common::async_blocking; use db_common::sqlite::rusqlite::{params, Connection}; use db_common::sqlite::{query_single_row, run_optimization_pragmas, rusqlite}; +use futures_util::SinkExt; use itertools::Itertools; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; @@ -193,7 +194,7 @@ impl BlockDbImpl { BlockProcessingMode::Validate => validate_from .map(|(height, _)| height) .unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1), - BlockProcessingMode::Scan(data) => { + BlockProcessingMode::Scan(data, _) => { let data = data.inner(); data.block_height_extrema().await.map(|opt| { opt.map(|(_, max)| max) @@ -224,8 +225,15 @@ impl BlockDbImpl { BlockProcessingMode::Validate => { validate_chain(block, &mut prev_height, &mut prev_hash).await?; }, - BlockProcessingMode::Scan(data) => { - scan_cached_block(data, ¶ms, &block, &mut from_height).await?; + BlockProcessingMode::Scan(data, z_balance_change_sender) => { + let tx_size = scan_cached_block(data, ¶ms, &block, &mut from_height).await?; + // If there are transactions present in the current scanned block, + // we send a `Triggered` event to update the balance change. + if tx_size > 0 { + if let Some(mut sender) = z_balance_change_sender.clone() { + sender.send(()).await.expect("No receiver is available/dropped"); + }; + }; }, } } diff --git a/mm2src/coins/z_coin/z_params/indexeddb.rs b/mm2src/coins/z_coin/storage/z_params/indexeddb.rs similarity index 100% rename from mm2src/coins/z_coin/z_params/indexeddb.rs rename to mm2src/coins/z_coin/storage/z_params/indexeddb.rs diff --git a/mm2src/coins/z_coin/z_params/mod.rs b/mm2src/coins/z_coin/storage/z_params/mod.rs similarity index 100% rename from mm2src/coins/z_coin/z_params/mod.rs rename to mm2src/coins/z_coin/storage/z_params/mod.rs diff --git a/mm2src/coins/z_coin/z_balance_streaming.rs b/mm2src/coins/z_coin/z_balance_streaming.rs new file mode 100644 index 0000000000..2e4c77df77 --- /dev/null +++ b/mm2src/coins/z_coin/z_balance_streaming.rs @@ -0,0 +1,110 @@ +use crate::common::Future01CompatExt; +use crate::hd_wallet::AsyncMutex; +use crate::z_coin::ZCoin; +use crate::{MarketCoinOps, MmCoin}; + +use async_trait::async_trait; +use common::executor::{AbortSettings, SpawnAbortable}; +use common::log::{error, info}; +use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use futures::channel::oneshot; +use futures::channel::oneshot::{Receiver, Sender}; +use futures_util::StreamExt; +use mm2_core::mm_ctx::MmArc; +use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus}; +use mm2_event_stream::{Event, EventStreamConfiguration}; +use std::sync::Arc; + +pub type ZBalanceEventSender = UnboundedSender<()>; +pub type ZBalanceEventHandler = Arc>>; + +#[async_trait] +impl EventBehaviour for ZCoin { + const EVENT_NAME: &'static str = "COIN_BALANCE"; + const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR"; + + async fn handle(self, _interval: f64, tx: Sender) { + const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen."; + + macro_rules! send_status_on_err { + ($match: expr, $sender: tt, $msg: literal) => { + match $match { + Some(t) => t, + None => { + $sender + .send(EventInitStatus::Failed($msg.to_owned())) + .expect(RECEIVER_DROPPED_MSG); + panic!("{}", $msg); + }, + } + }; + } + + let ctx = send_status_on_err!( + MmArc::from_weak(&self.as_ref().ctx), + tx, + "MM context must have been initialized already." + ); + let z_balance_change_handler = send_status_on_err!( + self.z_fields.z_balance_event_handler.as_ref(), + tx, + "Z balance change receiver can not be empty." + ); + + tx.send(EventInitStatus::Success).expect(RECEIVER_DROPPED_MSG); + + // Locks the balance change handler, iterates through received events, and updates balance changes accordingly. + let mut bal = z_balance_change_handler.lock().await; + while (bal.next().await).is_some() { + match self.my_balance().compat().await { + Ok(balance) => { + let payload = json!({ + "ticker": self.ticker(), + "address": self.my_z_address_encoded(), + "balance": { "spendable": balance.spendable, "unspendable": balance.unspendable } + }); + + ctx.stream_channel_controller + .broadcast(Event::new(Self::EVENT_NAME.to_string(), payload.to_string())) + .await; + }, + Err(err) => { + let ticker = self.ticker(); + error!("Failed getting balance for '{ticker}'. Error: {err}"); + let e = serde_json::to_value(err).expect("Serialization should't fail."); + return ctx + .stream_channel_controller + .broadcast(Event::new( + format!("{}:{}", Self::ERROR_EVENT_NAME, ticker), + e.to_string(), + )) + .await; + }, + }; + } + } + + async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus { + if let Some(event) = config.get_event(Self::EVENT_NAME) { + info!( + "{} event is activated for {} address {}. `stream_interval_seconds`({}) has no effect on this.", + Self::EVENT_NAME, + self.ticker(), + self.my_z_address_encoded(), + event.stream_interval_seconds + ); + + let (tx, rx): (Sender, Receiver) = oneshot::channel(); + let fut = self.clone().handle(event.stream_interval_seconds, tx); + let settings = + AbortSettings::info_on_abort(format!("{} event is stopped for {}.", Self::EVENT_NAME, self.ticker())); + self.spawner().spawn_with_settings(fut, settings); + + rx.await.unwrap_or_else(|e| { + EventInitStatus::Failed(format!("Event initialization status must be received: {}", e)) + }) + } else { + EventInitStatus::Inactive + } + } +} diff --git a/mm2src/coins/z_coin/z_coin_errors.rs b/mm2src/coins/z_coin/z_coin_errors.rs index 5b5992baae..7fcf06cb12 100644 --- a/mm2src/coins/z_coin/z_coin_errors.rs +++ b/mm2src/coins/z_coin/z_coin_errors.rs @@ -249,6 +249,7 @@ pub enum ZCoinBuildError { ZCashParamsError(String), ZDerivationPathNotSet, SaplingParamsInvalidChecksum, + FailedSpawningBalanceEvents(String), } #[cfg(not(target_arch = "wasm32"))] diff --git a/mm2src/coins/z_coin/z_rpc.rs b/mm2src/coins/z_coin/z_rpc.rs index 55af1ac36b..bfdc61695f 100644 --- a/mm2src/coins/z_coin/z_rpc.rs +++ b/mm2src/coins/z_coin/z_rpc.rs @@ -32,6 +32,7 @@ use zcash_primitives::zip32::ExtendedSpendingKey; pub(crate) mod z_coin_grpc { tonic::include_proto!("pirate.wallet.sdk.rpc"); } +use crate::z_coin::z_balance_streaming::ZBalanceEventSender; use z_coin_grpc::compact_tx_streamer_client::CompactTxStreamerClient; use z_coin_grpc::{ChainSpec, CompactBlock as TonicCompactBlock}; @@ -507,6 +508,7 @@ pub(super) async fn init_light_client<'a>( sync_params: &Option, skip_sync_params: bool, z_spending_key: &ExtendedSpendingKey, + z_balance_event_sender: Option, ) -> Result<(AsyncMutex, WalletDbShared), MmError> { let coin = builder.ticker.to_string(); let (sync_status_notifier, sync_watcher) = channel(1); @@ -543,7 +545,7 @@ pub(super) async fn init_light_client<'a>( WalletDbShared::new(builder, maybe_checkpoint_block, z_spending_key, continue_from_prev_sync).await?; // Check min_height in blocks_db and rewind blocks_db to 0 if sync_height != min_height if !continue_from_prev_sync && (sync_height != min_height) { - // let user know we're clearing cache and resyncing from new provided height. + // let user know we're clearing cache and re-syncing from new provided height. if min_height > 0 { info!("Older/Newer sync height detected!, rewinding blocks_db to new height: {sync_height:?}"); } @@ -566,6 +568,7 @@ pub(super) async fn init_light_client<'a>( is_pre_sapling: sync_height < sapling_activation_height, actual: sync_height.max(sapling_activation_height), }, + z_balance_event_sender, }; let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(light_rpc_clients))); @@ -582,6 +585,7 @@ pub(super) async fn init_native_client<'a>( native_client: NativeClient, blocks_db: BlockDbImpl, z_spending_key: &ExtendedSpendingKey, + z_balance_event_sender: Option, ) -> Result<(AsyncMutex, WalletDbShared), MmError> { let coin = builder.ticker.to_string(); let (sync_status_notifier, sync_watcher) = channel(1); @@ -610,6 +614,7 @@ pub(super) async fn init_native_client<'a>( scan_blocks_per_iteration: builder.z_coin_params.scan_blocks_per_iteration, scan_interval_ms: builder.z_coin_params.scan_interval_ms, first_sync_block, + z_balance_event_sender, }; let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(native_client))); @@ -708,6 +713,7 @@ pub struct SaplingSyncLoopHandle { scan_blocks_per_iteration: u32, scan_interval_ms: u64, first_sync_block: FirstSyncBlock, + z_balance_event_sender: Option, } impl SaplingSyncLoopHandle { @@ -804,8 +810,7 @@ impl SaplingSyncLoopHandle { } } - let latest_block_height = blocks_db.get_latest_block().await?; - let current_block = BlockHeight::from_u32(latest_block_height); + let current_block = BlockHeight::from_u32(blocks_db.get_latest_block().await?); loop { match wallet_ops.block_height_extrema().await? { Some((_, max_in_wallet)) => { @@ -822,7 +827,7 @@ impl SaplingSyncLoopHandle { blocks_db .process_blocks_with_mode( self.consensus_params.clone(), - BlockProcessingMode::Scan(scan), + BlockProcessingMode::Scan(scan, self.z_balance_event_sender.clone()), None, Some(self.scan_blocks_per_iteration), )