Skip to content

Commit

Permalink
fix: optimize segment proof aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko committed Jul 19, 2024
1 parent 312cb60 commit 164c94b
Showing 1 changed file with 45 additions and 21 deletions.
66 changes: 45 additions & 21 deletions zero_bin/prover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use trace_decoder::{
use tracing::info;
use zero_bin_common::fs::generate_block_proof_file_name;

const GENERATION_INPUTS_AGG_CHUNK_SIZE: usize = 2;

#[derive(Debug, Deserialize, Serialize)]
pub struct BlockProverInput {
pub block_trace: BlockTrace,
Expand Down Expand Up @@ -56,41 +58,63 @@ impl BlockProverInput {
batch_size,
)?;

// Create segment proof
let seg_ops = ops::SegmentProof {
save_inputs_on_error,
};

// Generate segment data.
let agg_ops = ops::SegmentAggProof {
save_inputs_on_error,
};

let seg_ops = ops::SegmentProof {
// Aggregate transaction proofs
let txn_agg_proof = ops::TxnAggProof {
save_inputs_on_error,
};

// Map the transactions to a stream of transaction proofs.
let tx_proof_futs: FuturesUnordered<_> = txs
.iter()
let tx_proof_futs : FuturesUnordered<_> = txs.chunks(GENERATION_INPUTS_AGG_CHUNK_SIZE)
.enumerate()
.map(|(idx, txn)| {
let data_iterator = SegmentDataIterator {
partial_next_data: None,
inputs: txn,
max_cpu_len_log: Some(max_cpu_len_log),
};

Directive::map(IndexedStream::from(data_iterator), &seg_ops)
.fold(&agg_ops)
.run(runtime)
.map(move |e| {
e.map(|p| (idx, proof_gen::proof_types::TxnAggregatableProof::from(p)))
})
})
.map(|(idy, chunk)|
{
let chunk_tx_proof_futs: FuturesUnordered<_> = chunk
.iter()
.enumerate()
.map(|(idx, txn)| {
let data_iterator = SegmentDataIterator {
partial_next_data: None,
inputs: txn,
max_cpu_len_log: Some(max_cpu_len_log),
};

Directive::map(IndexedStream::from(data_iterator), &seg_ops)
.fold(&agg_ops)
.run(runtime)
.map(move |e| {
e.map(|p| (idx, proof_gen::proof_types::TxnAggregatableProof::from(p)))
})
})
.collect();

let result = Directive::fold(
IndexedStream::new(chunk_tx_proof_futs),
&txn_agg_proof,
).run(runtime);
result.map(move |p| (idy, p))
})
.collect();

let mut txn_aggregatable_proofs = Vec::new();
for fut in tx_proof_futs {
let (_idx, txn_aggregatable_proof) = fut.await;
txn_aggregatable_proofs.push(txn_aggregatable_proof?);
}

// Fold the transaction proof stream into a single transaction proof.
let final_txn_proof = Directive::fold(
IndexedStream::new(tx_proof_futs),
&ops::TxnAggProof {
save_inputs_on_error,
},
IndexedStream::from(txn_aggregatable_proofs.into_iter()),
&txn_agg_proof,
)
.run(runtime)
.await?;
Expand Down

0 comments on commit 164c94b

Please sign in to comment.