Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
temaniarpit27 committed Sep 13, 2024
1 parent fb687b2 commit fab7935
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 50 deletions.
4 changes: 2 additions & 2 deletions zero_bin/common/src/proof_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use paladin::runtime::Runtime;

pub struct ProofRuntime {
pub block_proof_runtime: Runtime,
pub segment_proof_runtime: Runtime,
pub light_proof_runtime: Runtime,
pub heavy_proof_runtime: Runtime,
}
8 changes: 4 additions & 4 deletions zero_bin/leader/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ pub(crate) struct Cli {
#[clap(flatten)]
pub(crate) prover_state_config: CliProverStateConfig,

// Mode to use for worker for setup (split or unified)
#[arg(long = "worker-run-mode", help_heading = WORKER_HELP_HEADING, value_enum, default_value = "unified")]
// Mode to use for worker for setup (affinity or default)
#[arg(long = "worker-run-mode", help_heading = WORKER_HELP_HEADING, value_enum, default_value = "default")]
pub(crate) worker_run_mode: WorkerRunMode,
}

#[derive(ValueEnum, Clone, PartialEq, Debug)]
pub enum WorkerRunMode {
Split,
Unified,
Affinity,
Default,
}

#[derive(Subcommand)]
Expand Down
4 changes: 2 additions & 2 deletions zero_bin/leader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ pub(crate) async fn client_main(
}
}

proof_runtime.block_proof_runtime.close().await?;
proof_runtime.segment_proof_runtime.close().await?;
proof_runtime.light_proof_runtime.close().await?;
proof_runtime.heavy_proof_runtime.close().await?;

if test_only {
info!("All proof witnesses have been generated successfully.");
Expand Down
35 changes: 17 additions & 18 deletions zero_bin/leader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ fn get_previous_proof(path: Option<PathBuf>) -> Result<Option<GeneratedBlockProo
Ok(Some(proof))
}

const SEGMENT_PROOF_ROUTING_KEY: &str = "segment-proof";
const BLOCK_PROOF_ROUTING_KEY: &str = "block-proof";
const HEAVY_PROOF_ROUTING_KEY: &str = "heavy-proof";
const LIGHT_PROOF_ROUTING_KEY: &str = "block-proof";
const DEFAULT_ROUTING_KEY: &str = paladin::runtime::DEFAULT_ROUTING_KEY;

