Skip to content

Commit

Permalink
feat(block-producer): inflight state custody of transaction data (#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mirko-von-Leipzig authored Nov 2, 2024
1 parent ad20a30 commit 225b6f4
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 91 deletions.
160 changes: 80 additions & 80 deletions crates/block-producer/src/mempool/inflight_state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
collections::{btree_map::Entry, BTreeMap, BTreeSet, VecDeque},
sync::Arc,
};

Expand Down Expand Up @@ -45,10 +45,13 @@ pub struct InflightState {
/// Some of these may already be consumed - check the nullifiers.
output_notes: BTreeMap<NoteId, OutputNoteState>,

/// Delta's representing the impact of each recently committed blocks on the inflight state.
/// Inflight transaction deltas.
///
/// These are used to prune committed state after `num_retained_blocks` have passed.
committed_state: VecDeque<StateDelta>,
/// This _excludes_ deltas in committed blocks.
transaction_deltas: BTreeMap<TransactionId, Delta>,

/// Committed block deltas.
committed_blocks: VecDeque<BTreeMap<TransactionId, Delta>>,

/// Amount of recently committed blocks we retain in addition to the inflight state.
///
Expand All @@ -61,35 +64,23 @@ pub struct InflightState {
chain_tip: BlockNumber,
}

/// The aggregated impact of a set of sequential transactions on the [InflightState].
#[derive(Clone, Default, Debug, PartialEq)]
struct StateDelta {
/// The number of transactions that affected each account.
account_transactions: BTreeMap<AccountId, usize>,

/// The nullifiers consumed by the transactions.
/// A summary of a transaction's impact on the state.
#[derive(Clone, Debug, PartialEq)]
struct Delta {
/// The account this transaction updated.
account: AccountId,
/// The nullifiers produced by this transaction.
nullifiers: BTreeSet<Nullifier>,

/// The notes produced by the transactions.
/// The output notes created by this transaction.
output_notes: BTreeSet<NoteId>,
}

impl StateDelta {
fn new(txs: &[AuthenticatedTransaction]) -> Self {
let mut account_transactions = BTreeMap::<AccountId, usize>::new();
let mut nullifiers = BTreeSet::new();
let mut output_notes = BTreeSet::new();

for tx in txs {
*account_transactions.entry(tx.account_id()).or_default() += 1;
nullifiers.extend(tx.nullifiers());
output_notes.extend(tx.output_notes());
}

impl Delta {
fn new(tx: &AuthenticatedTransaction) -> Self {
Self {
account_transactions,
nullifiers,
output_notes,
account: tx.account_id(),
nullifiers: tx.nullifiers().collect(),
output_notes: tx.output_notes().collect(),
}
}
}
Expand All @@ -104,7 +95,8 @@ impl InflightState {
accounts: Default::default(),
nullifiers: Default::default(),
output_notes: Default::default(),
committed_state: Default::default(),
transaction_deltas: Default::default(),
committed_blocks: Default::default(),
}
}

Expand All @@ -126,7 +118,7 @@ impl InflightState {

fn oldest_committed_state(&self) -> BlockNumber {
let committed_len: u32 = self
.committed_state
.committed_blocks
.len()
.try_into()
.expect("We should not be storing many blocks");
Expand Down Expand Up @@ -213,6 +205,7 @@ impl InflightState {

/// Aggregate the transaction into the state, returning its parent transactions.
fn insert_transaction(&mut self, tx: &AuthenticatedTransaction) -> BTreeSet<TransactionId> {
self.transaction_deltas.insert(tx.id(), Delta::new(tx));
let account_parent = self
.accounts
.entry(tx.account_id())
Expand All @@ -236,29 +229,30 @@ impl InflightState {
account_parent.into_iter().chain(note_parents).collect()
}

/// Reverts the given state diff.
/// Reverts the given set of _uncommitted_ transactions.
///
/// # Panics
///
/// Panics if any part of the diff isn't present in the state. Callers should take
/// care to only revert transaction sets who's ancestors are all either committed or reverted.
pub fn revert_transactions(&mut self, txs: &[AuthenticatedTransaction]) {
let delta = StateDelta::new(txs);
for (account, count) in delta.account_transactions {
let status = self.accounts.get_mut(&account).expect("Account must exist").revert(count);
/// Panics if any transactions is not part of the uncommitted state. Callers should take care to
/// only revert transaction sets who's ancestors are all either committed or reverted.
pub fn revert_transactions(&mut self, txs: BTreeSet<TransactionId>) {
for tx in txs {
let delta = self.transaction_deltas.remove(&tx).expect("Transaction delta must exist");

// SAFETY: Since the delta exists, so must the account.
let account_status = self.accounts.get_mut(&delta.account).unwrap().revert(1);
// Prune empty accounts.
if status.is_empty() {
self.accounts.remove(&account);
if account_status.is_empty() {
self.accounts.remove(&delta.account);
}
}

for nullifier in delta.nullifiers {
assert!(self.nullifiers.remove(&nullifier), "Nullifier must exist");
}
for nullifier in delta.nullifiers {
assert!(self.nullifiers.remove(&nullifier), "Nullifier must exist");
}

for note in delta.output_notes {
assert!(self.output_notes.remove(&note).is_some(), "Output note must exist");
for note in delta.output_notes {
assert!(self.output_notes.remove(&note).is_some(), "Output note must exist");
}
}
}

Expand All @@ -272,53 +266,57 @@ impl InflightState {
///
/// # Panics
///
/// Panics if the accounts don't have enough inflight transactions to commit or if
/// the output notes don't exist.
pub fn commit_block(&mut self, txs: &[AuthenticatedTransaction]) {
let delta = StateDelta::new(txs);
for (account, count) in &delta.account_transactions {
self.accounts.get_mut(account).expect("Account must exist").commit(*count);
}
/// Panics if any transactions is not part of the uncommitted state.
pub fn commit_block(&mut self, txs: impl IntoIterator<Item = TransactionId>) {
let mut block_deltas = BTreeMap::new();
for tx in txs.into_iter() {
let delta = self.transaction_deltas.remove(&tx).expect("Transaction delta must exist");

for note in &delta.output_notes {
self.output_notes.get_mut(note).expect("Output note must exist").commit();
}
// SAFETY: Since the delta exists, so must the account.
self.accounts.get_mut(&delta.account).unwrap().commit(1);

self.committed_state.push_back(delta);
for note in &delta.output_notes {
self.output_notes.get_mut(note).expect("Output note must exist").commit();
}

if self.committed_state.len() > self.num_retained_blocks {
let delta = self.committed_state.pop_front().expect("Must be some due to length check");
self.prune_committed_state(delta);
block_deltas.insert(tx, delta);
}

self.committed_blocks.push_back(block_deltas);
self.prune_block();
self.chain_tip.increment();
}

/// Removes the delta from inflight state.
/// Prunes the state from the oldest committed block _IFF_ there are more than the number we
/// wish to retain.
///
/// # Panics
///
/// Panics if the accounts don't have enough inflight transactions to commit.
fn prune_committed_state(&mut self, diff: StateDelta) {
for (account, count) in diff.account_transactions {
let status = self
.accounts
.get_mut(&account)
.expect("Account must exist")
.prune_committed(count);
/// This is used to bound the size of the inflight state.
fn prune_block(&mut self) {
// Keep the required number of committed blocks.
//
// This would occur on startup until we have accumulated enough blocks.
if self.committed_blocks.len() <= self.num_retained_blocks {
return;
}
// SAFETY: The length check above guarantees that we have at least one committed block.
let block = self.committed_blocks.pop_front().unwrap();

for (tx_id, delta) in block {
// SAFETY: Since the delta exists, so must the account.
let status = self.accounts.get_mut(&delta.account).unwrap().prune_committed(1);

// Prune empty accounts.
if status.is_empty() {
self.accounts.remove(&account);
self.accounts.remove(&delta.account);
}
}

for nullifier in diff.nullifiers {
self.nullifiers.remove(&nullifier);
}
for nullifier in delta.nullifiers {
self.nullifiers.remove(&nullifier);
}

for output_note in diff.output_notes {
self.output_notes.remove(&output_note);
for output_note in delta.output_notes {
self.output_notes.remove(&output_note);
}
}
}
}
Expand Down Expand Up @@ -541,7 +539,7 @@ mod tests {
uut.add_transaction(&tx1.clone()).unwrap();

// Commit the parents, which should remove them from dependency tracking.
uut.commit_block(&[tx0, tx1]);
uut.commit_block([tx0.id(), tx1.id()]);

let parents = uut
.add_transaction(&AuthenticatedTransaction::from_inner(tx).with_empty_store_state())
Expand Down Expand Up @@ -584,7 +582,9 @@ mod tests {
panic!("Inserting tx #{idx} in iteration {i} should succeed: {err}")
});
}
reverted.revert_transactions(&txs[txs.len() - i..]);
reverted.revert_transactions(
txs.iter().rev().take(i).rev().map(AuthenticatedTransaction::id).collect(),
);

let mut inserted = InflightState::new(BlockNumber::default(), 1);
for (idx, tx) in txs.iter().rev().skip(i).rev().enumerate() {
Expand Down Expand Up @@ -636,7 +636,7 @@ mod tests {
panic!("Inserting tx #{idx} in iteration {i} should succeed: {err}")
});
}
committed.commit_block(&txs[..i]);
committed.commit_block(txs.iter().take(i).map(AuthenticatedTransaction::id));

let mut inserted = InflightState::new(BlockNumber::new(1), 0);
for (idx, tx) in txs.iter().skip(i).enumerate() {
Expand Down
12 changes: 3 additions & 9 deletions crates/block-producer/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,12 @@ impl Mempool {
// Remove committed batches and transactions from graphs.
let batches = self.block_in_progress.take().expect("No block in progress to commit");
let transactions = self.batches.prune_committed(batches).expect("Batches failed to commit");
let transactions = self
.transactions
self.transactions
.commit_transactions(&transactions)
.expect("Transaction graph malformed");

// Inform inflight state about committed data.
self.state.commit_block(&transactions);
self.state.commit_block(transactions);

self.chain_tip.increment();
}
Expand All @@ -237,11 +236,6 @@ impl Mempool {
.expect("Transaction graph is malformed");

// Rollback state.
let transactions = transactions
.into_iter()
// FIXME
.map(|tx_id| todo!("Inflight state should remember diffs"))
.collect::<Vec<_>>();
self.state.revert_transactions(&transactions);
self.state.revert_transactions(transactions);
}
}
5 changes: 3 additions & 2 deletions crates/block-producer/src/mempool/transaction_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ impl TransactionGraph {
pub fn commit_transactions(
&mut self,
tx_ids: &[TransactionId],
) -> Result<Vec<AuthenticatedTransaction>, GraphError<TransactionId>> {
) -> Result<(), GraphError<TransactionId>> {
// TODO: revisit this api.
let tx_ids = tx_ids.iter().cloned().collect();
self.inner.prune_processed(tx_ids)
self.inner.prune_processed(tx_ids)?;
Ok(())
}

/// Removes the transactions and all their descendants from the graph.
Expand Down

0 comments on commit 225b6f4

Please sign in to comment.