Skip to content

Commit

Permalink
Merge remote-tracking branch 'dan-da/danda/prover_job_queue_pr'
Browse files Browse the repository at this point in the history
Co-authored-by: Alan Szepieniec <[email protected]>
  • Loading branch information
Sword-Smith and aszepieniec committed Nov 4, 2024
2 parents ff2ca0f + 85ebb16 commit cb012ff
Show file tree
Hide file tree
Showing 41 changed files with 1,056 additions and 552 deletions.
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ readonly = "0.2.12"
thiserror = "1.0.65"
systemstat = "0.2.3"
sysinfo = "0.31.4"
async-priority-channel = "0.2.0"

[dev-dependencies]
blake3 = "1.5.4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"benchmark_result": {
"clock_cycle_count": 13237,
"hash_table_height": 5056,
"u32_table_height": 1186,
"u32_table_height": 826,
"op_stack_table_height": 9807,
"ram_table_height": 7605
},
Expand Down
33 changes: 33 additions & 0 deletions src/job_queue/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//! This module implements a prioritized, heterogeneous job queue that sends
//! completed job results to the initiator/caller.
//!
//! This is intended for running heavy multi-threaded jobs that should be run
//! one at a time to avoid resource contention. By using this queue, multiple
//! (async) tasks can initiate these jobs and wait for their results without
//! needing any other synchronization.
//!
//! note: Other rust job queues I found either did not support waiting for job
//! results or else were overly complicated, requiring backend database, etc.
//!
//! Both blocking and non-blocking (async) jobs are supported. Blocking jobs
//! are called inside spawn_blocking() in order to execute on tokio's blocking
//! thread-pool. Async jobs are simply awaited.
//!
//! An async_priority_channel::unbounded is used for queueing the jobs.
//! This is much like tokio::sync::mpsc::unbounded except:
//! 1. it supports prioritizing channel events (jobs)
//! 2. order of events with same priority is undefined.
//! see: https://github.com/rmcgibbo/async-priority-channel/issues/75
//!
//! Using an unbounded channel means that there is no backpressure and no
//! upper limit on the number of jobs. (except RAM).
//!
//! A nice feature is that jobs may be of mixed (heterogeneous) types
//! in a single JobQueue instance. Any type that implements the Job trait
//! may be a job.

mod queue;
pub mod traits;
pub mod triton_vm;

pub use queue::JobQueue;
175 changes: 175 additions & 0 deletions src/job_queue/queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use async_priority_channel as mpsc;
use tokio::sync::oneshot;

use super::traits::Job;
use super::traits::JobResult;

/// Sending half of the oneshot channel over which the result of a single job
/// is delivered back to whoever added that job to the queue.
type JobResultOneShotChannel = oneshot::Sender<Box<dyn JobResult>>;

// todo: fix it so that jobs with same priority execute FIFO.
/// Implements a prioritized job queue that sends the result of each job to a
/// listener.
///
/// `P` is the priority type attached to every queued job. At present the order
/// of jobs with the same priority is undefined.
///
/// The struct only holds the sending side of the priority channel; each queued
/// item is a boxed job paired with the oneshot sender used to report its result.
pub struct JobQueue<P: Ord> {
tx: mpsc::Sender<(Box<dyn Job>, JobResultOneShotChannel), P>,
}

impl<P: Ord> std::fmt::Debug for JobQueue<P> {
    /// Formats the queue for diagnostics.
    ///
    /// The `tx` field is rendered as a fixed placeholder string rather than
    /// the sender itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("JobQueue");
        builder.field("tx", &"mpsc::Sender");
        builder.finish()
    }
}

impl<P: Ord> Clone for JobQueue<P> {
    /// Returns a new handle whose sender is a clone of this handle's sender.
    ///
    /// Implemented by hand so that `P` itself is not required to be `Clone`.
    fn clone(&self) -> Self {
        let tx = self.tx.clone();
        Self { tx }
    }
}