#[tokio::main]
Expand All @@ -61,32 +61,31 @@ async fn main() -> Result<()> {

let args = cli::Cli::parse();

let mut block_proof_routing_key = DEFAULT_ROUTING_KEY.to_string();
let mut segment_proof_routing_key = DEFAULT_ROUTING_KEY.to_string();
if args.worker_run_mode == cli::WorkerRunMode::Split {
// If we're running in split mode, we need to set the routing key for the
let mut light_proof_routing_key = DEFAULT_ROUTING_KEY.to_string();
let mut heavy_proof_routing_key = DEFAULT_ROUTING_KEY.to_string();
if args.worker_run_mode == cli::WorkerRunMode::Affinity {
// If we're running in affinity mode, we need to set the routing key for the
// block proof and segment proof.
info!("Workers running in split mode");
block_proof_routing_key = BLOCK_PROOF_ROUTING_KEY.to_string();
segment_proof_routing_key = SEGMENT_PROOF_ROUTING_KEY.to_string();
info!("Workers running in affinity mode");
light_proof_routing_key = LIGHT_PROOF_ROUTING_KEY.to_string();
heavy_proof_routing_key = HEAVY_PROOF_ROUTING_KEY.to_string();
}

let mut block_proof_paladin_args = args.paladin.clone();
block_proof_paladin_args.task_bus_routing_key = Some(block_proof_routing_key);
let mut light_proof_paladin_args = args.paladin.clone();
light_proof_paladin_args.task_bus_routing_key = Some(light_proof_routing_key);

let mut segment_proof_paladin_args = args.paladin.clone();
segment_proof_paladin_args.task_bus_routing_key = Some(segment_proof_routing_key);
let mut heavy_proof_paladin_args = args.paladin.clone();
heavy_proof_paladin_args.task_bus_routing_key = Some(heavy_proof_routing_key);

let block_proof_runtime = Runtime::from_config(&block_proof_paladin_args, register()).await?;
let segment_proof_runtime =
Runtime::from_config(&segment_proof_paladin_args, register()).await?;
let light_proof_runtime = Runtime::from_config(&light_proof_paladin_args, register()).await?;
let heavy_proof_runtime = Runtime::from_config(&heavy_proof_paladin_args, register()).await?;
if let Command::Clean = args.command {
return zero_bin_common::prover_state::persistence::delete_all();
}

let proof_runtime = ProofRuntime {
block_proof_runtime,
segment_proof_runtime,
light_proof_runtime,
heavy_proof_runtime,
};

let proof_runtime = Arc::new(proof_runtime);
Expand Down
4 changes: 2 additions & 2 deletions zero_bin/leader/src/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ pub(crate) async fn stdio_main(
}
}

proof_runtime.block_proof_runtime.close().await?;
proof_runtime.segment_proof_runtime.close().await?;
proof_runtime.light_proof_runtime.close().await?;
proof_runtime.heavy_proof_runtime.close().await?;

if prover_config.test_only {
info!("All proof witnesses have been generated successfully.");
Expand Down
32 changes: 10 additions & 22 deletions zero_bin/prover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::sync::Arc;
use alloy::primitives::U256;
use anyhow::{Context, Result};
use evm_arithmetization::SegmentDataIterator;
use futures::StreamExt;
use futures::{
future, future::BoxFuture, stream::FuturesUnordered, FutureExt, TryFutureExt, TryStreamExt,
};
Expand Down Expand Up @@ -98,31 +97,20 @@ impl BlockProverInput {
save_inputs_on_error,
};

let vec_segment_agg_proofs: FuturesUnordered<_> = block_generation_inputs
// Segment the batches, prove segments and aggregate them to resulting batch
// proofs.
let batch_proof_futs: FuturesUnordered<_> = block_generation_inputs
.iter()
.map(|txn_batch| {
.enumerate()
.map(|(idx, txn_batch)| {
let segment_data_iterator = SegmentDataIterator::<proof_gen::types::Field>::new(
txn_batch,
Some(max_cpu_len_log),
);

// Map over the indexed stream with the given seg_prove_ops operation.
Directive::map(IndexedStream::from(segment_data_iterator), &seg_prove_ops)
.run(&proof_runtime.segment_proof_runtime)
})
.collect();

// Await all the futures concurrently
let vec_segment_agg_proofs: Vec<_> = vec_segment_agg_proofs.collect().await;

let batch_proof_futs: FuturesUnordered<_> = vec_segment_agg_proofs
.into_iter()
.enumerate()
.map(|(idx, result)| {
// todo - check how to handle the unwrap
let res = result.unwrap();
Directive::fold(IndexedStream::new(res), &seg_agg_ops)
.run(&proof_runtime.block_proof_runtime)
.fold(&seg_agg_ops)
.run(&proof_runtime.heavy_proof_runtime)
.map(move |e| {
e.map(|p| (idx, proof_gen::proof_types::BatchAggregatableProof::from(p)))
})
Expand All @@ -132,7 +120,7 @@ impl BlockProverInput {
// Fold the batch aggregated proof stream into a single proof.
let final_batch_proof =
Directive::fold(IndexedStream::new(batch_proof_futs), &batch_agg_ops)
.run(&proof_runtime.block_proof_runtime)
.run(&proof_runtime.light_proof_runtime)
.await?;

if let proof_gen::proof_types::BatchAggregatableProof::Agg(proof) = final_batch_proof {
Expand All @@ -149,7 +137,7 @@ impl BlockProverInput {
prev,
save_inputs_on_error,
})
.run(&proof_runtime.block_proof_runtime)
.run(&proof_runtime.light_proof_runtime)
.await?;

info!("Successfully proved block {block_number}");
Expand Down Expand Up @@ -202,7 +190,7 @@ impl BlockProverInput {
);

simulation
.run(&proof_runtime.block_proof_runtime)
.run(&proof_runtime.light_proof_runtime)
.await?
.try_for_each(|_| future::ok(()))
.await?;
Expand Down

0 comments on commit fab7935

Please sign in to comment.