Skip to content

Commit

Permalink
feat : added worker queues and listeners to consume the message (#92)
Browse files Browse the repository at this point in the history
* feat : added worker queues and listeners to consume the message

* changelog

* refactor : removed excess queues

* feat : removed duplicate macro

* feat : renamed vars

* refactor : code refactor according to the comments above
  • Loading branch information
ocdbytes authored Aug 23, 2024
1 parent 7926588 commit 5728df2
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 77 deletions.
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ AWS_DEFAULT_REGION=
# SQS
SQS_JOB_PROCESSING_QUEUE_URL=
SQS_JOB_VERIFICATION_QUEUE_URL=
SQS_JOB_HANDLE_FAILURE_QUEUE_URL=
SQS_WORKER_TRIGGER_QUEUE_URL=

# S3
AWS_S3_BUCKET_NAME=
Expand Down
2 changes: 2 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ AWS_S3_BUCKET_REGION="us-east-1"
AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566"
SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue"
SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue"
SQS_JOB_HANDLE_FAILURE_QUEUE_URL=
SQS_WORKER_TRIGGER_QUEUE_URL=
AWS_DEFAULT_REGION="localhost"

##### On chain config #####
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- Worker queues to listen for trigger events.
- Tests for prover client.
- Added Rust Cache for Coverage Test CI.
- support for fetching PIE file from storage client in proving job.
Expand Down
22 changes: 0 additions & 22 deletions crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@ use dotenvy::dotenv;
use orchestrator::config::config;
use orchestrator::queue::init_consumers;
use orchestrator::routes::app_router;
use orchestrator::workers::data_submission_worker::DataSubmissionWorker;
use orchestrator::workers::proof_registration::ProofRegistrationWorker;
use orchestrator::workers::proving::ProvingWorker;
use orchestrator::workers::snos::SnosWorker;
use orchestrator::workers::update_state::UpdateStateWorker;
use orchestrator::workers::*;
use utils::env_utils::get_env_var_or_default;

/// Start the server
Expand All @@ -27,22 +21,6 @@ async fn main() {
// init consumer
init_consumers().await.expect("Failed to init consumers");

// spawn a thread for each workers
// changes in rollup mode - sovereign, validity, validiums etc.
// will likely involve changes in these workers as well
tokio::spawn(start_cron(Box::new(SnosWorker), 60));
tokio::spawn(start_cron(Box::new(ProvingWorker), 60));
tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60));
tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60));
tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60));

tracing::info!("Listening on http://{}", address);
axum::serve(listener, app).await.expect("Failed to start axum server");
}

async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
loop {
worker.run_worker_if_enabled().await.expect("Error in running the worker.");
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}
233 changes: 184 additions & 49 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use color_eyre::eyre::Context;
use color_eyre::Result as EyreResult;
use omniqueue::QueueError;
use omniqueue::{Delivery, QueueError};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use tracing::log;
Expand All @@ -14,9 +14,18 @@ use crate::jobs::{handle_job_failure, process_job, verify_job, JobError, OtherEr

pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";
// Below is the Data Letter Queue for the the above two jobs.
// Below is the Dead Letter Queue (DLQ) for the above two jobs.
pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue";

// Queue used to listen for worker trigger messages (all worker types, not only SNOS).
pub const WORKER_TRIGGER_QUEUE: &str = "madara_orchestrator_worker_trigger_queue";

use crate::workers::data_submission_worker::DataSubmissionWorker;
use crate::workers::proof_registration::ProofRegistrationWorker;
use crate::workers::proving::ProvingWorker;
use crate::workers::snos::SnosWorker;
use crate::workers::update_state::UpdateStateWorker;
use crate::workers::Worker;
use thiserror::Error;

#[derive(Error, Debug, PartialEq)]
Expand All @@ -27,6 +36,9 @@ pub enum ConsumptionError {
#[error("Failed to handle job with id {job_id:?}. Error: {error_msg:?}")]
FailedToHandleJob { job_id: Uuid, error_msg: String },

#[error("Failed to spawn {worker_trigger_type:?} worker. Error: {error_msg:?}")]
FailedToSpawnWorker { worker_trigger_type: WorkerTriggerType, error_msg: String },

#[error("Other error: {0}")]
Other(#[from] OtherError),
}
Expand All @@ -36,6 +48,25 @@ pub struct JobQueueMessage {
pub(crate) id: Uuid,
}

/// Identifies which background worker a trigger message should spawn.
///
/// Serialized into / deserialized from the worker trigger queue payload
/// (see `WorkerTriggerMessage`), and mapped to a concrete `Box<dyn Worker>`
/// by `get_worker_handler_from_worker_trigger_type`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub enum WorkerTriggerType {
    Snos,
    Proving,
    ProofRegistration,
    DataSubmission,
    UpdateState,
}

/// Payload carried on the worker trigger queue; names the worker to spawn.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WorkerTriggerMessage {
    // Which worker this trigger should run.
    pub(crate) worker: WorkerTriggerType,
}

