From 98c4750907dc001cf5094559568812e93559ad57 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Fri, 19 Jul 2024 12:17:25 +0200 Subject: [PATCH] fix: optimize --- zero_bin/prover/src/lib.rs | 81 ++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/zero_bin/prover/src/lib.rs b/zero_bin/prover/src/lib.rs index 0605358f3..56881294e 100644 --- a/zero_bin/prover/src/lib.rs +++ b/zero_bin/prover/src/lib.rs @@ -18,7 +18,10 @@ use trace_decoder::{ use tracing::info; use zero_bin_common::fs::generate_block_proof_file_name; -const GENERATION_INPUTS_AGG_CHUNK_SIZE: usize = 2; +// Limit the number of segments that are proven in parallel +// to prevent excessive memory consumption +const GENERATION_INPUTS_AGG_CHUNK_SIZE_INNER: usize = 2; +const GENERATION_INPUTS_AGG_CHUNK_SIZE: usize = 5; #[derive(Debug, Deserialize, Serialize)] pub struct BlockProverInput { @@ -69,51 +72,58 @@ impl BlockProverInput { }; // Aggregate transaction proofs - let txn_agg_proof = ops::TxnAggProof { + let txn_agg_proof = ops::TxnAggProof { save_inputs_on_error, }; - // Map the transactions to a stream of transaction proofs. - let tx_proof_futs : FuturesUnordered<_> = txs.chunks(GENERATION_INPUTS_AGG_CHUNK_SIZE) - .enumerate() - .map(|(idy, chunk)| - { + // Tweak inner and outer chunk size if needed for memory/utilization balance + let mut all_txn_aggregatable_proofs = Vec::new(); + for chunk in txs.chunks(GENERATION_INPUTS_AGG_CHUNK_SIZE) { + // Map the chunk of transactions to transaction proofs. let chunk_tx_proof_futs: FuturesUnordered<_> = chunk - .iter() - .enumerate() - .map(|(idx, txn)| { - let data_iterator = SegmentDataIterator { - partial_next_data: None, - inputs: txn, - max_cpu_len_log: Some(max_cpu_len_log), - }; - - Directive::map(IndexedStream::from(data_iterator), &seg_ops) - .fold(&agg_ops) - .run(runtime) - .map(move |e| { - e.map(|p| (idx, proof_gen::proof_types::TxnAggregatableProof::from(p))) + .chunks(GENERATION_INPUTS_AGG_CHUNK_SIZE_INNER) + .map(|generation_inputs| { + let chunk_tx_proof_futs: FuturesUnordered<_> = generation_inputs + .iter() + .enumerate() + .map(|(idx, generation_inputs)| { + let data_iterator = SegmentDataIterator { + partial_next_data: None, + inputs: generation_inputs, + max_cpu_len_log: Some(max_cpu_len_log), + }; + + // Create segment aggregatable proofs from generation segment data + // and aggregate them into single txn aggregatable proof + Directive::map(IndexedStream::from(data_iterator), &seg_ops) + .fold(&agg_ops) + .run(runtime) + .map(move |e| { + e.map(|p| { + (idx, proof_gen::proof_types::TxnAggregatableProof::from(p)) + }) + }) }) + .collect(); + + // Aggregate txn aggregatable proofs for this chunk + Directive::fold(IndexedStream::new(chunk_tx_proof_futs), &txn_agg_proof) + .run(runtime) }) .collect(); - let result = Directive::fold( - IndexedStream::new(chunk_tx_proof_futs), - &txn_agg_proof, - ).run(runtime); - result.map(move |p| (idy, p)) - }) - .collect(); - - let mut txn_aggregatable_proofs = Vec::new(); - for fut in tx_proof_futs { - let (_idx, txn_aggregatable_proof) = fut.await; - txn_aggregatable_proofs.push(txn_aggregatable_proof?); + // Await for the chunk to be computed + all_txn_aggregatable_proofs.extend( + futures::future::join_all(chunk_tx_proof_futs) + .await + .into_iter() + .collect::, _>>()?, + ); } - // Fold the transaction proof stream into a single transaction proof. + // Fold all the agg transaction proofs into a single aggregatable proof let final_txn_proof = Directive::fold( - IndexedStream::from(txn_aggregatable_proofs.into_iter()), + IndexedStream::from(all_txn_aggregatable_proofs.into_iter()), &txn_agg_proof, ) .run(runtime) @@ -128,6 +138,7 @@ impl BlockProverInput { None => None, }; + // Generate the block proof let block_proof = paladin::directive::Literal(proof) .map(&ops::BlockProof { prev,