impl<P: Ord + Send + Sync + 'static> JobQueue<P> {
    /// Creates the job queue and starts processing it. Returns immediately.
    ///
    /// A background tokio task is spawned that receives jobs from the
    /// priority channel and runs them one at a time: async jobs are awaited
    /// directly, blocking jobs are run via `spawn_blocking()`. The result of
    /// each job is sent over the oneshot channel that was queued with it.
    ///
    /// The background task ends once receiving from the channel fails.
    pub fn start() -> Self {
        let (tx, rx) = mpsc::unbounded::<(Box<dyn Job>, JobResultOneShotChannel), P>();

        // spawns background task that processes job-queue and runs jobs.
        tokio::spawn(async move {
            while let Ok(((job, otx), _priority)) = rx.recv().await {
                let result = if job.is_async() {
                    job.run_async().await
                } else {
                    match tokio::task::spawn_blocking(move || job.run()).await {
                        Ok(result) => result,
                        // The blocking job did not complete (e.g. it
                        // panicked). Skipping to the next job drops `otx`,
                        // so the waiting caller sees a receive error, and the
                        // queue stays alive for the remaining jobs instead of
                        // being torn down by a single faulty job.
                        Err(_) => continue,
                    }
                };

                // The caller may have dropped its receiver because it is not
                // interested in the result; that is not an error.
                let _ = otx.send(result);
            }
        });

        Self { tx }
    }

    /// Alias of [`Self::start()`].
    ///
    /// Here for two reasons:
    ///  1. backwards compat with existing tests
    ///  2. if tests call dummy() instead of start(), then it is easier
    ///     to find where start() is called for real.
    #[cfg(test)]
    pub fn dummy() -> Self {
        Self::start()
    }

    /// Adds a job to the job-queue and returns immediately.
    ///
    /// On success, returns the receiver on which the job's result will be
    /// delivered once the job has run.
    ///
    /// # Errors
    ///
    /// Returns an error if the job could not be sent to the queue.
    pub async fn add_job(
        &self,
        job: Box<dyn Job>,
        priority: P,
    ) -> anyhow::Result<oneshot::Receiver<Box<dyn JobResult>>> {
        let (otx, orx) = oneshot::channel();
        self.tx.send((job, otx), priority).await?;
        Ok(orx)
    }

    /// Adds a job to the job-queue, waits for job completion, and returns the
    /// job result.
    ///
    /// # Errors
    ///
    /// Returns an error if the job could not be sent to the queue, or if the
    /// result channel was closed before a result was delivered.
    pub async fn add_and_await_job(
        &self,
        job: Box<dyn Job>,
        priority: P,
    ) -> anyhow::Result<Box<dyn JobResult>> {
        let orx = self.add_job(job, priority).await?;
        Ok(orx.await?)
    }

    /// Polls every 10 milliseconds until no jobs are waiting in the queue.
    ///
    /// Note: a job is removed from the queue before it is run, so an empty
    /// queue does not imply that the most recently dequeued job has finished.
    #[cfg(test)]
    pub async fn wait_until_queue_empty(&self) {
        while !self.tx.is_empty() {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::any::Any;

    use super::*;

    /// Result of a [`DoubleJob`]: the input value followed by its double.
    #[derive(PartialEq, Debug)]
    struct DoubleJobResult(u64, u64);

    impl JobResult for DoubleJobResult {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// A blocking job that doubles the input value. Implements [`Job`].
    struct DoubleJob {
        data: u64,
    }

    impl Job for DoubleJob {
        fn is_async(&self) -> bool {
            false
        }

        fn run(&self) -> Box<dyn JobResult> {
            let input = self.data;
            let doubled = input * 2;

            println!("{} * 2 = {}", input, doubled);
            Box::new(DoubleJobResult(input, doubled))
        }
    }

    // todo: make test(s) for async jobs.

    /// todo: this should verify the priority order of jobs.
    ///       presently each job just prints result and
    ///       human can manually verify output.
    #[tokio::test]
    async fn run_jobs_by_priority() -> anyhow::Result<()> {
        let job_queue = JobQueue::<u8>::start();

        // For each of the 10 rounds, queue three jobs with priorities 1, 2
        // and 3 whose inputs are scaled by 1, 100 and 1000 respectively.
        for round in 0..10u64 {
            for (priority, factor) in [(1u8, 1u64), (2, 100), (3, 1000)] {
                let job = Box::new(DoubleJob {
                    data: round * factor,
                });
                job_queue.add_job(job, priority).await?;
            }
        }

        job_queue.wait_until_queue_empty().await;

        Ok(())
    }

    /// Each job's result must be delivered back to the caller that queued it.
    #[tokio::test]
    async fn get_result() -> anyhow::Result<()> {
        let job_queue = JobQueue::<u8>::start();

        // run 10 jobs, awaiting each one before queueing the next.
        for i in 0..10 {
            let outcome = job_queue
                .add_and_await_job(Box::new(DoubleJob { data: i }), 1)
                .await?;

            let expected = DoubleJobResult(i, i * 2);
            assert_eq!(
                Some(&expected),
                outcome.as_any().downcast_ref::<DoubleJobResult>()
            );
        }

        Ok(())
    }
}
24 changes: 24 additions & 0 deletions src/job_queue/traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::any::Any;

/// The result produced by a job.
///
/// Results are passed around as trait objects, so implementors expose
/// themselves as [`Any`] to let the receiver downcast back to the concrete
/// result type.
pub trait JobResult: std::fmt::Debug + Any + Send + Sync {
    /// Returns `self` as a `&dyn Any`, for use with `downcast_ref()`.
    fn as_any(&self) -> &dyn Any;
}

/// Represents any kind of job.
///
/// A job is either blocking or async, as reported by `is_async()`.
/// Implementors provide `run()` for a blocking job or `run_async()` for an
/// async job.
#[async_trait::async_trait]
pub trait Job: Send + Sync {
/// Returns true if this job must be executed via `run_async()`,
/// false if it must be executed via `run()`.
fn is_async(&self) -> bool;

// note: we provide unimplemented default methods for
// run and run_async. This is so that implementing types
// only need to impl the appropriate method.

/// Runs a blocking job to completion and returns its result.
///
/// The default implementation panics with `unimplemented!()`; jobs for
/// which `is_async()` returns false must override it.
fn run(&self) -> Box<dyn JobResult> {
unimplemented!()
}

/// Runs an async job to completion and returns its result.
///
/// The default implementation panics with `unimplemented!()`; jobs for
/// which `is_async()` returns true must override it.
async fn run_async(&self) -> Box<dyn JobResult> {
unimplemented!()
}
}
4 changes: 4 additions & 0 deletions src/job_queue/triton_vm/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod triton_vm_job_queue;

pub use triton_vm_job_queue::TritonVmJobPriority;
pub use triton_vm_job_queue::TritonVmJobQueue;
16 changes: 16 additions & 0 deletions src/job_queue/triton_vm/triton_vm_job_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use super::super::JobQueue;

// todo: maybe we want to have more levels or just make it an integer eg u8.
// or maybe name the levels by type/usage of job/proof.
/// Priority levels for jobs in the triton-vm job queue.
///
/// The derived ordering follows the numeric values, so
/// `Lowest < Low < Normal < High < Highest`. The default is `Normal`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum TritonVmJobPriority {
    Lowest = 1,
    Low = 2,
    #[default]
    Normal = 3,
    High = 4,
    Highest = 5,
}

/// A [`JobQueue`] whose jobs are prioritized by [`TritonVmJobPriority`].
///
/// Provides type safety and clarity in case we implement multiple job queues.
pub type TritonVmJobQueue = JobQueue<TritonVmJobPriority>;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
pub mod config_models;
pub mod connect_to_peers;
pub mod database;
pub mod job_queue;
pub mod locks;
pub mod macros;
pub mod main_loop;
Expand Down
Loading

0 comments on commit cb012ff

Please sign in to comment.