/// Outcome of polling a queue: either a message was delivered or the queue was empty.
enum DeliveryReturnType {
    // A message was received and is ready to be handled.
    Message(Delivery),
    // The queue had no data (`QueueError::NoData`) — an expected idle state, not an error.
    NoMessage,
}

pub async fn add_job_to_process_queue(id: Uuid) -> EyreResult<()> {
log::info!("Adding job with id {:?} to processing queue", id);
add_job_to_queue(id, JOB_PROCESSING_QUEUE.to_string(), None).await
Expand All @@ -52,60 +83,158 @@ where
Fut: Future<Output = Result<(), JobError>>,
{
log::info!("Consuming from queue {:?}", queue);
let config = config().await;
let delivery = match config.queue().consume_message_from_queue(queue.clone()).await {
Ok(d) => d,
Err(QueueError::NoData) => {
return Ok(());
}
Err(e) => {
return Err(ConsumptionError::FailedToConsumeFromQueue { error_msg: e.to_string() });
}
let delivery = get_delivery_from_queue(&queue).await?;

let message = match delivery {
DeliveryReturnType::Message(message) => message,
DeliveryReturnType::NoMessage => return Ok(()),
};

let job_message = parse_job_message(&message)?;

if let Some(job_message) = job_message {
handle_job_message(job_message, message, handler).await?;
}

Ok(())
}

/// Consumes messages from the worker trigger queue and spawns the worker
/// corresponding to each message received.
pub async fn consume_worker_trigger_messages_from_queue<F, Fut>(
queue: String,
handler: F,
) -> Result<(), ConsumptionError>
where
F: FnOnce(Box<dyn Worker>) -> Fut,
Fut: Future<Output = color_eyre::Result<()>>,
{
log::info!("Consuming from queue {:?}", queue);
let delivery = get_delivery_from_queue(&queue).await?;

let message = match delivery {
DeliveryReturnType::Message(message) => message,
DeliveryReturnType::NoMessage => return Ok(()),
};
let job_message: Option<JobQueueMessage> = delivery

let job_message = parse_worker_message(&message)?;

if let Some(job_message) = job_message {
handle_worker_message(job_message, message, handler).await?;
}

Ok(())
}

fn parse_job_message(message: &Delivery) -> Result<Option<JobQueueMessage>, ConsumptionError> {
message
.payload_serde_json()
.wrap_err("Payload Serde Error")
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;

match job_message {
Some(job_message) => {
log::info!("Handling job with id {:?} for queue {:?}", job_message.id, queue);
match handler(job_message.id).await {
Ok(_) => delivery
.ack()
.await
.map_err(|(e, _)| e)
.wrap_err("Queue Error")
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))?,
Err(e) => {
log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e);

// if the queue as a retry logic at the source, it will be attempted
// after the nack
match delivery.nack().await {
Ok(_) => Err(ConsumptionError::FailedToHandleJob {
job_id: job_message.id,
error_msg: "Job handling failed, message nack-ed".to_string(),
})?,
Err(delivery_nack_error) => Err(ConsumptionError::FailedToHandleJob {
job_id: job_message.id,
error_msg: delivery_nack_error.0.to_string(),
})?,
}
}
};
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))
}

