Skip to content

Commit

Permalink
witnessgen start
Browse files Browse the repository at this point in the history
  • Loading branch information
ratankaliani committed Sep 3, 2024
1 parent dc3e411 commit e949c6c
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions contracts/script/ZKDeployer.s.sol
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ contract ZKDeployer is Script, Utils {
vm.startBroadcast();

Config memory config = readJsonWithRPCFromEnv("zkconfig.json");
// Note: This seems wrong. Why are we using the msg.sender as a proxy? Unless the owner is that.
config.l2OutputOracleProxy = address(new Proxy(msg.sender));

address zkL2OutputOracleImpl = address(new ZKL2OutputOracle());
Expand Down
38 changes: 19 additions & 19 deletions proposer/succinct/bin/cost_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use op_succinct_host_utils::{
stats::{get_execution_stats, ExecutionStats},
ProgramType,
};
use op_succinct_proposer::run_native_host;
use op_succinct_proposer::run_parallel_witnessgen;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use reqwest::Client;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -146,25 +146,25 @@ async fn run_native_data_generation(
const NATIVE_HOST_TIMEOUT: Duration = Duration::from_secs(300);

// TODO: Shut down all processes when the program exits OR a Ctrl+C is pressed.
let futures = split_ranges.chunks(CONCURRENT_NATIVE_HOST_RUNNERS).map(|chunk| {
futures::future::join_all(chunk.iter().map(|range| async {
let host_cli = data_fetcher
.get_host_cli_args(range.start, range.end, ProgramType::Multi)
.await
.unwrap();

let data_dir = host_cli.data_dir.clone().expect("Data directory is not set.");

fs::create_dir_all(&data_dir).unwrap();

let res = run_native_host(&host_cli, NATIVE_HOST_TIMEOUT).await;
if res.is_err() {
error!("Failed to run native host: {:?}", res.err().unwrap());
std::process::exit(1);
}
let futures = split_ranges.chunks(CONCURRENT_NATIVE_HOST_RUNNERS).map(|chunk| async {
let batch_host_clis: Vec<BatchHostCli> =
futures::future::join_all(chunk.iter().map(|range| async {
let host_cli = data_fetcher
.get_host_cli_args(range.start, range.end, ProgramType::Multi)
.await
.unwrap();
BatchHostCli { host_cli, start: range.start, end: range.end }
}))
.await;

let host_clis = batch_host_clis.iter().map(|b| b.host_cli.clone()).collect::<Vec<_>>();

let output = run_parallel_witnessgen(host_clis, NATIVE_HOST_TIMEOUT).await;
if output.is_err() {
panic!("Running witness generation failed: {}", output.err().unwrap());
}

BatchHostCli { host_cli, start: range.start, end: range.end }
}))
batch_host_clis
});

futures::future::join_all(futures).await.into_iter().flatten().collect()
Expand Down
4 changes: 2 additions & 2 deletions proposer/succinct/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use op_succinct_client_utils::{RawBootInfo, BOOT_INFO_SIZE};
use op_succinct_host_utils::{
fetcher::OPSuccinctDataFetcher, get_agg_proof_stdin, get_proof_stdin, ProgramType,
};
use op_succinct_proposer::run_native_host;
use op_succinct_proposer::run_witnessgen;
use serde::{Deserialize, Deserializer, Serialize};
use sp1_sdk::{
network::client::NetworkClient,
Expand Down Expand Up @@ -89,7 +89,7 @@ async fn request_span_proof(
// host, and return an ID that the client can poll on to check if the proof was submitted.
// TODO: If this fails, we should definitely NOT request a proof! Otherwise, we get execution
// failures on the cluster.
run_native_host(&host_cli, Duration::from_secs(60)).await?;
run_witnessgen(&host_cli, Duration::from_secs(60)).await?;

let sp1_stdin = get_proof_stdin(&host_cli)?;

Expand Down
83 changes: 76 additions & 7 deletions proposer/succinct/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ pub fn convert_host_cli_to_args(host_cli: &HostCli) -> Vec<String> {

/// Run the native host with a timeout. Use a binary to execute the native host, as opposed to
/// spawning a new thread in the same process due to the static cursors employed by the host.
pub async fn run_native_host(
host_cli: &HostCli,
timeout_duration: Duration,
) -> Result<std::process::ExitStatus> {
pub async fn run_witnessgen(host_cli: &HostCli, timeout_duration: Duration) -> Result<()> {
let metadata =
cargo_metadata::MetadataCommand::new().exec().expect("Failed to get cargo metadata");
let target_dir =
Expand All @@ -56,7 +53,79 @@ pub async fn run_native_host(
let mut child =
tokio::process::Command::new(target_dir).args(&args).env("RUST_LOG", "info").spawn()?;

// Return the child process handle.
// TODO: There's no nice way to retry the native host runner/executor.
Ok(timeout(timeout_duration, child.wait()).await??)
// Set up a timeout that will kill the process if it exceeds the duration
match tokio::time::timeout(timeout_duration, child.wait()).await {
Ok(result) => {
let result = result.unwrap();
if result.success() {
Ok(())
} else {
Err(anyhow::anyhow!("Process failed with exit code: {}", result.code().unwrap()))
}
}
Err(_) => {
// Timeout occurred, kill the process
child.kill().await?;
Err(anyhow::anyhow!("Process timed out and was killed."))
}
}
}

/// Run one native host runner process per entry in `host_clis` concurrently and wait for all of
/// them to finish.
///
/// Every process is spawned up front from the `native_host_runner` release binary under the cargo
/// target directory, and the processes are then awaited in order. `timeout_duration` is a single
/// budget shared by the whole batch, measured from the moment all processes have been spawned, so
/// a later process is not granted extra time just because an earlier one was awaited first.
///
/// # Errors
///
/// Returns an error if a process cannot be spawned, or if any process exits unsuccessfully,
/// cannot be waited on, or is still running once the timeout has elapsed. On failure the
/// remaining processes are killed on a best-effort basis before the error is returned.
///
/// # Panics
///
/// Panics if the cargo metadata cannot be read.
pub async fn run_parallel_witnessgen(
    host_clis: Vec<HostCli>,
    timeout_duration: Duration,
) -> Result<()> {
    let metadata =
        cargo_metadata::MetadataCommand::new().exec().expect("Failed to get cargo metadata");
    let target_dir =
        metadata.target_directory.join("native_host_runner/release/native_host_runner");

    // Spawn every runner before waiting on any of them so that they execute in parallel.
    // `kill_on_drop` makes sure already-spawned runners are not leaked if a later spawn fails or
    // if this future is dropped before completion.
    let mut children = Vec::with_capacity(host_clis.len());
    for host_cli in &host_clis {
        let args = convert_host_cli_to_args(host_cli);
        let child = tokio::process::Command::new(&target_dir)
            .args(&args)
            .env("RUST_LOG", "info")
            .kill_on_drop(true)
            .spawn()?;
        children.push(child);
    }

    // All children run concurrently, so they share one deadline instead of each getting a fresh
    // timeout when its turn to be awaited comes up.
    let started = std::time::Instant::now();

    let mut any_failed = false;
    for child in &mut children {
        let remaining = timeout_duration.saturating_sub(started.elapsed());
        match timeout(remaining, child.wait()).await {
            Ok(Ok(status)) if status.success() => {}
            Ok(Ok(status)) => {
                any_failed = true;
                eprintln!("Child process exited unsuccessfully: {}", status);
                break;
            }
            Ok(Err(e)) => {
                any_failed = true;
                eprintln!("Child process error: {}", e);
                break;
            }
            Err(_) => {
                any_failed = true;
                eprintln!("Child process timed out");
                break;
            }
        }
    }

    if !any_failed {
        return Ok(());
    }

    // Terminate remaining children. Killing is best-effort: a child that has already exited
    // reports an error here, and that must not stop us from killing the others.
    for child in &mut children {
        let _ = child.kill().await;
    }

    // Kill the spawned witness gen program. It is matched by the file name of the `exec` path of
    // the first host CLI; skip this step when no such path is configured.
    let binary_name = host_clis
        .first()
        .and_then(|host_cli| host_cli.exec.as_ref())
        .and_then(|exec| exec.rsplit('/').next());
    if let Some(binary_name) = binary_name {
        let pkill = tokio::process::Command::new("pkill").arg("-f").arg(binary_name).output();
        if let Err(e) = pkill.await {
            // Do not let a cleanup failure mask the real error reported below.
            eprintln!("Failed to run pkill for {}: {}", binary_name, e);
        }
    }

    Err(anyhow::anyhow!("One or more child processes failed or timed out"))
}
2 changes: 2 additions & 0 deletions scripts/prove/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ kona-host = { workspace = true }
# local
op-succinct-host-utils.workspace = true
op-succinct-client-utils.workspace = true
op-succinct-proposer.workspace = true

# sp1
sp1-sdk = { workspace = true }
Expand All @@ -46,3 +47,4 @@ csv.workspace = true
[build-dependencies]
sp1-build = { workspace = true }
op-succinct-host-utils = { workspace = true }
cargo_metadata = { workspace = true }
11 changes: 9 additions & 2 deletions scripts/prove/bin/multi.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{fs, time::Instant};
use std::{
fs,
time::{Duration, Instant},
};

use anyhow::Result;
use clap::Parser;
Expand All @@ -9,6 +12,7 @@ use op_succinct_host_utils::{
stats::get_execution_stats,
ProgramType,
};
use op_succinct_proposer::{run_parallel_witnessgen, run_witnessgen};
use sp1_sdk::{utils, ProverClient};

pub const MULTI_BLOCK_ELF: &[u8] = include_bytes!("../../../elf/range-elf");
Expand Down Expand Up @@ -57,7 +61,10 @@ async fn main() -> Result<()> {
fs::create_dir_all(&data_dir).unwrap();

// Start the server and native client.
start_server_and_native_client(host_cli.clone()).await?;
let output = run_parallel_witnessgen(vec![host_cli.clone()], Duration::from_secs(1)).await;
if output.is_err() {
panic!("Running witness generation failed: {}", output.err().unwrap());
}
}
let execution_duration = start_time.elapsed();
println!("Execution Duration: {:?}", execution_duration);
Expand Down
8 changes: 6 additions & 2 deletions scripts/prove/bin/single.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{env, fs};
use std::{env, fs, time::Duration};

use anyhow::Result;
use clap::Parser;
use kona_host::start_server_and_native_client;
use num_format::{Locale, ToFormattedString};
use op_succinct_host_utils::{fetcher::OPSuccinctDataFetcher, get_proof_stdin, ProgramType};
use op_succinct_proposer::run_witnessgen;
use sp1_sdk::{utils, ProverClient};

pub const SINGLE_BLOCK_ELF: &[u8] = include_bytes!("../../../elf/fault-proof-elf");
Expand Down Expand Up @@ -46,7 +47,10 @@ async fn main() -> Result<()> {
fs::create_dir_all(&data_dir).unwrap();

// Start the server and native client.
start_server_and_native_client(host_cli.clone()).await.unwrap();
let output = run_witnessgen(&host_cli, Duration::from_secs(0)).await;
if output.is_err() {
panic!("Running witness generation failed: {}", output.err().unwrap());
}
}

// Get the stdin for the block.
Expand Down
30 changes: 30 additions & 0 deletions scripts/prove/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,32 @@
// },
// );
// }
use std::process::Command;

/// Compile the `native_host_runner` binary in release mode.
///
/// The build is directed at a dedicated `native_host_runner` directory inside the workspace
/// target directory rather than the default one, to avoid build lockups.
///
/// Panics if cargo metadata cannot be read, if `cargo` cannot be launched, or if the build exits
/// unsuccessfully.
fn build_native_host_runner() {
    let workspace_metadata = cargo_metadata::MetadataCommand::new()
        .exec()
        .expect("Failed to get cargo metadata");
    let runner_target_dir = workspace_metadata.target_directory.join("native_host_runner");
    let runner_target_dir_arg: &str = runner_target_dir.as_ref();

    // Assemble the `cargo build` invocation one piece at a time.
    let mut cargo_build = Command::new("cargo");
    cargo_build.arg("build");
    cargo_build.arg("--workspace");
    cargo_build.args(["--bin", "native_host_runner"]);
    cargo_build.arg("--release");
    cargo_build.args(["--target-dir", runner_target_dir_arg]);

    let build_status = cargo_build.status().expect("Failed to execute cargo build command");
    if !build_status.success() {
        panic!("Failed to build native_host_runner");
    }

    // Surface a note in cargo's output that the runner was built.
    println!("cargo:warning=native_host_runner built with release profile");
}

fn main() {
// let programs = vec!["range"];
Expand All @@ -36,4 +62,8 @@ fn main() {
// }

// build_zkvm_program("aggregation");

// This runs the build script every time as the timestamp on the build script updates.
println!("cargo:rerun-if-changed=build.rs");
build_native_host_runner();
}
6 changes: 1 addition & 5 deletions scripts/witnessgen/bin/native_host_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ async fn main() -> Result<()> {
if cfg.server {
start_server(cfg).await?;
} else {
let res = start_server_and_native_client(cfg).await;
if res.is_err() {
error!("Failed to run server and native client: {:?}", res.err().unwrap());
std::process::exit(1);
}
start_server_and_native_client(cfg).await?;
}

info!("Exiting host program.");
Expand Down

0 comments on commit e949c6c

Please sign in to comment.