From b24390c5f9c80f9e398b1ee84ec25cfb7cd155d6 Mon Sep 17 00:00:00 2001 From: danda Date: Tue, 17 Sep 2024 19:23:13 -0700 Subject: [PATCH] perf: split lengthy fn(s) to improve concurrency Primary changes are: 1. splits lengthy &mut self methods into read-only &self method(s) and faster &mut self method(s). This makes these methods more concurrency friendly. 2. provides original functionality with methods that have _atomic() suffix. These take &mut self and call both the &self and &mut self methods. 3. Adds ScopeDurationLogger to easily enable logging of slow methods. by default "slow" means > 0.1 seconds. Details: * log genesis block digest * rename GlobalState::set_new_tip() -> set_new_tip_atomic() * impl ScopeDurationLogger and use it to log method durations * split WalletState::update_wallet_state_with_new_block() * split ArchivalState::update_mutator_set() * add experimental GlobalStateLock::set_new_tip_concurrent() (not used) * split GlobalState::resync_membership_proofs() * remove duration_* macros * add fn_name, fn_name_bare macros --- src/lib.rs | 41 ++ src/macros.rs | 202 +----- src/main_loop.rs | 4 +- src/models/blockchain/block/mod.rs | 9 +- src/models/state/archival_state.rs | 363 +++++++---- src/models/state/mempool.rs | 40 +- src/models/state/mod.rs | 590 +++++++++++++----- src/models/state/wallet/mod.rs | 6 +- src/models/state/wallet/wallet_state.rs | 331 ++++++++-- src/peer_loop.rs | 64 +- src/rpc_server.rs | 2 +- src/tests/shared.rs | 5 +- .../mutator_set/archival_mutator_set.rs | 2 +- src/util_types/mutator_set/removal_record.rs | 13 +- 14 files changed, 1100 insertions(+), 572 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 64f43b93c..f76b483d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -284,6 +284,47 @@ pub async fn initialize(cli_args: cli_args::Args) -> Result<()> { .await } +pub struct ScopeDurationLogger<'a> { + start: Instant, + description: &'a str, + log_slow_fn_threshold: f64, +} +impl<'a> ScopeDurationLogger<'a> { + pub fn new_with_threshold(description: &'a str, log_slow_fn_threshold: f64) -> Self { + Self { + start: Instant::now(), + description, + log_slow_fn_threshold, + } + } + + pub fn new(description: &'a str) -> Self { + Self::new_with_threshold( + description, + match env::var("LOG_SLOW_FN_THRESHOLD") { + Ok(t) => t.parse().unwrap(), + Err(_) => 0.1, + }, + ) + } +} + +impl Drop for ScopeDurationLogger<'_> { + fn drop(&mut self) { + let elapsed = self.start.elapsed(); + let duration = elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1e9; + + if duration >= self.log_slow_fn_threshold { + let msg = format!( + "\n-- slow fn threshold of {} secs exceeded --\n executed {} in {} secs", + self.log_slow_fn_threshold, self.description, duration, + ); + + tracing::warn!("{}", msg); + } + } +} + /// Time a fn call. Duration is returned as a float in seconds. pub fn time_fn_call(f: impl FnOnce() -> O) -> (O, f64) { let start = Instant::now(); diff --git a/src/macros.rs b/src/macros.rs index d6e69d2df..a6eff2ed8 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -1,141 +1,20 @@ -/// executes an expression, times duration, and emits trace! message -/// -/// The trace level is `tracing::Level::TRACE` by default. -/// -/// Accepts arguments in 3 forms: -/// duration!(myfunc()) -/// duration!(myfunc(), message) -/// duration!(myfunc(), message, trace_level) -#[allow(unused_macros)] -macro_rules! 
duration { - ($target: expr, $message: expr, $lvl: expr) => {{ - let (output, duration) = $crate::time_fn_call(|| $target); - let msg = format!( - "at {}:{}{}\n-- executed expression --\n{}\n -- duration: {} secs --", - file!(), - line!(), - if $message.len() > 0 { - format! {"\n{}", $message} - } else { - "".to_string() - }, - stringify!($target), - duration - ); - match $lvl { - tracing::Level::INFO => tracing::info!("{}", msg), - tracing::Level::TRACE => tracing::trace!("{}", msg), - tracing::Level::DEBUG => tracing::trace!("{}", msg), - tracing::Level::WARN => tracing::warn!("{}", msg), - tracing::Level::ERROR => tracing::error!("{}", msg), +macro_rules! fn_name_bare { + () => {{ + fn f() {} + fn type_name_of(_: T) -> &'static str { + std::any::type_name::() } - output + type_name_of(f) + .rsplit("::") + .find(|&part| part != "f" && part != "{{closure}}") + .expect("Short function name") }}; - ($target: expr, $message: expr) => { - $crate::macros::duration!($target, $message, tracing::Level::TRACE) - }; - ($target: expr) => { - $crate::macros::duration!($target, "", tracing::Level::TRACE) - }; } -/// executes an expression, times duration, and emits info! message -/// -/// Accepts arguments in 2 forms: -/// duration!(myfunc()) -/// duration!(myfunc(), message) -#[allow(unused_macros)] -macro_rules! duration_info { - ($target: expr) => { - $crate::macros::duration!($target, "", tracing::Level::INFO) - }; - ($target: expr, $message: expr) => { - $crate::macros::duration!($target, $message, tracing::Level::INFO) - }; -} - -/// executes an expression, times duration, and emits debug! message -/// -/// Accepts arguments in 2 forms: -/// duration!(myfunc()) -/// duration!(myfunc(), message) -#[allow(unused_macros)] -macro_rules! duration_debug { - ($target: expr) => { - $crate::macros::duration!($target, "", tracing::Level::DEBUG) - }; - ($target: expr, $message: expr) => { - $crate::macros::duration!($target, $message, tracing::Level::DEBUG) - }; -} - -/// executes an async expression, times duration, and emits trace! message -/// -/// Accepts arguments in 3 forms: -/// duration!(myfunc()) -/// duration!(myfunc(), message) -/// duration!(myfunc(), message, trace_level) -#[allow(unused_macros)] -macro_rules! duration_async { - ($target: expr, $message: expr, $lvl: expr) => {{ - let (output, duration) = $crate::time_fn_call_async({ $target }).await; - let msg = format!( - "at {}:{}{}\n-- executed expression --\n{}\n -- duration: {} secs --", - file!(), - line!(), - if $message.len() > 0 { - format! {"\n{}", $message} - } else { - "".to_string() - }, - stringify!($target), - duration - ); - match $lvl { - tracing::Level::INFO => tracing::info!("{}", msg), - tracing::Level::TRACE => tracing::trace!("{}", msg), - tracing::Level::DEBUG => tracing::trace!("{}", msg), - tracing::Level::WARN => tracing::warn!("{}", msg), - tracing::Level::ERROR => tracing::error!("{}", msg), - } - output +macro_rules! fn_name { + () => {{ + format!("{}()", crate::macros::fn_name_bare!()) }}; - ($target: expr, $message: expr) => { - $crate::macros::duration_async!($target, $message, tracing::Level::TRACE) - }; - ($target: expr) => { - $crate::macros::duration_async!($target, "", tracing::Level::TRACE) - }; -} - -/// executes an async expression, times duration, and emits info! message -/// -/// Accepts arguments in 2 forms: -/// duration!(myfunc()) -/// duration!(myfunc(), message) -#[allow(unused_macros)] -macro_rules! 
duration_async_info { - ($target: expr) => { - $crate::macros::duration_async!($target, "", tracing::Level::INFO) - }; - ($target: expr, $message: expr) => { - $crate::macros::duration_async!($target, $message, tracing::Level::INFO) - }; -} - -/// executes an async expression, times duration, and emits debug! message -/// -/// Accepts arguments in 2 forms: -/// duration!(myfunc()) -/// duration!(myfunc(), message) -#[allow(unused_macros)] -macro_rules! duration_async_debug { - ($target: expr) => { - $crate::macros::duration_async!($target, "", tracing::Level::DEBUG) - }; - ($target: expr, $message: expr) => { - $crate::macros::duration_async!($target, $message, tracing::Level::DEBUG) - }; } // These allow the macros to be used as @@ -144,67 +23,22 @@ macro_rules! duration_async_debug { // see: https://stackoverflow.com/a/67140319/10087197 #[allow(unused_imports)] -pub(crate) use duration; -#[allow(unused_imports)] -pub(crate) use duration_async; -#[allow(unused_imports)] -pub(crate) use duration_async_debug; -#[allow(unused_imports)] -pub(crate) use duration_async_info; +pub(crate) use fn_name; #[allow(unused_imports)] -pub(crate) use duration_debug; -#[allow(unused_imports)] -pub(crate) use duration_info; +pub(crate) use fn_name_bare; #[cfg(test)] mod test { use super::*; - use tracing::Level; - - fn fibonacci(n: u32) -> u32 { - match n { - 0 => 1, - 1 => 1, - _ => fibonacci(n - 1) + fibonacci(n - 2), - } - } - - async fn fibonacci_async(n: u32) -> u32 { - match n { - 0 => 1, - 1 => 1, - _ => fibonacci(n - 1) + fibonacci(n - 2), - } - } #[test] - fn duration_tests() { - duration!(fibonacci(1)); - duration!(fibonacci(2), "fibonacci - 2".to_string()); - duration!(fibonacci(3), "fibonacci - 3", Level::INFO); - - duration_info!(fibonacci(4)); - duration_info!(fibonacci(5), "fibonacci - 5"); - duration_info!(fibonacci(6), "fibonacci - 6".to_string()); - - duration_debug!(fibonacci(7)); - duration_debug!(fibonacci(8), "fibonacci - 8"); - duration_debug!(fibonacci(9), "fibonacci - 9".to_string()); + fn fn_name_test() { + assert_eq!(fn_name!(), "fn_name_test()"); } #[tokio::test] - async fn duration_async_tests() { - duration_async!(fibonacci_async(1)); - duration_async!(fibonacci_async(2), "fibonacci_async - 2".to_string()); - duration_async!(fibonacci_async(3), "fibonacci_async - 3", Level::INFO); - - duration_async_info!(fibonacci_async(4)); - duration_async_info!(fibonacci_async(5), "fibonacci_async - 5"); - duration_async_info!(fibonacci_async(6), "fibonacci_async - 6".to_string()); - - duration_async_debug!(fibonacci_async(7)); - duration_async_debug!(fibonacci_async(8), "fibonacci_async - 8"); - duration_async_debug!(fibonacci_async(9), "fibonacci_async - 9".to_string()); + async fn async_fn_name_test() { + assert_eq!(fn_name!(), "async_fn_name_test()"); } } diff --git a/src/main_loop.rs b/src/main_loop.rs index 050c9e99c..97ff64f9a 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -345,7 +345,7 @@ impl MainLoopHandler { } global_state_mut - .set_new_self_mined_tip( + .set_new_self_mined_tip_atomic( new_block.as_ref().clone(), new_block_info.coinbase_utxo_info.as_ref().clone(), ) @@ -429,7 +429,7 @@ impl MainLoopHandler { new_block.kernel.header.timestamp.standard_format() ); - global_state_mut.set_new_tip(new_block).await?; + global_state_mut.set_new_tip_atomic(new_block).await?; } } diff --git a/src/models/blockchain/block/mod.rs b/src/models/blockchain/block/mod.rs index 3ac49df01..76bf5fde6 100644 --- a/src/models/blockchain/block/mod.rs +++ b/src/models/blockchain/block/mod.rs 
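Usage sketch (illustrative, not part of the patch): how the new ScopeDurationLogger guard and the fn_name!() macro are meant to be combined at the top of a method. The method name and body below are hypothetical; the threshold defaults to 0.1 seconds and can be overridden via the LOG_SLOW_FN_THRESHOLD environment variable.

    async fn some_lengthy_method(&self) -> anyhow::Result<()> {
        // Binding the guard to a named variable keeps it alive until the end
        // of the scope; `let _ = ...` would drop it (and log) immediately.
        let _duration_logger = crate::ScopeDurationLogger::new(&crate::macros::fn_name!());

        // ... lengthy work here; a warning is emitted on drop if the elapsed
        // time exceeded the threshold.
        Ok(())
    }
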
@@ -327,7 +327,14 @@ impl Block { difficulty: MINIMUM_DIFFICULTY.into(), }; - Self::new(header, body, BlockType::Genesis) + let block = Self::new(header, body, BlockType::Genesis); + + debug!( + "Instantiated genesis block with digest\n str: {}\n hex: {}", + block.hash(), + block.hash().to_hex() + ); + block } fn premine_distribution(_network: Network) -> Vec<(ReceivingAddress, NeptuneCoins)> { diff --git a/src/models/state/archival_state.rs b/src/models/state/archival_state.rs index b45019a08..eeffc446d 100644 --- a/src/models/state/archival_state.rs +++ b/src/models/state/archival_state.rs @@ -680,14 +680,31 @@ impl ArchivalState { ret } - /// Update the mutator set with a block after this block has been stored to the database. - /// Handles rollback of the mutator set if needed but requires that all blocks that are - /// rolled back are present in the DB. The input block is considered chain tip. All blocks - /// stored in the database are assumed to be valid. - pub async fn update_mutator_set(&mut self, new_block: &Block) -> Result<()> { - let (forwards, backwards) = { + /// obtain the backwards, forwards paths from synced block to new block. + /// + /// concurrency and atomicity: + /// + /// The prepare/finalize pattern separates read ops from write ops so the + /// reads can be performed with either a read-lock or write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not atomic unless read+write + /// is performed within a single write-lock, which sacrifices concurrency. + pub(crate) async fn init_update_mutator_set( + &self, + new_block: &Block, + ) -> Result<(Vec, Vec)> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + debug!( + "new_block prev_block_digest: {}", + new_block.kernel.header.prev_block_digest.to_hex() + ); + + let (backwards, forwards) = { // Get the block digest that the mutator set was most recently synced to let ms_block_sync_digest = self.archival_mutator_set.get_sync_label().await; + debug!("ms_block_sync_digest: {}", ms_block_sync_digest.to_hex()); // Find path from mutator set sync digest to new block. Optimize for the common case, // where the new block is the child block of block that the mutator set is synced to. @@ -705,128 +722,171 @@ impl ArchivalState { }; let forwards = [forwards, vec![new_block.hash()]].concat(); - (forwards, backwards) + (backwards, forwards) }; - for digest in backwards { - // Roll back mutator set - let roll_back_block = self - .get_block(digest) - .await - .expect("Fetching block must succeed") - .unwrap(); + Ok((backwards, forwards)) + } - debug!( - "Updating mutator set: rolling back block with height {}", - roll_back_block.kernel.header.height - ); + /// reads removal and addition records for a batch of blocks + /// + /// concurrency and atomicity: + /// + /// The prepare/finalize pattern separates read ops from write ops so the + /// reads can be performed with either a read-lock or write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not atomic unless read+write + /// is performed within a single write-lock, which sacrifices concurrency. 
+ pub(crate) async fn prepare_update_mutator_set( + &self, + new_block: &Block, + path: &[Digest], + skip: usize, + batch_size: usize, + ) -> Result> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + let mut block_records = vec![]; + + for digest in path.iter().skip(skip).take(batch_size) { + let block = match *digest == new_block.hash() { + true => new_block, + false => &self.get_block(*digest).await?.ok_or_else(|| { + anyhow::anyhow!( + "block not found with digest: {} hex: {}", + digest, + digest.to_hex() + ) + })?, + }; - // Roll back all addition records contained in block - for addition_record in roll_back_block - .kernel - .body - .transaction - .kernel - .outputs - .iter() - .rev() - { - assert!( - self.archival_mutator_set - .ams_mut() - .add_is_reversible(addition_record) - .await, - "Addition record must be in sync with block being rolled back." - ); - self.archival_mutator_set - .ams_mut() - .revert_add(addition_record) - .await; - } + block_records.push(( + block.kernel.body.transaction.kernel.inputs.clone(), + block.kernel.body.transaction.kernel.outputs.clone(), + )); + } - // Roll back all removal records contained in block - for removal_record in roll_back_block.kernel.body.transaction.kernel.inputs.iter() { - self.archival_mutator_set - .ams_mut() - .revert_remove(removal_record) - .await; + Ok(match !block_records.is_empty() { + true => Some(MutatorSetBlockUpdate { + block_records, + skip, + }), + false => None, + }) + } + + /// write backwards-path removal/addition updates + /// + /// concurrency and atomicity: + /// + /// The prepare/finalize pattern separates read ops from write ops so the + /// reads can be performed with either a read-lock or write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not atomic unless read+write + /// is performed within a single write-lock, which sacrifices concurrency. + pub(crate) async fn update_mutator_set_backwards( + &mut self, + updates: MutatorSetBlockUpdate, + ) -> Result<()> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + let MutatorSetBlockUpdate { + block_records, + skip, + .. + } = updates; + + debug!( + "Updating mutator set: walking blocks backwards. batch {}..{}", + skip, + skip + block_records.len(), + ); + + let ams = self.archival_mutator_set.ams_mut(); + + for (removals, additions) in block_records { + for addition_record in additions.into_iter().rev() { + ams.revert_add(&addition_record).await; + } + for removal_record in removals { + ams.revert_remove(&removal_record).await; } } - for digest in forwards { - // Add block to mutator set - let apply_forward_block = if digest == new_block.hash() { - new_block.to_owned() - } else { - self.get_block(digest) - .await - .expect("Fetching block must succeed") - .unwrap() - }; - debug!( - "Updating mutator set: adding block with height {}. 
Mined: {}", - apply_forward_block.kernel.header.height, - apply_forward_block - .kernel - .header - .timestamp - .standard_format() - ); + Ok(()) + } - let mut addition_records: Vec = apply_forward_block - .kernel - .body - .transaction - .kernel - .outputs - .clone(); - addition_records.reverse(); - let mut removal_records = apply_forward_block - .kernel - .body - .transaction - .kernel - .inputs - .clone(); - removal_records.reverse(); - let mut removal_records: Vec<&mut RemovalRecord> = - removal_records.iter_mut().collect::>(); - - // Add items, thus adding the output UTXOs to the mutator set - while let Some(addition_record) = addition_records.pop() { - // Batch-update all removal records to keep them valid after next addition - RemovalRecord::batch_update_from_addition( - &mut removal_records, - &self.archival_mutator_set.ams().accumulator().await, - ); + /// write forwards-path removal/addition updates + /// + /// concurrency and atomicity: + /// + /// The prepare/finalize pattern separates read ops from write ops so the + /// reads can be performed with either a read-lock or write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not atomic unless read+write + /// is performed within a single write-lock, which sacrifices concurrency. + pub(crate) async fn update_mutator_set_forwards( + &mut self, + updates: MutatorSetBlockUpdate, + ) -> Result<()> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + let MutatorSetBlockUpdate { + block_records, + skip, + .. + } = updates; - // Add the element to the mutator set - self.archival_mutator_set - .ams_mut() - .add(&addition_record) - .await; + debug!( + "Updating mutator set: walking blocks forwards. batch {}..{}", + skip, + skip + block_records.len(), + ); + + let ams = self.archival_mutator_set.ams_mut(); + + for (mut removals, mut additions) in block_records { + removals.reverse(); + additions.reverse(); + + let mut removals: Vec<&mut RemovalRecord> = removals.iter_mut().collect::>(); + + while let Some(addition_record) = additions.pop() { + // Batch-update all removal records to keep them valid after next addition + RemovalRecord::batch_update_from_addition(&mut removals, &ams.accumulator().await); + ams.add(&addition_record).await; } - // Remove items, thus removing the input UTXOs from the mutator set - while let Some(removal_record) = removal_records.pop() { + while let Some(removal_record) = removals.pop() { // Batch-update all removal records to keep them valid after next removal - RemovalRecord::batch_update_from_remove(&mut removal_records, removal_record); - - // Remove the element from the mutator set - self.archival_mutator_set - .ams_mut() - .remove(removal_record) - .await; + RemovalRecord::batch_update_from_remove(&mut removals, removal_record); + ams.remove(removal_record).await; } } + Ok(()) + } + + /// perform final verification, set sync label, persist to disk. + /// + /// concurrency and atomicity: + /// + /// The prepare/finalize pattern separates read ops from write ops so the + /// reads can be performed with either a read-lock or write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not atomic unless read+write + /// is performed within a single write-lock, which sacrifices concurrency. 
+ pub(crate) async fn finalize_update_mutator_set(&mut self, new_block: &Block) -> Result<()> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + // Sanity check that archival mutator set has been updated consistently with the new block - debug!("sanity check: was AMS updated consistently with new block?"); + let new_block_ams_hash = new_block.kernel.body.mutator_set_accumulator.hash(); assert_eq!( - new_block - .kernel.body - .mutator_set_accumulator - .hash(), + new_block_ams_hash, self.archival_mutator_set.ams().hash().await, "Calculated archival mutator set commitment must match that from newly added block. Block Digest: {:?}", new_block.hash() ); @@ -839,6 +899,61 @@ impl ArchivalState { Ok(()) } + + /// Updates the mutator set with a block + /// + /// Handles rollback of the mutator set if needed but requires that all + /// blocks that are rolled back are present in the DB. The input block is + /// considered chain tip. All blocks stored in the database are assumed to + /// be valid. + /// + /// concurrency and atomicity: + /// + /// This &mut self method is read+write atomic but is not concurrent. + /// + /// The caller must hold the global write-lock while calling this + /// which blocks all other tasks, read or write. + pub(crate) async fn update_mutator_set_atomic(&mut self, new_block: &Block) -> Result<()> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + // obtain the backwards, forwards paths. + let (backwards, forwards) = self.init_update_mutator_set(new_block).await?; + + const BATCH_SIZE: usize = 5; // process 5 blocks at a time. + + // process backwards path + for skip in (0..).map(|i| i * BATCH_SIZE) { + // read removal and addition records for this batch + match self + .prepare_update_mutator_set(new_block, &backwards, skip, BATCH_SIZE) + .await? + { + // write removal/addition updates for this batch + Some(updates) => self.update_mutator_set_backwards(updates).await?, + None => break, + } + } + // process forwards path + for skip in (0..).map(|i| i * BATCH_SIZE) { + // read removal and addition records for this batch + match self + .prepare_update_mutator_set(new_block, &forwards, skip, BATCH_SIZE) + .await? + { + // write removal/addition updates for this batch + Some(updates) => self.update_mutator_set_forwards(updates).await?, + None => break, + } + } + // perform final verification, set sync label, persist to disk. + self.finalize_update_mutator_set(new_block).await + } +} + +/// contains removal and addition records for a batch of blocks. +pub(crate) struct MutatorSetBlockUpdate { + block_records: Vec<(Vec, Vec)>, + skip: usize, } #[cfg(test)] @@ -964,7 +1079,7 @@ mod archival_state_tests { rng.gen(), ); archival_state - .update_mutator_set(&mock_block_1) + .update_mutator_set_atomic(&mock_block_1) .await .unwrap(); @@ -1011,7 +1126,7 @@ mod archival_state_tests { { genesis_receiver_global_state - .set_new_tip(mock_block_1.clone()) + .set_new_tip_atomic(mock_block_1.clone()) .await .unwrap(); let ams_ref = &genesis_receiver_global_state @@ -1060,7 +1175,7 @@ mod archival_state_tests { // Remove an element from the mutator set, verify that the active window DB is updated. genesis_receiver_global_state - .set_new_tip(mock_block_2.clone()) + .set_new_tip_atomic(mock_block_2.clone()) .await?; let ams_ref = &genesis_receiver_global_state @@ -1096,7 +1211,7 @@ mod archival_state_tests { // 2. 
Update mutator set with this archival_state - .update_mutator_set(&mock_block_1a) + .update_mutator_set_atomic(&mock_block_1a) .await .unwrap(); @@ -1111,7 +1226,7 @@ mod archival_state_tests { // 4. Update mutator set with that archival_state - .update_mutator_set(&mock_block_1b) + .update_mutator_set_atomic(&mock_block_1b) .await .unwrap(); @@ -1195,7 +1310,10 @@ mod archival_state_tests { archival_state.write_block_as_tip(&block_1a).await.unwrap(); // 2. Update mutator set with this - archival_state.update_mutator_set(&block_1a).await.unwrap(); + archival_state + .update_mutator_set_atomic(&block_1a) + .await + .unwrap(); // 3. Create competing block 1 and store it to DB let (mock_block_1b, _, _) = make_mock_block_with_valid_pow( @@ -1212,7 +1330,7 @@ mod archival_state_tests { // 4. Update mutator set with that and verify rollback archival_state - .update_mutator_set(&mock_block_1b) + .update_mutator_set_atomic(&mock_block_1b) .await .unwrap(); } @@ -1334,7 +1452,7 @@ mod archival_state_tests { global_state .chain .archival_state_mut() - .update_mutator_set(&next_block) + .update_mutator_set_atomic(&next_block) .await .unwrap(); @@ -1345,8 +1463,7 @@ mod archival_state_tests { &previous_block.kernel.body.mutator_set_accumulator, &next_block, ) - .await - .unwrap(); + .await?; } // Genesis block may have a different number of outputs than the blocks produced above @@ -1380,7 +1497,7 @@ mod archival_state_tests { global_state .chain .archival_state_mut() - .update_mutator_set(&mock_block_1b) + .update_mutator_set_atomic(&mock_block_1b) .await .unwrap(); } @@ -1633,7 +1750,7 @@ mod archival_state_tests { &mut bob_state_lock, ] { let mut state = state_lock.lock_guard_mut().await; - state.set_new_tip(block_1.clone()).await.unwrap(); + state.set_new_tip_atomic(block_1.clone()).await.unwrap(); } { @@ -1827,7 +1944,7 @@ mod archival_state_tests { &mut bob_state_lock, ] { let mut state = state_lock.lock_guard_mut().await; - state.set_new_tip(block_2.clone()).await.unwrap(); + state.set_new_tip_atomic(block_2.clone()).await.unwrap(); } assert!(alice_state_lock diff --git a/src/models/state/mempool.rs b/src/models/state/mempool.rs index fe5d24cfb..563706e30 100644 --- a/src/models/state/mempool.rs +++ b/src/models/state/mempool.rs @@ -315,7 +315,7 @@ impl Mempool { /// Remove from the mempool all transactions that become invalid because /// of a newly received block. Also update all mutator set data for mempool /// transactions that were not removed. 
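A minimal calling sketch, not part of the patch: with the `async` removed, updating the mempool is now a plain synchronous call (no `.await`), made while the global write-lock is already held and using the parent block's mutator set accumulator, as in the block-application paths further below.

    // previous_ms_accumulator is taken from the parent of the new tip
    self.mempool
        .update_with_block(previous_ms_accumulator, &new_block);
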
- pub async fn update_with_block( + pub fn update_with_block( &mut self, previous_mutator_set_accumulator: MutatorSetAccumulator, block: &Block, @@ -653,11 +653,11 @@ mod tests { )) .await; other_global_state - .set_new_tip(block_1.clone()) + .set_new_tip_atomic(block_1.clone()) .await .unwrap(); premine_receiver_global_state - .set_new_tip(block_1.clone()) + .set_new_tip_atomic(block_1.clone()) .await .unwrap(); @@ -736,12 +736,10 @@ mod tests { // Update the mempool with block 2 and verify that the mempool now only contains one tx assert_eq!(2, mempool.len()); - mempool - .update_with_block( - block_1.kernel.body.mutator_set_accumulator.clone(), - &block_2, - ) - .await; + mempool.update_with_block( + block_1.kernel.body.mutator_set_accumulator.clone(), + &block_2, + ); assert_eq!(1, mempool.len()); // Create a new block to verify that the non-mined transaction contains @@ -780,12 +778,10 @@ mod tests { for _ in 0..11 { let (next_block, _, _) = make_mock_block(&previous_block, None, other_receiver_address, rng.gen()); - mempool - .update_with_block( - previous_block.kernel.body.mutator_set_accumulator.clone(), - &next_block, - ) - .await; + mempool.update_with_block( + previous_block.kernel.body.mutator_set_accumulator.clone(), + &next_block, + ); previous_block = next_block; } @@ -805,12 +801,10 @@ mod tests { "Block with tx with updated mutator set data must be valid after 10 blocks have been mined" ); - mempool - .update_with_block( - previous_block.kernel.body.mutator_set_accumulator.clone(), - &block_14, - ) - .await; + mempool.update_with_block( + previous_block.kernel.body.mutator_set_accumulator.clone(), + &block_14, + ); assert!( mempool.is_empty(), @@ -894,7 +888,7 @@ mod tests { rng.gen(), ); premine_receiver_global_state - .set_new_tip(next_block.clone()) + .set_new_tip_atomic(next_block.clone()) .await .unwrap(); @@ -942,7 +936,7 @@ mod tests { "Sanity check that new tip has height 1" ); premine_receiver_global_state - .set_new_tip(block_1b.clone()) + .set_new_tip_atomic(block_1b.clone()) .await .unwrap(); diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index 52d78ab76..8b4228faa 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -7,6 +7,7 @@ pub mod shared; pub mod wallet; use std::cmp::max; +use std::collections::BTreeMap; use std::ops::Deref; use std::ops::DerefMut; @@ -38,7 +39,6 @@ use crate::models::peer::HandshakeData; use crate::models::state::wallet::expected_utxo::ExpectedUtxo; use crate::models::state::wallet::monitored_utxo::MonitoredUtxo; use crate::prelude::twenty_first; -use crate::time_fn_call_async; use crate::util_types::mutator_set::mutator_set_accumulator::MutatorSetAccumulator; use crate::Hash; use crate::VERSION; @@ -173,26 +173,263 @@ impl GlobalStateLock { self.lock_guard_mut().await.flush_databases().await } - /// store a coinbase (self-mined) block - pub async fn store_coinbase_block( + /// stores a block and sets it as the new tip. + /// + /// concurrency and atomicity: + /// + /// This method is atomic (read+write) but not concurrent. + /// All other tasks that read or write GlobalState will be blocked. 
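For orientation, two hypothetical call sites (variable names are assumptions): this single lock-level entry point replaces the former store_block()/store_coinbase_block() pair, with the Option<ExpectedUtxo> distinguishing a peer-received block from a self-mined one.

    // block received from a peer: the wallet expects no coinbase UTXO
    global_state_lock.set_new_tip_atomic(peer_block, None).await?;

    // self-mined block: register the coinbase UTXO as expected
    global_state_lock
        .set_new_tip_atomic(mined_block, Some(coinbase_expected_utxo))
        .await?;
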
+ pub async fn set_new_tip_atomic( &mut self, new_block: Block, - coinbase_utxo_info: ExpectedUtxo, + coinbase_utxo_info: Option, ) -> Result<()> { self.lock_guard_mut() .await - .set_new_self_mined_tip(new_block, coinbase_utxo_info) + .set_new_tip_atomic_internal(new_block, coinbase_utxo_info) .await } - /// store a block (non coinbase) - pub async fn store_block(&mut self, new_block: Block) -> Result<()> { - self.lock_guard_mut().await.set_new_tip(new_block).await + /// stores a block and sets it as the new tip. + /// + /// concurrency and atomicity: + /// + /// This method favors concurrency over atomicity. + /// + /// The prepare/finalize pattern enables lengthy read operations to be + /// performed with only a read-lock. + /// + /// These reads are performed in batches. Thus it is not guaranteed + /// that reads are atomic nor read+write. + /// + /// experimental: + /// + /// This method is considered experimental and is not presently in use. + pub async fn set_new_tip_concurrent( + &mut self, + new_block: Block, + coinbase_utxo_info: Option, + ) -> Result<()> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + { + let mut gsm = self.lock_guard_mut().await; + + // Apply the updates + gsm.chain + .archival_state_mut() + .write_block_as_tip(&new_block) + .await?; + + if let Some(coinbase_info) = coinbase_utxo_info { + // Notify wallet to expect the coinbase UTXO, as we mined this block + gsm.wallet_state + .add_expected_utxo(ExpectedUtxo::new( + coinbase_info.utxo, + coinbase_info.sender_randomness, + coinbase_info.receiver_preimage, + UtxoNotifier::OwnMiner, + )) + .await; + } + } // write-lock released + + // update the mutator set with the UTXOs from this block + { + // obtain the backwards, forwards paths. + let (backwards, forwards) = self + .lock_guard() + .await + .chain + .archival_state() + .init_update_mutator_set(&new_block) + .await?; + + const BATCH_SIZE: usize = 5; // process 5 blocks at a time. + + // process backwards path + for skip in (0..).map(|i| i * BATCH_SIZE) { + // read removal and addition records for this batch + let result = self + .lock_guard() + .await + .chain + .archival_state() + .prepare_update_mutator_set(&new_block, &backwards, skip, BATCH_SIZE) + .await?; + match result { + Some(updates) => { + // write removal/addition updates for this batch + self.lock_guard_mut() + .await + .chain + .archival_state_mut() + .update_mutator_set_backwards(updates) + .await? + } + None => break, + }; + } + // process forwards path + for skip in (0..).map(|i| i * BATCH_SIZE) { + // read removal and addition records for this batch + let result = self + .lock_guard() + .await + .chain + .archival_state() + .prepare_update_mutator_set(&new_block, &forwards, skip, BATCH_SIZE) + .await?; + match result { + Some(updates) => { + // write removal/addition updates for this batch + self.lock_guard_mut() + .await + .chain + .archival_state_mut() + .update_mutator_set_forwards(updates) + .await? + } + None => break, + } + } + // perform final verification, set sync label, persist to disk. + self.lock_guard_mut() + .await + .chain + .archival_state_mut() + .finalize_update_mutator_set(&new_block) + .await? + } + + // Get parent of tip for mutator-set data needed for various updates. Parent of the + // stored block will always exist since all blocks except the genesis block have a + // parent, and the genesis block is considered code, not data, so the genesis block + // will never be changed or updated through this method. 
+ let tip_parent = self + .lock_guard() + .await + .chain + .archival_state() + .get_tip_parent() + .await + .ok_or(anyhow::anyhow!( + "Parent must exist when storing a new block" + ))?; + + // Sanity check that must always be true for a valid block + assert_eq!( + tip_parent.hash(), + new_block.header().prev_block_digest, + "Tip parent has must match indicated parent hash" + ); + let previous_ms_accumulator = tip_parent.body().mutator_set_accumulator.clone(); + + // prepare to update wallet state with relevant UTXOs from this block + let result = self + .lock_guard() + .await + .wallet_state + .prepare_update_wallet_state_with_new_block(&previous_ms_accumulator, &new_block) + .await?; + + let mut gsm = self.lock_guard_mut().await; + + if let Some(wallet_updates) = result { + // write updates + gsm.wallet_state + .finalize_update_wallet_state_with_new_block(wallet_updates) + .await?; + } + + // Update mempool with UTXOs from this block. This is done by removing all transaction + // that became invalid/was mined by this block. + gsm.mempool + .update_with_block(previous_ms_accumulator, &new_block); + + gsm.chain.light_state_mut().set_block(new_block); + + // Flush databases + gsm.flush_databases().await?; + + Ok(()) } /// resync membership proofs + /// + /// concurrency and atomicity: + /// + /// This method favors concurrency over (read+write) atomicity. + /// + /// The prepare/finalize pattern enables lengthy read operations to be + /// performed without requiring a write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not automatically atomic. + /// + /// This fn performs potentially lengthy operations. The work has been + /// split into two: an immutable fn that prepares a batch of updates and a + /// mutable fn that finalizes (writes) the updates as quickly as possible. + /// In this way, we do not hold the write-lock any longer than necessary. + /// + /// Splitting the logic this way requires buffering updates in RAM. We + /// batch the updates to avoid running out of mem. The batch size is + /// initially set to 1000. This is just a wild-ass-guess. It seems high + /// enough that most wallets will only need a single batch, but low enough + /// that we shouldn't run out of mem if processing a huge wallet. + /// + /// see: pub async fn resync_membership_proofs(&mut self) -> Result<()> { - self.lock_guard_mut().await.resync_membership_proofs().await + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + // acquire read-lock + let gs = self.lock_guard().await; + + // Do not fix memberhip proofs if node is in sync mode, as we would otherwise + // have to sync many times, instead of just *one* time once we have caught up. + if gs.net.syncing { + debug!("Not syncing MS membership proofs because we are syncing"); + return Ok(()); + } + + // is it necessary? + let tip_hash = gs.chain.light_state().hash(); + if gs.wallet_state.is_synced_to(tip_hash).await { + debug!("Membership proof syncing not needed"); + return Ok(()); + } + + // do we have blocks? + if !gs.chain.is_archival_node() { + todo!("We don't yet support non-archival nodes"); + // request blocks from peers, etc. + } + + drop(gs); // release read-lock. + + // Process monitored_utxos in batches of 1000. It seems unlikely many + // wallets would have more than 1000 mutxo that need resyncing but We do + // this as a safety valve to avoid blowing up memory too much. + const BATCH_SIZE: u64 = 1000; + for batch_idx in 0.. 
{ + // perform lengthy op gathering updates with read-lock + let updates_batch = self + .lock_guard() + .await + .prepare_resync_membership_proofs(tip_hash, batch_idx, BATCH_SIZE) + .await?; + if updates_batch.len() == 0 { + // no more updates, we're done. + break; + } + + // write updates as quickly as possible with write-lock. + self.lock_guard_mut() + .await + .finalize_resync_membership_proofs(tip_hash, updates_batch) + .await?; + } + Ok(()) } pub async fn prune_abandoned_monitored_utxos( @@ -282,15 +519,6 @@ impl GlobalState { .await } - pub async fn get_latest_balance_height(&self) -> Option { - let (height, time_secs) = - time_fn_call_async(self.get_latest_balance_height_internal()).await; - - debug!("call to get_latest_balance_height() took {time_secs} seconds"); - - height - } - /// Retrieve block height of last change to wallet balance. /// /// note: this fn could be implemented as: @@ -305,7 +533,9 @@ impl GlobalState { /// Presently this is o(n) with the number of monitored utxos. /// if storage could keep track of latest spend utxo for the active /// tip, then this could be o(1). - async fn get_latest_balance_height_internal(&self) -> Option { + pub async fn get_latest_balance_height(&self) -> Option { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + let current_tip_digest = self.chain.light_state().hash(); let monitored_utxos = self.wallet_state.wallet_db.monitored_utxos(); @@ -362,6 +592,8 @@ impl GlobalState { /// Retrieve wallet balance history pub async fn get_balance_history(&self) -> Vec<(Digest, Timestamp, BlockHeight, NeptuneCoins)> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + let current_tip_digest = self.chain.light_state().hash(); let monitored_utxos = self.wallet_state.wallet_db.monitored_utxos(); @@ -846,6 +1078,8 @@ impl GlobalState { /// are not synced with a valid mutator set membership proof. And this corruption /// can only happen if the wallet database is deleted or corrupted. pub(crate) async fn restore_monitored_utxos_from_recovery_data(&mut self) -> Result<()> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + let tip_hash = self.chain.light_state().hash(); let ams_ref = &self.chain.archival_state().archival_mutator_set; @@ -959,16 +1193,49 @@ impl GlobalState { Ok(()) } - /// Locking: - /// * acquires `monitored_utxos_lock` for write - pub async fn resync_membership_proofs_from_stored_blocks( - &mut self, + /// prepare a batch of updates for re-syncing membership proofs + /// + /// concurrency and atomicity: + /// + /// The prepare/finalize pattern separates read ops from write ops so the + /// reads can be performed with either a read-lock or write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not atomic unless read+write + /// is performed within a single write-lock, which sacrifices concurrency. + /// + /// perf: this fn in o(n) with the number of monitored_utxos in the wallet. + /// + /// ```text + /// lengthy, i/o calls, foreach mutxo: + /// monitored_utxos.get() + /// ArchivalState::find_path() + /// ArchivalState::get_block() + /// called twice per block in a backwards loop along path + /// called twice per block in a forwards loop along path + /// + /// it should be possible to reduce this to 1 call per block + /// for each loop by storing prev block in a temp var. 
+ /// ``` + /// + /// see + async fn prepare_resync_membership_proofs( + &self, tip_hash: Digest, - ) -> Result<()> { + batch_idx: u64, + batch_size: u64, + ) -> Result + Send> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + // loop over all monitored utxos - let monitored_utxos = self.wallet_state.wallet_db.monitored_utxos_mut(); + let monitored_utxos = self.wallet_state.wallet_db.monitored_utxos(); + + let mut mutxo_updates: BTreeMap = Default::default(); - 'outer: for i in 0..monitored_utxos.len().await { + let start = batch_idx * batch_size; + let end = std::cmp::min(start + batch_size, monitored_utxos.len().await); + + 'outer: for i in start..end { let i = i as Index; let monitored_utxo = monitored_utxos.get(i).await; @@ -1126,10 +1393,37 @@ impl GlobalState { // store updated membership proof monitored_utxo.add_membership_proof_for_tip(tip_hash, membership_proof); - // update storage. - monitored_utxos.set(i, monitored_utxo).await + mutxo_updates.insert(i, monitored_utxo); } + Ok(mutxo_updates.into_iter()) + } + + /// write/persist a list of [MonitoredUtxo]. + /// + /// see + /// + /// concurrency and atomicity: + /// + /// The prepare/finalize pattern separates read ops from write ops so the + /// reads can be performed with either a read-lock or write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not atomic unless read+write + /// is performed within a single write-lock, which sacrifices concurrency. + async fn finalize_resync_membership_proofs( + &mut self, + tip_hash: Digest, + monitored_utxos: impl IntoIterator + Send, + ) -> Result<()> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + self.wallet_state + .wallet_db + .monitored_utxos_mut() + .set_many(monitored_utxos) + .await; + // Update sync label and persist self.wallet_state.wallet_db.set_sync_label(tip_hash).await; self.wallet_state.wallet_db.persist().await; @@ -1148,6 +1442,8 @@ impl GlobalState { &mut self, block_depth_threshhold: usize, ) -> Result { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + const MIN_BLOCK_DEPTH_FOR_MUTXO_PRUNING: usize = 10; if block_depth_threshhold < MIN_BLOCK_DEPTH_FOR_MUTXO_PRUNING { bail!( @@ -1243,139 +1539,93 @@ impl GlobalState { /// Update client's state with a new block. Block is assumed to be valid, also wrt. to PoW. /// The received block will be set as the new tip, regardless of its accumulated PoW. - pub async fn set_new_tip(&mut self, new_block: Block) -> Result<()> { - self.set_new_tip_internal(new_block, None).await + pub async fn set_new_tip_atomic(&mut self, new_block: Block) -> Result<()> { + self.set_new_tip_atomic_internal(new_block, None).await } /// Update client's state with a new block that was mined locally. Block is assumed to be valid, /// also wrt. to PoW. The received block will be set as the new tip, regardless of its /// accumulated PoW. - pub async fn set_new_self_mined_tip( + pub async fn set_new_self_mined_tip_atomic( &mut self, new_block: Block, coinbase_utxo_info: ExpectedUtxo, ) -> Result<()> { - self.set_new_tip_internal(new_block, Some(coinbase_utxo_info)) + self.set_new_tip_atomic_internal(new_block, Some(coinbase_utxo_info)) .await } /// Update client's state with a new block. Block is assumed to be valid, also wrt. to PoW. /// The received block will be set as the new tip, regardless of its accumulated PoW. or its /// validity. 
- async fn set_new_tip_internal( + async fn set_new_tip_atomic_internal( &mut self, new_block: Block, coinbase_utxo_info: Option, ) -> Result<()> { - // note: we make this fn internal so we can log its duration and ensure it will - // never be called directly by another fn, without the timings. - async fn set_new_tip_internal_worker( - myself: &mut GlobalState, - new_block: Block, - coinbase_utxo_info: Option, - ) -> Result<()> { - // Apply the updates - myself - .chain - .archival_state_mut() - .write_block_as_tip(&new_block) - .await?; + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); - // update the mutator set with the UTXOs from this block - myself - .chain - .archival_state_mut() - .update_mutator_set(&new_block) - .await - .expect("Updating mutator set must succeed"); + // Apply the updates + self.chain + .archival_state_mut() + .write_block_as_tip(&new_block) + .await?; - if let Some(coinbase_info) = coinbase_utxo_info { - // Notify wallet to expect the coinbase UTXO, as we mined this block - myself - .wallet_state - .add_expected_utxo(ExpectedUtxo::new( - coinbase_info.utxo, - coinbase_info.sender_randomness, - coinbase_info.receiver_preimage, - UtxoNotifier::OwnMiner, - )) - .await; - } + // update the mutator set with the UTXOs from this block + self.chain + .archival_state_mut() + .update_mutator_set_atomic(&new_block) + .await?; - // Get parent of tip for mutator-set data needed for various updates. Parent of the - // stored block will always exist since all blocks except the genesis block have a - // parent, and the genesis block is considered code, not data, so the genesis block - // will never be changed or updated through this method. - let tip_parent = myself - .chain + if let Some(coinbase_info) = coinbase_utxo_info { + // Notify wallet to expect the coinbase UTXO, as we mined this block + self.wallet_state + .add_expected_utxo(ExpectedUtxo::new( + coinbase_info.utxo, + coinbase_info.sender_randomness, + coinbase_info.receiver_preimage, + UtxoNotifier::OwnMiner, + )) + .await; + } + + // Get parent of tip for mutator-set data needed for various updates. Parent of the + // stored block will always exist since all blocks except the genesis block have a + // parent, and the genesis block is considered code, not data, so the genesis block + // will never be changed or updated through this method. + let tip_parent = + self.chain .archival_state() .get_tip_parent() .await - .expect("Parent must exist when storing a new block"); - - // Sanity check that must always be true for a valid block - assert_eq!( - tip_parent.hash(), - new_block.header().prev_block_digest, - "Tip parent has must match indicated parent hash" - ); - let previous_ms_accumulator = tip_parent.body().mutator_set_accumulator.clone(); - - // update wallet state with relevant UTXOs from this block - myself - .wallet_state - .update_wallet_state_with_new_block(&previous_ms_accumulator, &new_block) - .await?; - - // Update mempool with UTXOs from this block. This is done by removing all transaction - // that became invalid/was mined by this block. 
- myself - .mempool - .update_with_block(previous_ms_accumulator, &new_block) - .await; - - myself.chain.light_state_mut().set_block(new_block); + .ok_or(anyhow::anyhow!( + "Parent must exist when storing a new block" + ))?; - // Flush databases - myself.flush_databases().await?; - - Ok(()) - } - - crate::macros::duration_async_info!(set_new_tip_internal_worker( - self, - new_block, - coinbase_utxo_info - )) - } + // Sanity check that must always be true for a valid block + assert_eq!( + tip_parent.hash(), + new_block.header().prev_block_digest, + "Tip parent has must match indicated parent hash" + ); + let previous_ms_accumulator = tip_parent.body().mutator_set_accumulator.clone(); - /// resync membership proofs - pub async fn resync_membership_proofs(&mut self) -> Result<()> { - // Do not fix memberhip proofs if node is in sync mode, as we would otherwise - // have to sync many times, instead of just *one* time once we have caught up. - if self.net.syncing { - debug!("Not syncing MS membership proofs because we are syncing"); - return Ok(()); - } + // update wallet state with relevant UTXOs from this block + self.wallet_state + .update_wallet_state_with_new_block(&previous_ms_accumulator, &new_block) + .await?; - // is it necessary? - let current_tip_digest = self.chain.light_state().hash(); - if self.wallet_state.is_synced_to(current_tip_digest).await { - debug!("Membership proof syncing not needed"); - return Ok(()); - } + // Update mempool with UTXOs from this block. This is done by removing all transaction + // that became invalid/was mined by this block. + self.mempool + .update_with_block(previous_ms_accumulator, &new_block); - // do we have blocks? - if self.chain.is_archival_node() { - return self - .resync_membership_proofs_from_stored_blocks(current_tip_digest) - .await; - } + self.chain.light_state_mut().set_block(new_block); - // request blocks from peers - todo!("We don't yet support non-archival nodes"); + // Flush databases + self.flush_databases().await?; - // Ok(()) + Ok(()) } #[inline] @@ -1700,8 +1950,12 @@ mod global_state_tests { ); // Call resync + let mutxo_updates = global_state + .prepare_resync_membership_proofs(mock_block_1a.hash(), 0, 1000) + .await + .unwrap(); global_state - .resync_membership_proofs_from_stored_blocks(mock_block_1a.hash()) + .finalize_resync_membership_proofs(mock_block_1a.hash(), mutxo_updates) .await .unwrap(); @@ -1740,7 +1994,7 @@ mod global_state_tests { let (mock_block_1a, coinbase_utxo, coinbase_output_randomness) = make_mock_block(&genesis_block, None, own_receiving_address, rng.gen()); global_state - .set_new_self_mined_tip( + .set_new_self_mined_tip_atomic( mock_block_1a.clone(), ExpectedUtxo::new( coinbase_utxo, @@ -1768,18 +2022,21 @@ mod global_state_tests { for _ in 0..5 { let (next_block, _, _) = make_mock_block(&parent_block, None, other_receiving_address, rng.gen()); - global_state.set_new_tip(next_block.clone()).await.unwrap(); + global_state + .set_new_tip_atomic(next_block.clone()) + .await + .unwrap(); parent_block = next_block; } + drop(global_state); // Call resync which fails to sync the UTXO that was abandoned when block 1a was abandoned - global_state - .resync_membership_proofs_from_stored_blocks(parent_block.hash()) - .await - .unwrap(); + global_state_lock.resync_membership_proofs().await.unwrap(); + + let gs = global_state_lock.lock_guard().await; // Verify that one MUTXO is unsynced, and that 1 (from genesis) is synced - let wallet_status_after_forking = global_state + let wallet_status_after_forking = gs 
.wallet_state .get_wallet_status_from_lock(parent_block.hash()) .await; @@ -1788,19 +2045,19 @@ mod global_state_tests { // Verify that the MUTXO from block 1a is considered abandoned, and that the one from // genesis block is not. - let monitored_utxos = global_state.wallet_state.wallet_db.monitored_utxos(); + let monitored_utxos = gs.wallet_state.wallet_db.monitored_utxos(); assert!( !monitored_utxos .get(0) .await - .was_abandoned(parent_block.hash(), global_state.chain.archival_state()) + .was_abandoned(parent_block.hash(), gs.chain.archival_state()) .await ); assert!( monitored_utxos .get(1) .await - .was_abandoned(parent_block.hash(), global_state.chain.archival_state()) + .was_abandoned(parent_block.hash(), gs.chain.archival_state()) .await ); @@ -1830,7 +2087,7 @@ mod global_state_tests { make_mock_block(&genesis_block, None, own_receiving_address, rng.gen()); { global_state - .set_new_self_mined_tip( + .set_new_self_mined_tip_atomic( mock_block_1a.clone(), ExpectedUtxo::new( coinbase_utxo_1a, @@ -1856,7 +2113,7 @@ mod global_state_tests { let (next_a_block, _, _) = make_mock_block(&fork_a_block, None, other_receiving_address, rng.gen()); global_state - .set_new_tip(next_a_block.clone()) + .set_new_tip_atomic(next_a_block.clone()) .await .unwrap(); fork_a_block = next_a_block; @@ -1876,7 +2133,7 @@ mod global_state_tests { let (next_b_block, _, _) = make_mock_block(&fork_b_block, None, other_receiving_address, rng.gen()); global_state - .set_new_tip(next_b_block.clone()) + .set_new_tip_atomic(next_b_block.clone()) .await .unwrap(); fork_b_block = next_b_block; @@ -1896,12 +2153,13 @@ mod global_state_tests { wallet_status_on_b_fork_before_resync.unsynced_unspent.len() ); + drop(global_state); + // Run the resync and verify that MPs are synced - global_state - .resync_membership_proofs_from_stored_blocks(fork_b_block.hash()) - .await - .unwrap(); - let wallet_status_on_b_fork_after_resync = global_state + global_state_lock.resync_membership_proofs().await.unwrap(); + + let mut gsm = global_state_lock.lock_guard_mut().await; + let wallet_status_on_b_fork_after_resync = gsm .wallet_state .get_wallet_status_from_lock(fork_b_block.hash()) .await; @@ -1918,15 +2176,12 @@ mod global_state_tests { for _ in 0..100 { let (next_c_block, _, _) = make_mock_block(&fork_c_block, None, other_receiving_address, rng.gen()); - global_state - .set_new_tip(next_c_block.clone()) - .await - .unwrap(); + gsm.set_new_tip_atomic(next_c_block.clone()).await.unwrap(); fork_c_block = next_c_block; } // Verify that there are zero MUTXOs with synced MPs - let wallet_status_on_c_fork_before_resync = global_state + let wallet_status_on_c_fork_before_resync = gsm .wallet_state .get_wallet_status_from_lock(fork_c_block.hash()) .await; @@ -1938,14 +2193,13 @@ mod global_state_tests { 2, wallet_status_on_c_fork_before_resync.unsynced_unspent.len() ); + drop(gsm); // Run the resync and verify that UTXO from genesis is synced, but that // UTXO from 1a is not synced. 
- global_state - .resync_membership_proofs_from_stored_blocks(fork_c_block.hash()) - .await - .unwrap(); - let wallet_status_on_c_fork_after_resync = global_state + global_state_lock.resync_membership_proofs().await.unwrap(); + let gs = global_state_lock.lock_guard().await; + let wallet_status_on_c_fork_after_resync = gs .wallet_state .get_wallet_status_from_lock(fork_c_block.hash()) .await; @@ -1956,19 +2210,19 @@ mod global_state_tests { ); // Also check that UTXO from 1a is considered abandoned - let monitored_utxos = global_state.wallet_state.wallet_db.monitored_utxos(); + let monitored_utxos = gs.wallet_state.wallet_db.monitored_utxos(); assert!( !monitored_utxos .get(0) .await - .was_abandoned(fork_c_block.hash(), global_state.chain.archival_state()) + .was_abandoned(fork_c_block.hash(), gs.chain.archival_state()) .await ); assert!( monitored_utxos .get(1) .await - .was_abandoned(fork_c_block.hash(), global_state.chain.archival_state()) + .was_abandoned(fork_c_block.hash(), gs.chain.archival_state()) .await ); @@ -2138,7 +2392,7 @@ mod global_state_tests { genesis_state_lock .lock_guard_mut() .await - .set_new_self_mined_tip( + .set_new_self_mined_tip_atomic( block_1.clone(), ExpectedUtxo::new( cb_utxo, @@ -2152,7 +2406,7 @@ mod global_state_tests { for state_lock in [&mut alice_state_lock, &mut bob_state_lock] { let mut state = state_lock.lock_guard_mut().await; - state.set_new_tip(block_1.clone()).await.unwrap(); + state.set_new_tip_atomic(block_1.clone()).await.unwrap(); } assert_eq!( @@ -2305,7 +2559,7 @@ mod global_state_tests { let (block_1, _cb_utxo, _cb_output_randomness) = make_mock_block_with_valid_pow(&genesis_block, None, receiving_address, rng.gen()); - global_state.set_new_tip(block_1).await.unwrap(); + global_state.set_new_tip_atomic(block_1).await.unwrap(); assert!(global_state .chain @@ -2496,7 +2750,7 @@ mod global_state_tests { .await; // alice's node learns of the new block. - alice_state_mut.set_new_tip(block_1.clone()).await?; + alice_state_mut.set_new_tip_atomic(block_1.clone()).await?; // alice should have 2 monitored utxos. assert_eq!( @@ -2533,7 +2787,7 @@ mod global_state_tests { let mut bob_state_mut = bob_state_lock.lock_guard_mut().await; // bob's node adds block1 to the chain. - bob_state_mut.set_new_tip(block_1.clone()).await?; + bob_state_mut.set_new_tip_atomic(block_1.clone()).await?; // Now Bob should have a balance of 20, from Alice assert_eq!( @@ -2568,7 +2822,7 @@ mod global_state_tests { assert_eq!(alice_initial_balance, 20000u32.into()); // now alice must replay old blocks. (there's only one so far) - alice_state_mut.set_new_tip(block_1).await?; + alice_state_mut.set_new_tip_atomic(block_1).await?; // Now alice should have a balance of 19979. 
// 20000 from premine - 21 (20 to Bob + 1 fee) diff --git a/src/models/state/wallet/mod.rs b/src/models/state/wallet/mod.rs index 96e8c34ee..f43c2a434 100644 --- a/src/models/state/wallet/mod.rs +++ b/src/models/state/wallet/mod.rs @@ -962,7 +962,7 @@ mod wallet_tests { .await; } premine_receiver_global_state - .set_new_tip(block_1.clone()) + .set_new_tip_atomic(block_1.clone()) .await .unwrap(); @@ -1026,7 +1026,7 @@ mod wallet_tests { ) .await?; premine_receiver_global_state - .set_new_tip(next_block.clone()) + .set_new_tip_atomic(next_block.clone()) .await .unwrap(); } @@ -1097,7 +1097,7 @@ mod wallet_tests { ) .await?; premine_receiver_global_state - .set_new_tip(block_2_b.clone()) + .set_new_tip_atomic(block_2_b.clone()) .await .unwrap(); premine_receiver_global_state diff --git a/src/models/state/wallet/wallet_state.rs b/src/models/state/wallet/wallet_state.rs index 9865a5f22..85b95652e 100644 --- a/src/models/state/wallet/wallet_state.rs +++ b/src/models/state/wallet/wallet_state.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::collections::HashMap; use std::error::Error; use std::fmt::Debug; @@ -5,6 +6,8 @@ use std::path::PathBuf; use anyhow::bail; use anyhow::Result; +use async_stream::stream; +use futures::Stream; use itertools::Itertools; use num_traits::Zero; use serde_derive::Deserialize; @@ -25,6 +28,7 @@ use twenty_first::util_types::algebraic_hasher::AlgebraicHasher; use crate::config_models::cli_args::Args; use crate::config_models::data_directory::DataDirectory; use crate::database::storage::storage_schema::traits::*; +use crate::database::storage::storage_schema::DbtVec; use crate::database::storage::storage_vec::{traits::*, Index}; use crate::database::NeptuneLevelDb; use crate::models::blockchain::block::Block; @@ -224,14 +228,21 @@ impl WalletState { } } - // note: this will write modified state to disk. - wallet_state - .update_wallet_state_with_new_block( + // generate updates. (read-only) + if let Some(updates) = wallet_state + .prepare_update_wallet_state_with_new_block( &MutatorSetAccumulator::default(), &Block::genesis_block(cli_args.network), ) .await - .expect("Updating wallet state with genesis block must succeed"); + .expect("generating wallet state updates must succeed") + { + // write modified state to disk. + wallet_state + .finalize_update_wallet_state_with_new_block(updates) + .await + .expect("Updating wallet state with genesis block must succeed"); + } } wallet_state @@ -250,6 +261,8 @@ impl WalletState { &self, transaction: &Transaction, ) -> Vec<(Utxo, AbsoluteIndexSet, u64)> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + let confirmed_absolute_index_sets = transaction .kernel .inputs @@ -283,6 +296,8 @@ impl WalletState { &'a self, transaction: &'a Transaction, ) -> impl Iterator + 'a { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + // scan for announced utxos for every known key of every key type. self.get_all_known_spending_keys() .into_iter() @@ -309,13 +324,15 @@ impl WalletState { /// n = number of ExpectedUtxo in database. (all-time) /// m = number of transaction outputs. /// - /// see https://github.com/Neptune-Crypto/neptune-core/pull/175#issuecomment-2302511025 + /// see /// /// Returns an iterator of [AnnouncedUtxo]. 
(addition record, UTXO, sender randomness, receiver_preimage) pub async fn scan_for_expected_utxos<'a>( &'a self, transaction: &'a Transaction, ) -> impl Iterator + 'a { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + let expected_utxos = self.wallet_db.expected_utxos().get_all().await; let eu_map: HashMap<_, _> = expected_utxos .into_iter() @@ -353,6 +370,8 @@ impl WalletState { /// So it is implemented by clearing all ExpectedUtxo from DB and /// adding back those that are not stale. pub async fn prune_stale_expected_utxos(&mut self) { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + // prune un-received ExpectedUtxo after 28 days in secs const UNRECEIVED_UTXO_SECS: u64 = 28 * 24 * 60 * 60; @@ -392,6 +411,8 @@ impl WalletState { // returns Some(SpendingKey) if the utxo can be unlocked by one of the known // wallet keys. pub fn find_spending_key_for_utxo(&self, utxo: &Utxo) -> Option { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + self.get_all_known_spending_keys() .into_iter() .find(|k| k.to_address().lock_script().hash() == utxo.lock_script_hash) @@ -399,6 +420,8 @@ impl WalletState { /// returns all spending keys of all key types with derivation index less than current counter pub fn get_all_known_spending_keys(&self) -> Vec { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + KeyType::all_types() .into_iter() .flat_map(|key_type| self.get_known_spending_keys(key_type)) @@ -479,13 +502,34 @@ impl WalletState { self.wallet_secret.nth_symmetric_key(0) } - /// Update wallet state with new block. Assume the given block - /// is valid and that the wallet state is not up to date yet. - pub async fn update_wallet_state_with_new_block( - &mut self, + /// prepares to update wallet state with new block. + /// + /// This immutable method returns a [WalletBlockUpdate] record that must be + /// applied with [Self::finalize_update_wallet_state_with_new_block()]. + /// + /// note: assumes the given block is valid and that the wallet state is not + /// up to date yet. + /// + /// concurrency and atomicity: + /// + /// The prepare/finalize pattern separates read ops from write ops so the + /// reads can be performed with either a read-lock or write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not atomic unless read+write + /// is performed within a single write-lock, which sacrifices concurrency. + pub(crate) async fn prepare_update_wallet_state_with_new_block( + &self, current_mutator_set_accumulator: &MutatorSetAccumulator, new_block: &Block, - ) -> Result<()> { + ) -> Result> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + // make a cache for local processing of monitored-utxos. 
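The concurrency note in the doc comment above can be made concrete with a small `tokio::sync::RwLock` sketch: the lengthy prepare step runs under a read guard, the quick finalize step under a write guard, and a caller that needs read+write to be atomic must hold the write guard across both. `Wallet` and `Update` here are hypothetical stand-ins, not the real `WalletState` types.

use std::sync::Arc;
use tokio::sync::RwLock;

// Hypothetical stand-ins for the state and the prepared update record.
struct Wallet {
    sync_label: u64,
}
struct Update {
    new_sync_label: u64,
}

impl Wallet {
    // Lengthy and read-only: many tasks may run this concurrently.
    async fn prepare(&self, block: u64) -> Update {
        Update { new_sync_label: block }
    }
    // Short and mutating: applies what `prepare` computed.
    async fn finalize(&mut self, u: Update) {
        self.sync_label = u.new_sync_label;
    }
}

#[tokio::main]
async fn main() {
    let wallet = Arc::new(RwLock::new(Wallet { sync_label: 0 }));

    // Concurrent variant: the write lock is held only for the short apply step.
    let update = wallet.read().await.prepare(1).await;
    // NOTE: another writer could slip in between these two statements.
    wallet.write().await.finalize(update).await;

    // Atomic variant: hold the write guard across prepare + finalize.
    {
        let mut guard = wallet.write().await;
        let update = guard.prepare(2).await;
        guard.finalize(update).await;
    }
    assert_eq!(wallet.read().await.sync_label, 2);
}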
+ // (writes are local, reads fetch from db if needed) + let mut mutxo_cache = + MonitoredUtxoReadOnlyCache::new(self.wallet_db.monitored_utxos()).await; + let transaction: Transaction = new_block.kernel.body.transaction.clone(); let spent_inputs: Vec<(Utxo, AbsoluteIndexSet, u64)> = @@ -522,16 +566,15 @@ impl WalletState { // the process update existing membership proofs with // updates from this block - let monitored_utxos = self.wallet_db.monitored_utxos_mut(); let mut incoming_utxo_recovery_data_list = vec![]; // return early if there are no monitored utxos and this // block does not affect our balance if spent_inputs.is_empty() && addition_record_to_utxo_info.is_empty() - && monitored_utxos.is_empty().await + && mutxo_cache.is_empty() { - return Ok(()); + return Ok(None); } // Find the membership proofs that were valid at the previous tip. They have @@ -542,7 +585,7 @@ impl WalletState { > = HashMap::default(); { - let stream = monitored_utxos.stream().await; + let stream = mutxo_cache.stream().await; pin_mut!(stream); // needed for iteration while let Some((i, monitored_utxo)) = stream.next().await { @@ -657,7 +700,7 @@ impl WalletState { }; incoming_utxo_recovery_data_list.push(utxo_ms_recovery_data); - let mutxos_len = monitored_utxos.len().await; + let mutxos_len = mutxo_cache.len(); valid_membership_proofs_and_own_utxo_count.insert( StrongUtxoKey::new( @@ -674,7 +717,7 @@ impl WalletState { new_block.kernel.header.timestamp, new_block.kernel.header.height, )); - monitored_utxos.push(mutxo).await; + mutxo_cache.push(mutxo); } // Update mutator set to bring it to the correct state for the next call to batch-update @@ -683,7 +726,7 @@ impl WalletState { // sanity check { - let stream = monitored_utxos.stream_values().await; + let stream = mutxo_cache.stream_values().await; pin_mut!(stream); // needed for iteration let mutxo_with_valid_mps = stream @@ -710,6 +753,7 @@ impl WalletState { new_block.kernel.body.transaction.kernel.inputs.len() ); let mut block_tx_input_count: usize = 0; + while let Some(removal_record) = removal_records.pop() { let res = MsMembershipProof::batch_update_from_remove( &mut valid_membership_proofs_and_own_utxo_count @@ -741,13 +785,13 @@ impl WalletState { block_tx_input_count ); - let mut spent_mutxo = monitored_utxos.get(*mutxo_list_index).await; + let mut spent_mutxo = mutxo_cache.get(*mutxo_list_index).await; spent_mutxo.spent_in_block = Some(( new_block.hash(), new_block.kernel.header.timestamp, new_block.kernel.header.height, )); - monitored_utxos.set(*mutxo_list_index, spent_mutxo).await; + mutxo_cache.set(*mutxo_list_index, spent_mutxo); } } @@ -769,7 +813,7 @@ impl WalletState { debug!("Number of mutated membership proofs: {}", changed_mps.len()); let num_unspent_utxos = { - let stream = monitored_utxos.stream_values().await; + let stream = mutxo_cache.stream_values().await; pin_mut!(stream); // needed for iteration stream @@ -784,7 +828,7 @@ impl WalletState { valid_membership_proofs_and_own_utxo_count.iter() { let StrongUtxoKey { utxo_digest, .. 
} = strong_utxo_key; - let mut monitored_utxo = monitored_utxos.get(*own_utxo_index).await; + let mut monitored_utxo = mutxo_cache.get(*own_utxo_index).await; monitored_utxo.add_membership_proof_for_tip(new_block.hash(), updated_ms_mp.to_owned()); // Sanity check that membership proofs of non-spent transactions are still valid @@ -793,7 +837,7 @@ impl WalletState { || msa_state.verify(utxo_digest, updated_ms_mp) ); - monitored_utxos.set(*own_utxo_index, monitored_utxo).await; + mutxo_cache.set(*own_utxo_index, monitored_utxo); // TODO: What if a newly added transaction replaces a transaction that was in another fork? // How do we ensure that this transaction is not counted twice? @@ -801,20 +845,15 @@ impl WalletState { // Another option is to attempt to mark those abandoned monitored UTXOs as reorganized. } - // write these to disk. - for item in incoming_utxo_recovery_data_list.into_iter() { - self.store_utxo_ms_recovery_data(item).await?; - } - // Mark all expected UTXOs that were received in this block as received - let updates = self + let expected_utxos = self .wallet_db .expected_utxos() .get_all() .await .into_iter() .enumerate() - .filter(|(_, eu)| { + .filter(move |(_, eu)| { offchain_received_outputs .iter() .any(|au| au.addition_record == eu.addition_record) @@ -822,15 +861,106 @@ impl WalletState { .map(|(idx, mut eu)| { eu.mined_in_block = Some((new_block.hash(), new_block.kernel.header.timestamp)); (idx as Index, eu) - }); - self.wallet_db.expected_utxos_mut().set_many(updates).await; + }) + .collect_vec(); - self.wallet_db.set_sync_label(new_block.hash()).await; + let (mutxo_updates, mutxo_additions) = mutxo_cache.into_updates_and_additions(); + Ok(Some(WalletBlockUpdate { + new_block_hash: new_block.hash(), + incoming_utxo_recovery_data_list, + mutxo_updates, + mutxo_additions, + expected_utxos, + })) + } + + /// updates wallet state with new block. + /// + /// This mutable method applies updates generated by + /// [Self::prepare_update_wallet_state_with_new_block()]. + /// + /// note: assumes the input data is valid/correct. + /// + /// concurrency and atomicity: + /// + /// The prepare/finalize pattern separates read ops from write ops so the + /// reads can be performed with either a read-lock or write-lock. + /// + /// The prepare/finalize pattern provides atomic read and atomic write with + /// improved concurrency however read+write is not atomic unless read+write + /// is performed within a single write-lock, which sacrifices concurrency. + pub(crate) async fn finalize_update_wallet_state_with_new_block( + &mut self, + updates: WalletBlockUpdate, + ) -> Result<()> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + let WalletBlockUpdate { + new_block_hash, + incoming_utxo_recovery_data_list, + mutxo_updates, + mutxo_additions, + expected_utxos, + } = updates; + + // write these to disk. + for item in incoming_utxo_recovery_data_list { + self.store_utxo_ms_recovery_data(item).await?; + } + + // add new mutxos + for (_, mutxo) in mutxo_additions { + self.wallet_db.monitored_utxos_mut().push(mutxo).await; + } + + // update existing mutxos + self.wallet_db + .monitored_utxos_mut() + .set_many(mutxo_updates) + .await; + + // update expected utxos + self.wallet_db + .expected_utxos_mut() + .set_many(expected_utxos) + .await; + + self.wallet_db.set_sync_label(new_block_hash).await; self.wallet_db.persist().await; Ok(()) } + /// updates wallet state with new block. 
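As the finalize method above shows, the write phase is just a replay of precomputed data: append the new rows, overwrite the touched rows by index, and bump the sync label. A self-contained sketch of that change-set shape, with a plain `Vec` standing in for the database-backed vector (`ChangeSet` and `Store` are hypothetical names):

use std::collections::BTreeMap;

// Hypothetical change-set: everything the slow, read-only pass decided to write.
struct ChangeSet<T> {
    updates: BTreeMap<usize, T>, // overwrite existing rows by index
    additions: Vec<T>,           // append new rows
    sync_label: u64,
}

// Plain in-memory stand-in for the database-backed vector.
struct Store<T> {
    rows: Vec<T>,
    sync_label: u64,
}

impl<T> Store<T> {
    // The fast, mutating pass: apply the change-set in one sweep.
    fn apply(&mut self, cs: ChangeSet<T>) {
        for (idx, row) in cs.updates {
            self.rows[idx] = row;
        }
        self.rows.extend(cs.additions);
        self.sync_label = cs.sync_label;
    }
}

fn main() {
    let mut store = Store {
        rows: vec!["a", "b"],
        sync_label: 0,
    };
    let cs = ChangeSet {
        updates: BTreeMap::from([(1, "B")]),
        additions: vec!["c"],
        sync_label: 7,
    };
    store.apply(cs);
    assert_eq!(store.rows, vec!["a", "B", "c"]);
    assert_eq!(store.sync_label, 7);
}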
+ /// + /// This mutable method is a wrapper for [Self::prepare_update_wallet_state_with_new_block()] + /// and [Self::finalize_update_wallet_state_with_new_block()]. + /// + /// concurrency and atomicity: + /// + /// This method is atomic over read+write. It is not concurrent. + /// + /// Calling this method requires the caller to hold the global write-lock + /// for both the lengthy read operations and the shorter write operations. + /// All other tasks that read/write app state are blocked while this + /// processes. + pub(crate) async fn update_wallet_state_with_new_block( + &mut self, + current_mutator_set_accumulator: &MutatorSetAccumulator, + new_block: &Block, + ) -> Result<()> { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + + if let Some(updates) = self + .prepare_update_wallet_state_with_new_block(current_mutator_set_accumulator, new_block) + .await? + { + self.finalize_update_wallet_state_with_new_block(updates) + .await?; + } + Ok(()) + } + pub async fn is_synced_to(&self, tip_hash: Digest) -> bool { let db_sync_digest = self.wallet_db.get_sync_label().await; if db_sync_digest != tip_hash { @@ -850,6 +980,8 @@ impl WalletState { } pub async fn get_wallet_status_from_lock(&self, tip_digest: Digest) -> WalletStatus { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + let monitored_utxos = self.wallet_db.monitored_utxos(); let mut synced_unspent = vec![]; let mut unsynced_unspent = vec![]; @@ -901,6 +1033,8 @@ impl WalletState { tip_digest: Digest, timestamp: Timestamp, ) -> Result { + let _ = crate::ScopeDurationLogger::new(&crate::macros::fn_name!()); + // We only attempt to generate a transaction using those UTXOs that have up-to-date // membership proofs. let wallet_status = self.get_wallet_status_from_lock(tip_digest).await; @@ -991,6 +1125,131 @@ impl WalletState { } } +/// represents updates to apply to wallet state for a new block +pub(crate) struct WalletBlockUpdate { + new_block_hash: Digest, + incoming_utxo_recovery_data_list: Vec, + mutxo_updates: BTreeMap, + mutxo_additions: BTreeMap, + expected_utxos: Vec<(Index, ExpectedUtxo)>, +} + +/// A read-only wrapper for DbtVec. +/// +/// This type enables code to work with `monitored_utxos` such that reads query +/// the database if necessary but modifications do not modify DB. +/// +/// The purpose is to facilitate the prepare/finalize pattern in which length +/// read ops are performed (with read-lock) to generate a set of updates which +/// can be quickly applied (with write lock) +/// +/// Built for `WalletState::prepare_update_wallet_state_with_new_block()`. Only +/// impls a subset of DbtVec methods that are used by that fn. +/// +/// When using this type all read+writes in a logical operation must be +/// performed with this API. Mixing with reads/writes directly from the DB may +/// result in inconsistent data. +/// +/// When finished with processing Updates and Additions can be queried in order +/// to write them to DB in batch(es). 
+struct MonitoredUtxoReadOnlyCache<'a> { + mutxos: &'a DbtVec, + len: Index, + updates: BTreeMap, + additions: BTreeMap, +} + +impl<'a> MonitoredUtxoReadOnlyCache<'a> { + /// create new instance + pub async fn new(mutxos: &'a DbtVec) -> Self { + Self { + mutxos, + len: mutxos.len().await, + updates: Default::default(), + additions: Default::default(), + } + } + + /// returns true if empty + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// returns count of [MonitoredUtxo] + pub fn len(&self) -> Index { + self.len + } + + /// retrieves element matching `index` + pub async fn get(&self, index: Index) -> MonitoredUtxo { + if let Some(v) = self.additions.get(&index) { + v.to_owned() + } else if let Some(v) = self.updates.get(&index) { + v.to_owned() + } else { + self.mutxos.get(index).await + } + } + + /// append element + pub fn push(&mut self, mutxo: MonitoredUtxo) { + self.additions.insert(self.len, mutxo); + self.len += 1; + } + + /// overwrite element at `index` + /// + /// returned value will reflect any local changes (not yet written to DB) + pub fn set(&mut self, index: Index, mutxo: MonitoredUtxo) { + use std::collections::btree_map::Entry::Vacant; + if let Vacant(_) = self + .additions + .entry(index) + .and_modify(|m| *m = mutxo.clone()) + { + if index >= self.len - self.additions.len() as Index { + panic!("index out of range"); + } + self.updates + .entry(index) + .and_modify(|m| *m = mutxo.clone()) + .or_insert(mutxo); + } + } + + /// returns async [Stream] of ([Index], [MonitoredUtxo]) + /// + /// values will reflect any local changes (not yet written to DB) + pub async fn stream(&self) -> impl Stream + '_ { + self.mutxos + .stream() + .await + .map(|(i, v)| (i, self.updates.get(&i).unwrap_or(&v).to_owned())) + .chain(stream! { + for (i, v) in self.additions.iter() { + yield (*i, v.to_owned()) + } + }) + } + + /// returns async [Stream] of [MonitoredUtxo] + /// + /// values will reflect any local changes (not yet written to DB) + pub async fn stream_values(&self) -> impl Stream + '_ { + self.stream().await.map(|(_, v)| v) + } + + /// convert into tuple of modified data: (updates, additions) + pub fn into_updates_and_additions( + self, + ) -> ( + BTreeMap, + BTreeMap, + ) { + (self.updates, self.additions) + } +} + #[cfg(test)] mod tests { use num_traits::One; @@ -1099,7 +1358,7 @@ mod tests { rng.gen(), ); own_global_state - .set_new_self_mined_tip( + .set_new_self_mined_tip_atomic( block_3a, ExpectedUtxo::new( block_3a_coinbase_utxo, @@ -1142,7 +1401,7 @@ mod tests { let (block_3b, _block_3b_coinbase_utxo, _block_3b_coinbase_sender_randomness) = make_mock_block(&latest_block, None, other_recipient_address, rng.gen()); own_global_state - .set_new_tip(block_3b.clone()) + .set_new_tip_atomic(block_3b.clone()) .await .unwrap(); @@ -1173,7 +1432,7 @@ mod tests { let (new_block, _new_block_coinbase_utxo, _new_block_coinbase_sender_randomness) = make_mock_block(&latest_block, None, other_recipient_address, rng.gen()); own_global_state - .set_new_tip(new_block.clone()) + .set_new_tip_atomic(new_block.clone()) .await .unwrap(); @@ -1205,7 +1464,7 @@ mod tests { let (block_12, _, _) = make_mock_block(&latest_block, None, other_recipient_address, rng.gen()); own_global_state - .set_new_tip(block_12.clone()) + .set_new_tip_atomic(block_12.clone()) .await .unwrap(); diff --git a/src/peer_loop.rs b/src/peer_loop.rs index a7a4a0d34..dfae13434 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -1493,7 +1493,7 @@ mod peer_loop_tests { .to_address(); let (block_1, _, _) = 
make_mock_block_with_valid_pow(&genesis_block, None, a_recipient_address, rng.gen()); - global_state_mut.set_new_tip(block_1.clone()).await?; + global_state_mut.set_new_tip_atomic(block_1.clone()).await?; drop(global_state_mut); let mock_peer_messages = Mock::new(vec![ @@ -1566,11 +1566,19 @@ mod peer_loop_tests { let (block_3_b, _, _) = make_mock_block_with_valid_pow(&block_2_b, None, a_recipient_address, rng.gen()); - global_state_mut.set_new_tip(block_1.clone()).await?; - global_state_mut.set_new_tip(block_2_a.clone()).await?; - global_state_mut.set_new_tip(block_2_b.clone()).await?; - global_state_mut.set_new_tip(block_3_b.clone()).await?; - global_state_mut.set_new_tip(block_3_a.clone()).await?; + global_state_mut.set_new_tip_atomic(block_1.clone()).await?; + global_state_mut + .set_new_tip_atomic(block_2_a.clone()) + .await?; + global_state_mut + .set_new_tip_atomic(block_2_b.clone()) + .await?; + global_state_mut + .set_new_tip_atomic(block_3_b.clone()) + .await?; + global_state_mut + .set_new_tip_atomic(block_3_a.clone()) + .await?; drop(global_state_mut); @@ -1658,11 +1666,19 @@ mod peer_loop_tests { let (block_3_b, _, _) = make_mock_block_with_valid_pow(&block_2_b, None, a_recipient_address, rng.gen()); - global_state_mut.set_new_tip(block_1.clone()).await?; - global_state_mut.set_new_tip(block_2_a.clone()).await?; - global_state_mut.set_new_tip(block_2_b.clone()).await?; - global_state_mut.set_new_tip(block_3_b.clone()).await?; - global_state_mut.set_new_tip(block_3_a.clone()).await?; + global_state_mut.set_new_tip_atomic(block_1.clone()).await?; + global_state_mut + .set_new_tip_atomic(block_2_a.clone()) + .await?; + global_state_mut + .set_new_tip_atomic(block_2_b.clone()) + .await?; + global_state_mut + .set_new_tip_atomic(block_3_b.clone()) + .await?; + global_state_mut + .set_new_tip_atomic(block_3_a.clone()) + .await?; drop(global_state_mut); @@ -1724,11 +1740,19 @@ mod peer_loop_tests { let (block_3_b, _, _) = make_mock_block_with_valid_pow(&block_2_b, None, a_recipient_address, rng.gen()); - global_state_mut.set_new_tip(block_1.clone()).await?; - global_state_mut.set_new_tip(block_2_a.clone()).await?; - global_state_mut.set_new_tip(block_2_b.clone()).await?; - global_state_mut.set_new_tip(block_3_b.clone()).await?; - global_state_mut.set_new_tip(block_3_a.clone()).await?; + global_state_mut.set_new_tip_atomic(block_1.clone()).await?; + global_state_mut + .set_new_tip_atomic(block_2_a.clone()) + .await?; + global_state_mut + .set_new_tip_atomic(block_2_b.clone()) + .await?; + global_state_mut + .set_new_tip_atomic(block_3_b.clone()) + .await?; + global_state_mut + .set_new_tip_atomic(block_3_a.clone()) + .await?; drop(global_state_mut); @@ -1951,7 +1975,7 @@ mod peer_loop_tests { own_recipient_address, rng.gen(), ); - global_state_mut.set_new_tip(block_1.clone()).await?; + global_state_mut.set_new_tip_atomic(block_1.clone()).await?; drop(global_state_mut); @@ -2039,7 +2063,7 @@ mod peer_loop_tests { make_mock_block_with_valid_pow(&block_2.clone(), None, a_recipient_address, rng.gen()); let (block_4, _, _) = make_mock_block_with_valid_pow(&block_3.clone(), None, a_recipient_address, rng.gen()); - global_state_mut.set_new_tip(block_1.clone()).await?; + global_state_mut.set_new_tip_atomic(block_1.clone()).await?; drop(global_state_mut); let mock = Mock::new(vec![ @@ -2215,7 +2239,7 @@ mod peer_loop_tests { make_mock_block_with_valid_pow(&block_3.clone(), None, a_recipient_address, rng.gen()); let (block_5, _, _) = make_mock_block_with_valid_pow(&block_4.clone(), None, 
a_recipient_address, rng.gen()); - global_state_mut.set_new_tip(block_1.clone()).await?; + global_state_mut.set_new_tip_atomic(block_1.clone()).await?; drop(global_state_mut); let mock = Mock::new(vec![ @@ -2329,7 +2353,7 @@ mod peer_loop_tests { make_mock_block_with_valid_pow(&block_2.clone(), None, a_recipient_address, rng.gen()); let (block_4, _, _) = make_mock_block_with_valid_pow(&block_3.clone(), None, a_recipient_address, rng.gen()); - global_state_mut.set_new_tip(block_1.clone()).await?; + global_state_mut.set_new_tip_atomic(block_1.clone()).await?; drop(global_state_mut); let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1).await; diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 158afcbf5..848768eed 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1423,7 +1423,7 @@ mod rpc_server_tests { state_lock .lock_guard_mut() .await - .set_new_self_mined_tip( + .set_new_self_mined_tip_atomic( block_1, ExpectedUtxo::new( cb_utxo, diff --git a/src/tests/shared.rs b/src/tests/shared.rs index 33b6533b8..9e483e424 100644 --- a/src/tests/shared.rs +++ b/src/tests/shared.rs @@ -280,7 +280,10 @@ pub async fn add_block_to_archival_state( ) -> Result<()> { archival_state.write_block_as_tip(&new_block).await?; - archival_state.update_mutator_set(&new_block).await.unwrap(); + archival_state + .update_mutator_set_atomic(&new_block) + .await + .unwrap(); Ok(()) } diff --git a/src/util_types/mutator_set/archival_mutator_set.rs b/src/util_types/mutator_set/archival_mutator_set.rs index 6f26b4792..6f430bafb 100644 --- a/src/util_types/mutator_set/archival_mutator_set.rs +++ b/src/util_types/mutator_set/archival_mutator_set.rs @@ -296,7 +296,7 @@ where /// Determine whether the given `AdditionRecord` can be reversed. /// Equivalently, determine if it was added last. 
-    pub async fn add_is_reversible(&mut self, addition_record: &AdditionRecord) -> bool {
+    pub async fn add_is_reversible(&self, addition_record: &AdditionRecord) -> bool {
         let leaf_index = self.aocl.count_leaves().await - 1;
         let digest = self.aocl.get_leaf_async(leaf_index).await;
         addition_record.canonical_commitment == digest
diff --git a/src/util_types/mutator_set/removal_record.rs b/src/util_types/mutator_set/removal_record.rs
index 076057874..145a62ce6 100644
--- a/src/util_types/mutator_set/removal_record.rs
+++ b/src/util_types/mutator_set/removal_record.rs
@@ -37,7 +37,7 @@ use super::shared::NUM_TRIALS;
 use super::MutatorSetError;
 
 #[derive(Debug, Clone, PartialEq, Eq, BFieldCodec, Arbitrary)]
-pub struct AbsoluteIndexSet([u128; NUM_TRIALS as usize]);
+pub struct AbsoluteIndexSet([u128; NUM_TRIALS as usize]); // 720 bytes
 
 impl GetSize for AbsoluteIndexSet {
     fn get_stack_size() -> usize {
@@ -167,7 +167,7 @@ impl<'de> Deserialize<'de> for AbsoluteIndexSet {
     Clone, Debug, Deserialize, Serialize, PartialEq, Eq, GetSize, BFieldCodec, TasmObject, Arbitrary,
 )]
 pub struct RemovalRecord {
-    pub absolute_indices: AbsoluteIndexSet,
+    pub absolute_indices: AbsoluteIndexSet, // 720 bytes
     pub target_chunks: ChunkDictionary,
 }
 
@@ -203,16 +203,11 @@ impl RemovalRecord {
         // Collect all indices for all removal records that are being updated
         let mut chunk_index_to_rr_index: HashMap<u64, Vec<usize>> = HashMap::new();
         removal_records.iter().enumerate().for_each(|(i, rr)| {
-            let indices = &rr.absolute_indices;
-            let chunks_set: HashSet<u64> = indices
+            rr.absolute_indices
                 .to_array()
                 .iter()
                 .map(|x| (x / CHUNK_SIZE as u128) as u64)
-                .collect();
-
-            chunks_set
-                .iter()
-                .for_each(|chnkidx| chunk_index_to_rr_index.entry(*chnkidx).or_default().push(i));
+                .for_each(|chnkidx| chunk_index_to_rr_index.entry(chnkidx).or_default().push(i));
         });
 
         // Find the removal records that need a new dictionary entry for the chunk
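The rewritten loop above folds each absolute index straight into the map through the entry API instead of building a per-record `HashSet` first. A standalone sketch of that grouping pattern is below, using made-up data and an illustrative chunk size. One difference to keep in mind: the removed `HashSet` pass de-duplicated chunk indices per record, so the direct fold can push the same record index more than once when several of a record's indices fall in the same chunk; whether that matters depends on how the map is consumed downstream.

use std::collections::{HashMap, HashSet};

// Illustrative chunk size, not the crate's constant.
const CHUNK_SIZE: u128 = 1 << 12;

fn main() {
    // Two hypothetical "removal records", each a handful of absolute indices.
    let records: Vec<Vec<u128>> = vec![vec![10, 20, 5_000], vec![15, 9_000]];

    // Direct fold, as in the patched code: may repeat a record index per chunk.
    let mut by_chunk: HashMap<u64, Vec<usize>> = HashMap::new();
    for (i, record) in records.iter().enumerate() {
        record
            .iter()
            .map(|x| (x / CHUNK_SIZE) as u64)
            .for_each(|chunk| by_chunk.entry(chunk).or_default().push(i));
    }

    // De-duplicating variant, matching the removed HashSet-based code.
    let mut by_chunk_dedup: HashMap<u64, Vec<usize>> = HashMap::new();
    for (i, record) in records.iter().enumerate() {
        let chunks: HashSet<u64> = record.iter().map(|x| (x / CHUNK_SIZE) as u64).collect();
        for chunk in chunks {
            by_chunk_dedup.entry(chunk).or_default().push(i);
        }
    }

    // Record 0 has two indices (10 and 20) in chunk 0, so it appears twice here...
    assert_eq!(by_chunk[&0], vec![0, 0, 1]);
    // ...and exactly once in the de-duplicated map.
    assert_eq!(by_chunk_dedup[&0].iter().filter(|&&r| r == 0).count(), 1);
}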