/// Deserializes a `WorkerTriggerMessage` from the delivery payload.
///
/// `Ok(None)` means the delivery carried no decodable payload (per
/// `Delivery::payload_serde_json`); serde failures are surfaced as
/// `ConsumptionError::Other`.
fn parse_worker_message(message: &Delivery) -> Result<Option<WorkerTriggerMessage>, ConsumptionError> {
    let parsed = message.payload_serde_json().wrap_err("Payload Serde Error");
    parsed.map_err(|err| ConsumptionError::Other(OtherError::from(err)))
}

async fn handle_job_message<F, Fut>(
job_message: JobQueueMessage,
message: Delivery,
handler: F,
) -> Result<(), ConsumptionError>
where
F: FnOnce(Uuid) -> Fut,
Fut: Future<Output = Result<(), JobError>>,
{
log::info!("Handling job with id {:?}", job_message.id);

match handler(job_message.id).await {
Ok(_) => {
message
.ack()
.await
.map_err(|(e, _)| e)
.wrap_err("Queue Error")
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
Ok(())
}
None => return Ok(()),
};
Err(e) => {
log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e);
match message.nack().await {
Ok(_) => Err(ConsumptionError::FailedToHandleJob {
job_id: job_message.id,
error_msg: "Job handling failed, message nack-ed".to_string(),
}),
Err(delivery_nack_error) => Err(ConsumptionError::FailedToHandleJob {
job_id: job_message.id,
error_msg: delivery_nack_error.0.to_string(),
}),
}
}
}
}

Ok(())
/// Resolves the worker named in `job_message`, runs it through `handler`,
/// and settles the queue message accordingly.
///
/// On success the message is ack-ed; on failure it is nack-ed (so a queue
/// with retry logic at the source will redeliver it) and a
/// `ConsumptionError::FailedToSpawnWorker` is returned.
async fn handle_worker_message<F, Fut>(
    job_message: WorkerTriggerMessage,
    message: Delivery,
    handler: F,
) -> Result<(), ConsumptionError>
where
    F: FnOnce(Box<dyn Worker>) -> Fut,
    Fut: Future<Output = color_eyre::Result<()>>,
{
    log::info!("Handling worker trigger for worker type : {:?}", job_message.worker);
    let worker_handler = get_worker_handler_from_worker_trigger_type(job_message.worker.clone());

    // Guard clause: bail out through the nack path on any handler failure.
    if let Err(e) = handler(worker_handler).await {
        log::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e);
        message.nack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?;
        return Err(ConsumptionError::FailedToSpawnWorker {
            worker_trigger_type: job_message.worker,
            error_msg: "Worker handling failed, message nack-ed".to_string(),
        });
    }

    // Success: acknowledge so the message is removed from the queue.
    message
        .ack()
        .await
        .map_err(|(e, _)| e)
        .wrap_err("Queue Error")
        .map_err(|e| ConsumptionError::Other(OtherError::from(e)))
}

/// Resolves a `WorkerTriggerType` to the boxed `Worker` implementation that services it.
fn get_worker_handler_from_worker_trigger_type(worker_trigger_type: WorkerTriggerType) -> Box<dyn Worker> {
    // Exhaustive on purpose: adding a trigger variant without a worker is a compile error.
    let worker: Box<dyn Worker> = match worker_trigger_type {
        WorkerTriggerType::Snos => Box::new(SnosWorker),
        WorkerTriggerType::Proving => Box::new(ProvingWorker),
        WorkerTriggerType::ProofRegistration => Box::new(ProofRegistrationWorker),
        WorkerTriggerType::DataSubmission => Box::new(DataSubmissionWorker),
        WorkerTriggerType::UpdateState => Box::new(UpdateStateWorker),
    };
    worker
}

/// Pulls a single message off the named queue.
///
/// Returns `DeliveryReturnType::NoMessage` when the queue is empty
/// (`QueueError::NoData`); any other queue failure becomes
/// `ConsumptionError::FailedToConsumeFromQueue`.
async fn get_delivery_from_queue(queue: &str) -> Result<DeliveryReturnType, ConsumptionError> {
    let consume_result = config().await.queue().consume_message_from_queue(queue.to_string()).await;
    match consume_result {
        Ok(delivery) => Ok(DeliveryReturnType::Message(delivery)),
        Err(QueueError::NoData) => Ok(DeliveryReturnType::NoMessage),
        Err(err) => Err(ConsumptionError::FailedToConsumeFromQueue { error_msg: err.to_string() }),
    }
}

