diff --git a/contracts/script/ZKDeployer.s.sol b/contracts/script/ZKDeployer.s.sol index bfcad132..1217bd89 100644 --- a/contracts/script/ZKDeployer.s.sol +++ b/contracts/script/ZKDeployer.s.sol @@ -11,6 +11,7 @@ contract ZKDeployer is Script, Utils { vm.startBroadcast(); Config memory config = readJsonWithRPCFromEnv("zkconfig.json"); + // TODO: This seems wrong. Why are we using the msg.sender as a proxy? config.l2OutputOracleProxy = address(new Proxy(msg.sender)); address zkL2OutputOracleImpl = address(new ZKL2OutputOracle()); diff --git a/contracts/zkconfig.json b/contracts/zkconfig.json index 6ae95077..9bd08921 100644 --- a/contracts/zkconfig.json +++ b/contracts/zkconfig.json @@ -1,5 +1,5 @@ { - "startingBlockNumber": 16621800, + "startingBlockNumber": 16795981, "l2RollupNode": "", "submissionInterval": 150, "l2BlockTime": 2, @@ -11,4 +11,4 @@ "vkey": "0x0010ea62be193a3c288183b203657e75184b8b92d5f872a25e2ff065d5e9c84d", "verifierGateway": "0x3B6041173B80E77f038f3F2C0f9744f04837185e", "l2OutputOracleProxy": "0x863508f057c09f7b94e582d74404859ecd36a306" -} \ No newline at end of file +} diff --git a/proposer/succinct/bin/cost_estimator.rs b/proposer/succinct/bin/cost_estimator.rs index d5943b3a..8af4c709 100644 --- a/proposer/succinct/bin/cost_estimator.rs +++ b/proposer/succinct/bin/cost_estimator.rs @@ -2,25 +2,19 @@ use anyhow::Result; use clap::Parser; use kona_host::HostCli; use kona_primitives::RollupConfig; -use log::{error, info}; +use log::info; use op_succinct_host_utils::{ fetcher::{ChainMode, OPSuccinctDataFetcher}, get_proof_stdin, stats::{get_execution_stats, ExecutionStats}, + witnessgen::WitnessGenExecutor, ProgramType, }; -use op_succinct_proposer::run_native_host; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use reqwest::Client; use serde::{Deserialize, Serialize}; use sp1_sdk::{utils, ProverClient}; -use std::{ - cmp::min, - env, fs, - future::Future, - path::PathBuf, - time::{Duration, Instant}, -}; +use std::{cmp::min, env, fs, future::Future, path::PathBuf, time::Instant}; use tokio::task::block_in_place; pub const MULTI_BLOCK_ELF: &[u8] = include_bytes!("../../../elf/range-elf"); @@ -143,28 +137,30 @@ async fn run_native_data_generation( split_ranges: &[SpanBatchRange], ) -> Vec { const CONCURRENT_NATIVE_HOST_RUNNERS: usize = 5; - const NATIVE_HOST_TIMEOUT: Duration = Duration::from_secs(300); - // TODO: Shut down all processes when the program exits OR a Ctrl+C is pressed. - let futures = split_ranges.chunks(CONCURRENT_NATIVE_HOST_RUNNERS).map(|chunk| { - futures::future::join_all(chunk.iter().map(|range| async { + let futures = split_ranges.chunks(CONCURRENT_NATIVE_HOST_RUNNERS).map(|chunk| async { + let mut witnessgen_executor = WitnessGenExecutor::default(); + + let mut batch_host_clis = Vec::new(); + for range in chunk.iter() { let host_cli = data_fetcher .get_host_cli_args(range.start, range.end, ProgramType::Multi) .await .unwrap(); + batch_host_clis.push(BatchHostCli { + host_cli: host_cli.clone(), + start: range.start, + end: range.end, + }); + witnessgen_executor + .spawn_witnessgen(&host_cli) + .await + .expect("Failed to spawn witness generation process."); + } - let data_dir = host_cli.data_dir.clone().expect("Data directory is not set."); - - fs::create_dir_all(&data_dir).unwrap(); - - let res = run_native_host(&host_cli, NATIVE_HOST_TIMEOUT).await; - if res.is_err() { - error!("Failed to run native host: {:?}", res.err().unwrap()); - std::process::exit(1); - } + witnessgen_executor.flush().await.expect("Failed to flush witness generation."); - BatchHostCli { host_cli, start: range.start, end: range.end } - })) + batch_host_clis }); futures::future::join_all(futures).await.into_iter().flatten().collect() diff --git a/proposer/succinct/bin/server.rs b/proposer/succinct/bin/server.rs index a84802b3..490ace9f 100644 --- a/proposer/succinct/bin/server.rs +++ b/proposer/succinct/bin/server.rs @@ -10,9 +10,9 @@ use base64::{engine::general_purpose, Engine as _}; use log::info; use op_succinct_client_utils::{RawBootInfo, BOOT_INFO_SIZE}; use op_succinct_host_utils::{ - fetcher::OPSuccinctDataFetcher, get_agg_proof_stdin, get_proof_stdin, ProgramType, + fetcher::OPSuccinctDataFetcher, get_agg_proof_stdin, get_proof_stdin, + witnessgen::WitnessGenExecutor, ProgramType, }; -use op_succinct_proposer::run_native_host; use serde::{Deserialize, Deserializer, Serialize}; use sp1_sdk::{ network::client::NetworkClient, @@ -87,9 +87,9 @@ async fn request_span_proof( // Start the server and native client with a timeout. // Note: Ideally, the server should call out to a separate process that executes the native // host, and return an ID that the client can poll on to check if the proof was submitted. - // TODO: If this fails, we should definitely NOT request a proof! Otherwise, we get execution - // failures on the cluster. - run_native_host(&host_cli, Duration::from_secs(60)).await?; + let mut witnessgen_executor = WitnessGenExecutor::default(); + witnessgen_executor.spawn_witnessgen(&host_cli).await?; + witnessgen_executor.flush().await?; let sp1_stdin = get_proof_stdin(&host_cli)?; diff --git a/proposer/succinct/src/lib.rs b/proposer/succinct/src/lib.rs deleted file mode 100644 index d19359e0..00000000 --- a/proposer/succinct/src/lib.rs +++ /dev/null @@ -1,62 +0,0 @@ -use anyhow::Result; -use std::time::Duration; -use tokio::time::timeout; - -use kona_host::HostCli; - -/// Convert the HostCli to a vector of arguments that can be passed to a command. -pub fn convert_host_cli_to_args(host_cli: &HostCli) -> Vec { - let mut args = vec![ - format!("--l1-head={}", host_cli.l1_head), - format!("--l2-head={}", host_cli.l2_head), - format!("--l2-output-root={}", host_cli.l2_output_root), - format!("--l2-claim={}", host_cli.l2_claim), - format!("--l2-block-number={}", host_cli.l2_block_number), - format!("--l2-chain-id={}", host_cli.l2_chain_id), - ]; - if let Some(addr) = &host_cli.l2_node_address { - args.push("--l2-node-address".to_string()); - args.push(addr.to_string()); - } - if let Some(addr) = &host_cli.l1_node_address { - args.push("--l1-node-address".to_string()); - args.push(addr.to_string()); - } - if let Some(addr) = &host_cli.l1_beacon_address { - args.push("--l1-beacon-address".to_string()); - args.push(addr.to_string()); - } - if let Some(dir) = &host_cli.data_dir { - args.push("--data-dir".to_string()); - args.push(dir.to_string_lossy().into_owned()); - } - if let Some(exec) = &host_cli.exec { - args.push("--exec".to_string()); - args.push(exec.to_string()); - } - if host_cli.server { - args.push("--server".to_string()); - } - args -} - -/// Run the native host with a timeout. Use a binary to execute the native host, as opposed to -/// spawning a new thread in the same process due to the static cursors employed by the host. -pub async fn run_native_host( - host_cli: &HostCli, - timeout_duration: Duration, -) -> Result { - let metadata = - cargo_metadata::MetadataCommand::new().exec().expect("Failed to get cargo metadata"); - let target_dir = - metadata.target_directory.join("native_host_runner/release/native_host_runner"); - let args = convert_host_cli_to_args(host_cli); - - // Run the native host runner. - let mut child = - tokio::process::Command::new(target_dir).args(&args).env("RUST_LOG", "info").spawn()?; - - // Return the child process handle. - // TODO: There's no nice way to retry the native host runner/executor. - Ok(timeout(timeout_duration, child.wait()).await??) -} diff --git a/scripts/prove/bin/multi.rs b/scripts/prove/bin/multi.rs index 27c331d6..a7a8c143 100644 --- a/scripts/prove/bin/multi.rs +++ b/scripts/prove/bin/multi.rs @@ -2,11 +2,11 @@ use std::{fs, time::Instant}; use anyhow::Result; use clap::Parser; -use kona_host::start_server_and_native_client; use op_succinct_host_utils::{ fetcher::{ChainMode, OPSuccinctDataFetcher}, get_proof_stdin, stats::get_execution_stats, + witnessgen::WitnessGenExecutor, ProgramType, }; use sp1_sdk::{utils, ProverClient}; @@ -57,7 +57,9 @@ async fn main() -> Result<()> { fs::create_dir_all(&data_dir).unwrap(); // Start the server and native client. - start_server_and_native_client(host_cli.clone()).await?; + let mut witnessgen_executor = WitnessGenExecutor::default(); + witnessgen_executor.spawn_witnessgen(&host_cli).await?; + witnessgen_executor.flush().await?; } let execution_duration = start_time.elapsed(); println!("Execution Duration: {:?}", execution_duration); diff --git a/scripts/prove/bin/single.rs b/scripts/prove/bin/single.rs index 704e5c99..5f4fbdc2 100644 --- a/scripts/prove/bin/single.rs +++ b/scripts/prove/bin/single.rs @@ -2,9 +2,10 @@ use std::{env, fs}; use anyhow::Result; use clap::Parser; -use kona_host::start_server_and_native_client; use num_format::{Locale, ToFormattedString}; -use op_succinct_host_utils::{fetcher::OPSuccinctDataFetcher, get_proof_stdin, ProgramType}; +use op_succinct_host_utils::{ + fetcher::OPSuccinctDataFetcher, get_proof_stdin, witnessgen::WitnessGenExecutor, ProgramType, +}; use sp1_sdk::{utils, ProverClient}; pub const SINGLE_BLOCK_ELF: &[u8] = include_bytes!("../../../elf/fault-proof-elf"); @@ -46,7 +47,9 @@ async fn main() -> Result<()> { fs::create_dir_all(&data_dir).unwrap(); // Start the server and native client. - start_server_and_native_client(host_cli.clone()).await.unwrap(); + let mut witnessgen_executor = WitnessGenExecutor::default(); + witnessgen_executor.spawn_witnessgen(&host_cli).await?; + witnessgen_executor.flush().await?; } // Get the stdin for the block. diff --git a/scripts/witnessgen/bin/native_host_runner.rs b/scripts/witnessgen/bin/native_host_runner.rs index cfd66ade..60652efa 100644 --- a/scripts/witnessgen/bin/native_host_runner.rs +++ b/scripts/witnessgen/bin/native_host_runner.rs @@ -1,7 +1,7 @@ use anyhow::Result; use clap::Parser; use kona_host::{init_tracing_subscriber, start_server, start_server_and_native_client, HostCli}; -use log::{error, info}; +use log::info; // Source: https://github.com/ethereum-optimism/kona/blob/main/bin/host/src/main.rs #[tokio::main(flavor = "multi_thread")] @@ -12,11 +12,7 @@ async fn main() -> Result<()> { if cfg.server { start_server(cfg).await?; } else { - let res = start_server_and_native_client(cfg).await; - if res.is_err() { - error!("Failed to run server and native client: {:?}", res.err().unwrap()); - std::process::exit(1); - } + start_server_and_native_client(cfg).await?; } info!("Exiting host program."); diff --git a/utils/host/src/lib.rs b/utils/host/src/lib.rs index 0db38cc1..4d8befe0 100644 --- a/utils/host/src/lib.rs +++ b/utils/host/src/lib.rs @@ -1,6 +1,7 @@ pub mod fetcher; pub mod helpers; pub mod stats; +pub mod witnessgen; use alloy_consensus::Header; use alloy_primitives::B256; diff --git a/utils/host/src/witnessgen.rs b/utils/host/src/witnessgen.rs new file mode 100644 index 00000000..072194f2 --- /dev/null +++ b/utils/host/src/witnessgen.rs @@ -0,0 +1,145 @@ +use anyhow::Result; +use std::time::Duration; +use tokio::time::timeout; + +use kona_host::HostCli; + +/// Convert the HostCli to a vector of arguments that can be passed to a command. +pub fn convert_host_cli_to_args(host_cli: &HostCli) -> Vec { + let mut args = vec![ + format!("--l1-head={}", host_cli.l1_head), + format!("--l2-head={}", host_cli.l2_head), + format!("--l2-output-root={}", host_cli.l2_output_root), + format!("--l2-claim={}", host_cli.l2_claim), + format!("--l2-block-number={}", host_cli.l2_block_number), + format!("--l2-chain-id={}", host_cli.l2_chain_id), + ]; + if let Some(addr) = &host_cli.l2_node_address { + args.push("--l2-node-address".to_string()); + args.push(addr.to_string()); + } + if let Some(addr) = &host_cli.l1_node_address { + args.push("--l1-node-address".to_string()); + args.push(addr.to_string()); + } + if let Some(addr) = &host_cli.l1_beacon_address { + args.push("--l1-beacon-address".to_string()); + args.push(addr.to_string()); + } + if let Some(dir) = &host_cli.data_dir { + args.push("--data-dir".to_string()); + args.push(dir.to_string_lossy().into_owned()); + } + if let Some(exec) = &host_cli.exec { + args.push("--exec".to_string()); + args.push(exec.to_string()); + } + if host_cli.server { + args.push("--server".to_string()); + } + args +} + +/// Default timeout for witness generation. +pub const WITNESSGEN_TIMEOUT: Duration = Duration::from_secs(300); + +struct WitnessGenProcess { + child: tokio::process::Child, + exec: String, +} + +/// Stateful executor for witness generation. Useful for executing several witness generation +/// processes in parallel. +pub struct WitnessGenExecutor { + ongoing_processes: Vec, + timeout: Duration, +} + +impl Default for WitnessGenExecutor { + fn default() -> Self { + Self::new(WITNESSGEN_TIMEOUT) + } +} + +impl WitnessGenExecutor { + pub fn new(timeout: Duration) -> Self { + Self { ongoing_processes: Vec::new(), timeout } + } + + /// Spawn a witness generation process for the given host CLI, and adds it to the list of + /// ongoing processes. + pub async fn spawn_witnessgen(&mut self, host_cli: &HostCli) -> Result<()> { + let metadata = + cargo_metadata::MetadataCommand::new().exec().expect("Failed to get cargo metadata"); + let target_dir = + metadata.target_directory.join("native_host_runner/release/native_host_runner"); + let args = convert_host_cli_to_args(host_cli); + + // Run the native host runner. + let child = + tokio::process::Command::new(target_dir).args(&args).env("RUST_LOG", "info").spawn()?; + self.ongoing_processes + .push(WitnessGenProcess { child, exec: host_cli.exec.clone().unwrap() }); + Ok(()) + } + + /// Wait for all ongoing witness generation processes to complete. If any process fails, + /// kill all ongoing processes and return an error. + pub async fn flush(&mut self) -> Result<()> { + let binary_name = self.ongoing_processes[0].exec.split('/').last().unwrap().to_string(); + + // TODO: If any process fails or a Ctrl+C is received, kill all ongoing processes. This is + // quite involved, as the behavior differs between Unix and Windows. When using + // Ctrl+C handler, you also need to be careful to restore the original behavior + // after your custom behavior, otherwise you won't be able to terminate "normally". + + // Wait for all processes to complete. + let result = self.wait_for_processes().await; + let any_failed = result.is_err(); + + if any_failed { + self.kill_all(binary_name).await?; + Err(anyhow::anyhow!("One or more child processes failed or timed out")) + } else { + Ok(()) + } + } + + /// Wait for all ongoing witness generation processes to complete. If any process fails, return + /// an error. + async fn wait_for_processes(&mut self) -> Result<()> { + for child in &mut self.ongoing_processes { + match timeout(self.timeout, child.child.wait()).await { + Ok(Ok(status)) if !status.success() => { + return Err(anyhow::anyhow!("Child process exited with non-zero status")); + } + Ok(Err(e)) => { + eprintln!("Child process error: {}", e); + return Err(anyhow::anyhow!("Child process error")); + } + Err(_) => { + eprintln!("Child process timed out"); + return Err(anyhow::anyhow!("Child process timed out")); + } + _ => {} + } + } + Ok(()) + } + + /// Kill all ongoing "native client" processes and the associated spawned witness gen + /// programs. Specifically, whenever witness generation is spawned, there is a "native + /// client" process that spawns a "witness gen" program. Just killing the "native client" + /// process will not kill the "witness gen" program, so we need to explicitly kill the + /// "witness gen" program as well. + async fn kill_all(&mut self, binary_name: String) -> Result<()> { + // Kill the "native client" processes. + for mut child in self.ongoing_processes.drain(..) { + child.child.kill().await?; + } + + // Kill the spawned witness gen program. + std::process::Command::new("pkill").arg("-f").arg(binary_name).output()?; + Ok(()) + } +}