diff --git a/zero_bin/prover/src/lib.rs b/zero_bin/prover/src/lib.rs index aa5367dc0..0605358f3 100644 --- a/zero_bin/prover/src/lib.rs +++ b/zero_bin/prover/src/lib.rs @@ -18,6 +18,8 @@ use trace_decoder::{ use tracing::info; use zero_bin_common::fs::generate_block_proof_file_name; +const GENERATION_INPUTS_AGG_CHUNK_SIZE: usize = 2; + #[derive(Debug, Deserialize, Serialize)] pub struct BlockProverInput { pub block_trace: BlockTrace, @@ -56,41 +58,63 @@ impl BlockProverInput { batch_size, )?; + // Create segment proof + let seg_ops = ops::SegmentProof { + save_inputs_on_error, + }; + // Generate segment data. let agg_ops = ops::SegmentAggProof { save_inputs_on_error, }; - let seg_ops = ops::SegmentProof { + // Aggregate transaction proofs + let txn_agg_proof = ops::TxnAggProof { save_inputs_on_error, }; // Map the transactions to a stream of transaction proofs. - let tx_proof_futs: FuturesUnordered<_> = txs - .iter() + let tx_proof_futs : FuturesUnordered<_> = txs.chunks(GENERATION_INPUTS_AGG_CHUNK_SIZE) .enumerate() - .map(|(idx, txn)| { - let data_iterator = SegmentDataIterator { - partial_next_data: None, - inputs: txn, - max_cpu_len_log: Some(max_cpu_len_log), - }; - - Directive::map(IndexedStream::from(data_iterator), &seg_ops) - .fold(&agg_ops) - .run(runtime) - .map(move |e| { - e.map(|p| (idx, proof_gen::proof_types::TxnAggregatableProof::from(p))) - }) - }) + .map(|(idy, chunk)| + { + let chunk_tx_proof_futs: FuturesUnordered<_> = chunk + .iter() + .enumerate() + .map(|(idx, txn)| { + let data_iterator = SegmentDataIterator { + partial_next_data: None, + inputs: txn, + max_cpu_len_log: Some(max_cpu_len_log), + }; + + Directive::map(IndexedStream::from(data_iterator), &seg_ops) + .fold(&agg_ops) + .run(runtime) + .map(move |e| { + e.map(|p| (idx, proof_gen::proof_types::TxnAggregatableProof::from(p))) + }) + }) + .collect(); + + let result = Directive::fold( + IndexedStream::new(chunk_tx_proof_futs), + &txn_agg_proof, + ).run(runtime); + result.map(move |p| (idy, p)) + }) .collect(); + let mut txn_aggregatable_proofs = Vec::new(); + for fut in tx_proof_futs { + let (_idx, txn_aggregatable_proof) = fut.await; + txn_aggregatable_proofs.push(txn_aggregatable_proof?); + } + // Fold the transaction proof stream into a single transaction proof. let final_txn_proof = Directive::fold( - IndexedStream::new(tx_proof_futs), - &ops::TxnAggProof { - save_inputs_on_error, - }, + IndexedStream::from(txn_aggregatable_proofs.into_iter()), + &txn_agg_proof, ) .run(runtime) .await?;