macro_rules! spawn_consumer {
($queue_type :expr, $handler : expr) => {
($queue_type :expr, $handler : expr, $consume_function: expr) => {
tokio::spawn(async move {
loop {
match consume_job_from_queue($queue_type, $handler).await {
match $consume_function($queue_type, $handler).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e),
}
Expand All @@ -116,10 +245,16 @@ macro_rules! spawn_consumer {
}

pub async fn init_consumers() -> Result<(), JobError> {
spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job);
spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job);
spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure);
spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue);
spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job, consume_job_from_queue);
spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure, consume_job_from_queue);
spawn_consumer!(WORKER_TRIGGER_QUEUE.to_string(), spawn_worker, consume_worker_trigger_messages_from_queue);
Ok(())
}

/// Runs the given worker once (if it is enabled).
///
/// Errors are propagated to the caller instead of panicking: the
/// worker-trigger consumer (`handle_worker_message`) nack-s the message and
/// returns `FailedToSpawnWorker` on `Err`, whereas the previous
/// `.expect(..)` here would panic the spawned consumer task and make that
/// error path unreachable.
async fn spawn_worker(worker: Box<dyn Worker>) -> color_eyre::Result<()> {
    // NOTE(review): assumes `run_worker_if_enabled`'s error type converts into
    // a color_eyre report via `?` — confirm against the `Worker` trait.
    worker.run_worker_if_enabled().await?;
    Ok(())
}

Expand Down
25 changes: 19 additions & 6 deletions crates/orchestrator/src/queue/sqs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
use std::collections::HashMap;
use std::time::Duration;

use crate::queue::job_queue::JOB_PROCESSING_QUEUE;
use crate::queue::job_queue::{
JOB_HANDLE_FAILURE_QUEUE, JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE,
};
use async_trait::async_trait;
use color_eyre::Result;
use lazy_static::lazy_static;
use omniqueue::backends::{SqsBackend, SqsConfig, SqsConsumer, SqsProducer};
use omniqueue::{Delivery, QueueError};
use utils::env_utils::get_env_var_or_panic;

use crate::queue::QueueProvider;
pub struct SqsQueue;

lazy_static! {
    /// Maps each queue name to the environment variable holding that queue's URL.
    /// Used by `get_queue_url` to resolve URLs at runtime.
    pub static ref QUEUE_NAME_TO_ENV_VAR_MAPPING: HashMap<&'static str, &'static str> = [
        (JOB_PROCESSING_QUEUE, "SQS_JOB_PROCESSING_QUEUE_URL"),
        (JOB_VERIFICATION_QUEUE, "SQS_JOB_VERIFICATION_QUEUE_URL"),
        (JOB_HANDLE_FAILURE_QUEUE, "SQS_JOB_HANDLE_FAILURE_QUEUE_URL"),
        (WORKER_TRIGGER_QUEUE, "SQS_WORKER_TRIGGER_QUEUE_URL"),
    ]
    .into_iter()
    .collect();
}

#[async_trait]
impl QueueProvider for SqsQueue {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> Result<()> {
Expand All @@ -31,12 +45,11 @@ impl QueueProvider for SqsQueue {
}
}

/// To fetch the queue URL from the environment variables
fn get_queue_url(queue_name: String) -> String {
if queue_name == JOB_PROCESSING_QUEUE {
get_env_var_or_panic("SQS_JOB_PROCESSING_QUEUE_URL")
} else {
get_env_var_or_panic("SQS_JOB_VERIFICATION_QUEUE_URL")
}
get_env_var_or_panic(
QUEUE_NAME_TO_ENV_VAR_MAPPING.get(queue_name.as_str()).expect("Not able to get the queue env var name."),
)
}

// TODO: store the producer and consumer in memory to avoid creating a new one every time
Expand Down

0 comments on commit 5728df2

Please sign in to comment.