diff --git a/.env.test b/.env.test index aae41417..583b0d41 100644 --- a/.env.test +++ b/.env.test @@ -21,8 +21,19 @@ AWS_S3_BUCKET_NAME="madara-orchestrator-test-bucket" ##### QUEUE ##### QUEUE_PROVIDER="sqs" -SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue" -SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue" + +SQS_SNOS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_snos_job_processing_queue" +SQS_SNOS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_snos_job_verification_queue" + +SQS_PROVING_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_proving_job_processing_queue" +SQS_PROVING_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_proving_job_verification_queue" + +SQS_DATA_SUBMISSION_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_data_submission_job_processing_queue" +SQS_DATA_SUBMISSION_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_data_submission_job_verification_queue" + +SQS_UPDATE_STATE_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_update_state_job_processing_queue" +SQS_UPDATE_STATE_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_update_state_job_verification_queue" + SQS_JOB_HANDLE_FAILURE_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_handle_failure_queue" 
SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_worker_trigger_queue" @@ -77,4 +88,4 @@ STARKNET_OPERATOR_ADDRESS="0x5b98B836969A60FEC50Fa925905Dd1D382a7db43" AWS_SNS_ARN_NAME="madara-orchestrator-arn" MADARA_BINARY_PATH="/path/to/madara" # pick up by AWS sdk -AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566" \ No newline at end of file +AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566" diff --git a/CHANGELOG.md b/CHANGELOG.md index 3880f0b5..af109bad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- Add multiple queues for processing and verification based on job type - added logs - added MongoDB migrations using nodejs - added dockerfile diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 2694768e..767ed515 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -42,7 +42,7 @@ pub enum JobError { #[error("Job already exists for internal_id {internal_id:?} and job_type {job_type:?}. Skipping!")] JobAlreadyExists { internal_id: String, job_type: JobType }, - #[error("Invalid status {id:?} for job with id {job_status:?}. Cannot process.")] + #[error("Invalid status {job_status:?} for job with id {id:?}. 
Cannot process.")] InvalidStatus { id: Uuid, job_status: JobStatus }, #[error("Failed to find job with id {id:?}")] @@ -169,7 +169,9 @@ pub async fn create_job( let job_item = job_handler.create_job(config.clone(), internal_id.clone(), metadata).await?; config.database().create_job(job_item.clone()).await?; - add_job_to_process_queue(job_item.id, config.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; + add_job_to_process_queue(job_item.id, &job_type, config.clone()) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; let attributes = [ KeyValue::new("job_type", format!("{:?}", job_type)), @@ -258,6 +260,7 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> tracing::debug!(job_id = ?id, "Adding job to verification queue"); add_job_to_verification_queue( job.id, + &job.job_type, Duration::from_secs(job_handler.verification_polling_delay_seconds()), config.clone(), ) @@ -340,6 +343,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { attempt = process_attempts + 1, "Verification failed. Retrying job processing" ); + config .database() .update_job( @@ -354,8 +358,9 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to VerificationFailed"); JobError::Other(OtherError(e)) })?; - add_job_to_process_queue(job.id, config.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; - return Ok(()); + add_job_to_process_queue(job.id, &job.job_type, config.clone()) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; } else { tracing::warn!(job_id = ?id, "Max process attempts reached. 
Job will not be retried"); return move_job_to_failed( @@ -394,6 +399,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { tracing::debug!(job_id = ?id, "Adding job back to verification queue"); add_job_to_verification_queue( job.id, + &job.job_type, Duration::from_secs(job_handler.verification_polling_delay_seconds()), config.clone(), ) diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 66b48b09..d8ee8e2f 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -12,6 +12,7 @@ use tokio::time::sleep; use uuid::Uuid; use crate::config::Config; +use crate::jobs::types::JobType; use crate::jobs::{handle_job_failure, process_job, verify_job, JobError, OtherError}; use crate::workers::data_submission_worker::DataSubmissionWorker; use crate::workers::proof_registration::ProofRegistrationWorker; @@ -20,9 +21,23 @@ use crate::workers::snos::SnosWorker; use crate::workers::update_state::UpdateStateWorker; use crate::workers::Worker; -pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; -pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; -// Below is the Data Letter Queue for the above two jobs. 
+pub const SNOS_JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_snos_job_processing_queue"; +pub const SNOS_JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_snos_job_verification_queue"; + +pub const PROVING_JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_proving_job_processing_queue"; +pub const PROVING_JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_proving_job_verification_queue"; + +pub const PROOF_REGISTRATION_JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_proof_registration_job_processing_queue"; +pub const PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE: &str = + "madara_orchestrator_proof_registration_job_verification_queue"; + +pub const DATA_SUBMISSION_JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_data_submission_job_processing_queue"; +pub const DATA_SUBMISSION_JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_data_submission_job_verification_queue"; + +pub const UPDATE_STATE_JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_update_state_job_processing_queue"; +pub const UPDATE_STATE_JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_update_state_job_verification_queue"; + +// Below is the Dead Letter Queue for the above queues. 
pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue"; // Queues for SNOS worker trigger listening @@ -102,14 +117,47 @@ enum DeliveryReturnType { NoMessage, } -pub async fn add_job_to_process_queue(id: Uuid, config: Arc) -> EyreResult<()> { +pub trait QueueNameForJobType { + fn process_queue_name(&self) -> String; + fn verify_queue_name(&self) -> String; +} + +impl QueueNameForJobType for JobType { + fn process_queue_name(&self) -> String { + match self { + JobType::SnosRun => SNOS_JOB_PROCESSING_QUEUE, + JobType::ProofCreation => PROVING_JOB_PROCESSING_QUEUE, + JobType::ProofRegistration => PROOF_REGISTRATION_JOB_PROCESSING_QUEUE, + JobType::DataSubmission => DATA_SUBMISSION_JOB_PROCESSING_QUEUE, + JobType::StateTransition => UPDATE_STATE_JOB_PROCESSING_QUEUE, + } + .to_string() + } + fn verify_queue_name(&self) -> String { + match self { + JobType::SnosRun => SNOS_JOB_VERIFICATION_QUEUE, + JobType::ProofCreation => PROVING_JOB_VERIFICATION_QUEUE, + JobType::ProofRegistration => PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE, + JobType::DataSubmission => DATA_SUBMISSION_JOB_VERIFICATION_QUEUE, + JobType::StateTransition => UPDATE_STATE_JOB_VERIFICATION_QUEUE, + } + .to_string() + } +} + +pub async fn add_job_to_process_queue(id: Uuid, job_type: &JobType, config: Arc) -> EyreResult<()> { tracing::info!("Adding job with id {:?} to processing queue", id); - add_job_to_queue(id, JOB_PROCESSING_QUEUE.to_string(), None, config).await + add_job_to_queue(id, job_type.process_queue_name(), None, config).await } -pub async fn add_job_to_verification_queue(id: Uuid, delay: Duration, config: Arc) -> EyreResult<()> { +pub async fn add_job_to_verification_queue( + id: Uuid, + job_type: &JobType, + delay: Duration, + config: Arc, +) -> EyreResult<()> { tracing::info!("Adding job with id {:?} to verification queue", id); - add_job_to_queue(id, JOB_VERIFICATION_QUEUE.to_string(), Some(delay), config).await + add_job_to_queue(id, 
job_type.verify_queue_name(), Some(delay), config).await } pub async fn consume_job_from_queue( @@ -122,7 +170,7 @@ where F: Send + 'static, Fut: Future> + Send, { - tracing::info!(queue = %queue, "Attempting to consume job from queue"); + tracing::trace!(queue = %queue, "Attempting to consume job from queue"); let delivery = get_delivery_from_queue(&queue, config.clone()).await?; @@ -322,9 +370,35 @@ macro_rules! spawn_consumer { } pub async fn init_consumers(config: Arc) -> Result<(), JobError> { - spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue, config.clone()); - spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job, consume_job_from_queue, config.clone()); + spawn_consumer!(SNOS_JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue, config.clone()); + spawn_consumer!(SNOS_JOB_VERIFICATION_QUEUE.to_string(), verify_job, consume_job_from_queue, config.clone()); + + spawn_consumer!(PROVING_JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue, config.clone()); + spawn_consumer!(PROVING_JOB_VERIFICATION_QUEUE.to_string(), verify_job, consume_job_from_queue, config.clone()); + + spawn_consumer!( + DATA_SUBMISSION_JOB_PROCESSING_QUEUE.to_string(), + process_job, + consume_job_from_queue, + config.clone() + ); + spawn_consumer!( + DATA_SUBMISSION_JOB_VERIFICATION_QUEUE.to_string(), + verify_job, + consume_job_from_queue, + config.clone() + ); + + spawn_consumer!(UPDATE_STATE_JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue, config.clone()); + spawn_consumer!( + UPDATE_STATE_JOB_VERIFICATION_QUEUE.to_string(), + verify_job, + consume_job_from_queue, + config.clone() + ); + spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure, consume_job_from_queue, config.clone()); + spawn_consumer!(WORKER_TRIGGER_QUEUE.to_string(), spawn_worker, consume_worker_trigger_messages_from_queue, config); Ok(()) } diff --git a/crates/orchestrator/src/queue/sqs/mod.rs 
b/crates/orchestrator/src/queue/sqs/mod.rs index 9f0b4872..dc20d21d 100644 --- a/crates/orchestrator/src/queue/sqs/mod.rs +++ b/crates/orchestrator/src/queue/sqs/mod.rs @@ -9,7 +9,10 @@ use omniqueue::{Delivery, QueueError}; use utils::env_utils::get_env_var_or_panic; use crate::queue::job_queue::{ - JOB_HANDLE_FAILURE_QUEUE, JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE, + DATA_SUBMISSION_JOB_PROCESSING_QUEUE, DATA_SUBMISSION_JOB_VERIFICATION_QUEUE, JOB_HANDLE_FAILURE_QUEUE, + PROOF_REGISTRATION_JOB_PROCESSING_QUEUE, PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE, PROVING_JOB_PROCESSING_QUEUE, + PROVING_JOB_VERIFICATION_QUEUE, SNOS_JOB_PROCESSING_QUEUE, SNOS_JOB_VERIFICATION_QUEUE, + UPDATE_STATE_JOB_PROCESSING_QUEUE, UPDATE_STATE_JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE, }; use crate::queue::QueueProvider; pub struct SqsQueue; @@ -17,8 +20,16 @@ pub struct SqsQueue; lazy_static! { /// Maps Queue Name to Env var of queue URL. pub static ref QUEUE_NAME_TO_ENV_VAR_MAPPING: HashMap<&'static str, &'static str> = HashMap::from([ - (JOB_PROCESSING_QUEUE, "SQS_JOB_PROCESSING_QUEUE_URL"), - (JOB_VERIFICATION_QUEUE, "SQS_JOB_VERIFICATION_QUEUE_URL"), + (DATA_SUBMISSION_JOB_PROCESSING_QUEUE, "SQS_DATA_SUBMISSION_JOB_PROCESSING_QUEUE_URL"), + (DATA_SUBMISSION_JOB_VERIFICATION_QUEUE, "SQS_DATA_SUBMISSION_JOB_VERIFICATION_QUEUE_URL"), + (PROOF_REGISTRATION_JOB_PROCESSING_QUEUE, "SQS_PROOF_REGISTRATION_JOB_PROCESSING_QUEUE_URL"), + (PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE, "SQS_PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE_URL"), + (PROVING_JOB_PROCESSING_QUEUE, "SQS_PROVING_JOB_PROCESSING_QUEUE_URL"), + (PROVING_JOB_VERIFICATION_QUEUE, "SQS_PROVING_JOB_VERIFICATION_QUEUE_URL"), + (SNOS_JOB_PROCESSING_QUEUE, "SQS_SNOS_JOB_PROCESSING_QUEUE_URL"), + (SNOS_JOB_VERIFICATION_QUEUE, "SQS_SNOS_JOB_VERIFICATION_QUEUE_URL"), + (UPDATE_STATE_JOB_PROCESSING_QUEUE, "SQS_UPDATE_STATE_JOB_PROCESSING_QUEUE_URL"), + (UPDATE_STATE_JOB_VERIFICATION_QUEUE, 
"SQS_UPDATE_STATE_JOB_VERIFICATION_QUEUE_URL"), (JOB_HANDLE_FAILURE_QUEUE, "SQS_JOB_HANDLE_FAILURE_QUEUE_URL"), (WORKER_TRIGGER_QUEUE, "SQS_WORKER_TRIGGER_QUEUE_URL"), ]); diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index 9c6e4499..336abf46 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -21,7 +21,12 @@ use crate::database::mongodb::MongoDb; use crate::jobs::types::JobStatus::Created; use crate::jobs::types::JobType::DataSubmission; use crate::jobs::types::{ExternalId, JobItem}; -use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; +use crate::queue::job_queue::{ + DATA_SUBMISSION_JOB_PROCESSING_QUEUE, DATA_SUBMISSION_JOB_VERIFICATION_QUEUE, + PROOF_REGISTRATION_JOB_PROCESSING_QUEUE, PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE, PROVING_JOB_PROCESSING_QUEUE, + PROVING_JOB_VERIFICATION_QUEUE, SNOS_JOB_PROCESSING_QUEUE, SNOS_JOB_VERIFICATION_QUEUE, + UPDATE_STATE_JOB_PROCESSING_QUEUE, UPDATE_STATE_JOB_VERIFICATION_QUEUE, +}; #[fixture] pub fn default_job_item() -> JobItem { @@ -82,8 +87,17 @@ pub async fn create_sqs_queues(provider_config: Arc) -> color_ey } // Creating SQS queues - sqs_client.create_queue().queue_name(JOB_PROCESSING_QUEUE).send().await?; - sqs_client.create_queue().queue_name(JOB_VERIFICATION_QUEUE).send().await?; + sqs_client.create_queue().queue_name(DATA_SUBMISSION_JOB_PROCESSING_QUEUE).send().await?; + sqs_client.create_queue().queue_name(DATA_SUBMISSION_JOB_VERIFICATION_QUEUE).send().await?; + sqs_client.create_queue().queue_name(SNOS_JOB_PROCESSING_QUEUE).send().await?; + sqs_client.create_queue().queue_name(SNOS_JOB_VERIFICATION_QUEUE).send().await?; + sqs_client.create_queue().queue_name(PROVING_JOB_PROCESSING_QUEUE).send().await?; + sqs_client.create_queue().queue_name(PROVING_JOB_VERIFICATION_QUEUE).send().await?; + 
sqs_client.create_queue().queue_name(PROOF_REGISTRATION_JOB_PROCESSING_QUEUE).send().await?; + sqs_client.create_queue().queue_name(PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE).send().await?; + sqs_client.create_queue().queue_name(UPDATE_STATE_JOB_PROCESSING_QUEUE).send().await?; + sqs_client.create_queue().queue_name(UPDATE_STATE_JOB_VERIFICATION_QUEUE).send().await?; + Ok(()) } diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index 705240cd..a04b1ce8 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ b/crates/orchestrator/src/tests/jobs/mod.rs @@ -18,7 +18,9 @@ use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificatio use crate::jobs::{ create_job, handle_job_failure, increment_key_in_metadata, process_job, verify_job, Job, JobError, MockJob, }; -use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; +use crate::queue::job_queue::{ + QueueNameForJobType, DATA_SUBMISSION_JOB_PROCESSING_QUEUE, DATA_SUBMISSION_JOB_VERIFICATION_QUEUE, +}; use crate::tests::common::MessagePayloadType; use crate::tests::config::{ConfigType, TestConfigBuilder}; @@ -76,7 +78,7 @@ async fn create_job_job_does_not_exists_in_db_works() { // Queue checks. 
let consumed_messages = - services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); + services.config.queue().consume_message_from_queue(job_item.job_type.process_queue_name()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -94,7 +96,7 @@ async fn create_job_job_exists_in_db_works() { .await; let database_client = services.config.database(); - database_client.create_job(job_item).await.unwrap(); + database_client.create_job(job_item.clone()).await.unwrap(); assert!( create_job(JobType::ProofCreation, "0".to_string(), HashMap::new(), services.config.clone()).await.is_err() @@ -105,7 +107,7 @@ async fn create_job_job_exists_in_db_works() { // Queue checks. let consumed_messages = - services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(job_item.job_type.process_queue_name()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -125,16 +127,16 @@ async fn create_job_job_handler_is_not_implemented_panics() { let ctx = mock_factory::get_job_handler_context(); ctx.expect().times(1).returning(|_| panic!("Job type not implemented yet.")); - assert!( - create_job(JobType::ProofCreation, "0".to_string(), HashMap::new(), services.config.clone()).await.is_err() - ); + let job_type = JobType::ProofCreation; + + assert!(create_job(job_type.clone(), "0".to_string(), HashMap::new(), services.config.clone()).await.is_err()); // Waiting for 5 secs for message to be passed into the queue sleep(Duration::from_secs(5)).await; // Queue checks. 
let consumed_messages = - services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(job_type.process_queue_name()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -184,7 +186,7 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works // Queue checks let consumed_messages = - services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); + services.config.queue().consume_message_from_queue(job_type.verify_queue_name()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -219,7 +221,7 @@ async fn process_job_with_job_exists_in_db_with_invalid_job_processing_status_er // Queue checks. let consumed_messages = - services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(job_item.job_type.verify_queue_name()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -245,7 +247,7 @@ async fn process_job_job_does_not_exists_in_db_works() { // Queue checks. let consumed_messages = - services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(job_item.job_type.verify_queue_name()).await.unwrap_err(); assert_matches!(consumed_messages, QueueError::NoData); } @@ -380,11 +382,19 @@ async fn verify_job_with_verified_status_works() { sleep(Duration::from_secs(5)).await; // Queue checks. 
- let consumed_messages_verification_queue = - services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + let consumed_messages_verification_queue = services + .config + .queue() + .consume_message_from_queue(DATA_SUBMISSION_JOB_VERIFICATION_QUEUE.to_string()) + .await + .unwrap_err(); assert_matches!(consumed_messages_verification_queue, QueueError::NoData); - let consumed_messages_processing_queue = - services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + let consumed_messages_processing_queue = services + .config + .queue() + .consume_message_from_queue(DATA_SUBMISSION_JOB_PROCESSING_QUEUE.to_string()) + .await + .unwrap_err(); assert_matches!(consumed_messages_processing_queue, QueueError::NoData); } @@ -426,8 +436,12 @@ async fn verify_job_with_rejected_status_adds_to_queue_works() { sleep(Duration::from_secs(5)).await; // Queue checks. - let consumed_messages = - services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap(); + let consumed_messages = services + .config + .queue() + .consume_message_from_queue(DATA_SUBMISSION_JOB_PROCESSING_QUEUE.to_string()) + .await + .unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -478,7 +492,7 @@ async fn verify_job_with_rejected_status_works() { // Queue checks. 
let consumed_messages_processing_queue = - services.config.queue().consume_message_from_queue(JOB_PROCESSING_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(job_item.job_type.process_queue_name()).await.unwrap_err(); assert_matches!(consumed_messages_processing_queue, QueueError::NoData); } @@ -524,7 +538,7 @@ async fn verify_job_with_pending_status_adds_to_queue_works() { // Queue checks let consumed_messages = - services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap(); + services.config.queue().consume_message_from_queue(job_item.job_type.verify_queue_name()).await.unwrap(); let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); assert_eq!(consumed_message_payload.id, job_item.id); } @@ -576,7 +590,7 @@ async fn verify_job_with_pending_status_works() { // Queue checks. let consumed_messages_verification_queue = - services.config.queue().consume_message_from_queue(JOB_VERIFICATION_QUEUE.to_string()).await.unwrap_err(); + services.config.queue().consume_message_from_queue(job_item.job_type.verify_queue_name()).await.unwrap_err(); assert_matches!(consumed_messages_verification_queue, QueueError::NoData); } diff --git a/crates/orchestrator/src/tests/workers/proving/mod.rs b/crates/orchestrator/src/tests/workers/proving/mod.rs index 05051e54..24a602b9 100644 --- a/crates/orchestrator/src/tests/workers/proving/mod.rs +++ b/crates/orchestrator/src/tests/workers/proving/mod.rs @@ -15,6 +15,7 @@ use crate::database::MockDatabase; use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{JobItem, JobStatus, JobType}; use crate::jobs::{Job, MockJob}; +use crate::queue::job_queue::PROVING_JOB_PROCESSING_QUEUE; use crate::queue::MockQueueProvider; use crate::tests::config::TestConfigBuilder; use crate::tests::workers::utils::{db_checks_proving_worker, get_job_by_mock_id_vector}; @@ -32,8 +33,6 @@ async fn 
test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { queue .expect_send_message_to_queue() .returning(|_, _, _| Ok(())) - .withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE); + .withf(|queue, _payload, _delay| queue == SNOS_JOB_PROCESSING_QUEUE); // mock block number (madara) : 5 let rpc_response_block_number = block; diff --git a/crates/orchestrator/src/tests/workers/update_state/mod.rs b/crates/orchestrator/src/tests/workers/update_state/mod.rs index 355f5fe5..f1f51e2a 100644 --- a/crates/orchestrator/src/tests/workers/update_state/mod.rs +++ b/crates/orchestrator/src/tests/workers/update_state/mod.rs @@ -14,6 +14,7 @@ use crate::database::MockDatabase; use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{JobStatus, JobType}; use crate::jobs::{Job, MockJob}; +use crate::queue::job_queue::UPDATE_STATE_JOB_PROCESSING_QUEUE; use crate::queue::MockQueueProvider; use crate::tests::config::TestConfigBuilder; use crate::tests::workers::utils::{get_job_by_mock_id_vector, get_job_item_mock_by_id}; @@ -33,8 +34,6 @@ async fn test_update_state_worker( let mut db = MockDatabase::new(); let mut queue = MockQueueProvider::new(); - const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; - // Mocking the get_job_handler function. 
let mut job_handler = MockJob::new(); @@ -102,7 +101,7 @@ async fn test_update_state_worker( queue .expect_send_message_to_queue() .returning(|_, _, _| Ok(())) - .withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE); + .withf(|queue, _payload, _delay| queue == UPDATE_STATE_JOB_PROCESSING_QUEUE); let provider = JsonRpcClient::new(HttpTransport::new( Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"), diff --git a/crates/orchestrator/src/workers/data_submission_worker.rs b/crates/orchestrator/src/workers/data_submission_worker.rs index bdba3458..aae03644 100644 --- a/crates/orchestrator/src/workers/data_submission_worker.rs +++ b/crates/orchestrator/src/workers/data_submission_worker.rs @@ -18,7 +18,7 @@ impl Worker for DataSubmissionWorker { // 2. Fetch the latest DA job creation. // 3. Create jobs from after the lastest DA job already created till latest completed proving job. async fn run_worker(&self, config: Arc) -> Result<(), Box> { - tracing::info!(log_type = "starting", category = "DataSubmissionWorker", "DataSubmissionWorker started."); + tracing::trace!(log_type = "starting", category = "DataSubmissionWorker", "DataSubmissionWorker started."); // provides latest completed proof creation job id let latest_proven_job_id = config @@ -71,7 +71,7 @@ impl Worker for DataSubmissionWorker { } } - tracing::info!(log_type = "completed", category = "DataSubmissionWorker", "DataSubmissionWorker completed."); + tracing::trace!(log_type = "completed", category = "DataSubmissionWorker", "DataSubmissionWorker completed."); Ok(()) } } diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs index 8f0e02db..464e684d 100644 --- a/crates/orchestrator/src/workers/proving.rs +++ b/crates/orchestrator/src/workers/proving.rs @@ -15,7 +15,7 @@ impl Worker for ProvingWorker { /// 1. Fetch all successful SNOS job runs that don't have a proving job /// 2. 
Create a proving job for each SNOS job run async fn run_worker(&self, config: Arc) -> Result<(), Box> { - tracing::info!(log_type = "starting", category = "ProvingWorker", "ProvingWorker started."); + tracing::trace!(log_type = "starting", category = "ProvingWorker", "ProvingWorker started."); let successful_snos_jobs = config .database() @@ -34,7 +34,7 @@ impl Worker for ProvingWorker { } } - tracing::info!(log_type = "completed", category = "ProvingWorker", "ProvingWorker completed."); + tracing::trace!(log_type = "completed", category = "ProvingWorker", "ProvingWorker completed."); Ok(()) } } diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index 4673b459..b0f11742 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -18,7 +18,7 @@ impl Worker for SnosWorker { /// 2. Fetch the last block that had a SNOS job run. /// 3. Create SNOS run jobs for all the remaining blocks async fn run_worker(&self, config: Arc) -> Result<(), Box> { - tracing::info!(log_type = "starting", category = "SnosWorker", "SnosWorker started."); + tracing::trace!(log_type = "starting", category = "SnosWorker", "SnosWorker started."); let provider = config.starknet_client(); let latest_block_number = provider.block_number().await?; @@ -65,7 +65,7 @@ impl Worker for SnosWorker { for x in latest_block_processed + 1..latest_block_number + 1 { create_job(JobType::SnosRun, x.to_string(), HashMap::new(), config.clone()).await?; } - tracing::info!(log_type = "completed", category = "SnosWorker", "SnosWorker completed."); + tracing::trace!(log_type = "completed", category = "SnosWorker", "SnosWorker completed."); Ok(()) } } diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index 283c374b..1675eabf 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -17,7 +17,7 @@ impl Worker for 
UpdateStateWorker { /// 2. Fetch all successful proving jobs covering blocks after the last state update /// 3. Create state updates for all the blocks that don't have a state update job async fn run_worker(&self, config: Arc) -> Result<(), Box> { - tracing::info!(log_type = "starting", category = "UpdateStateWorker", "UpdateStateWorker started."); + tracing::trace!(log_type = "starting", category = "UpdateStateWorker", "UpdateStateWorker started."); let latest_successful_job = config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?; @@ -57,7 +57,7 @@ impl Worker for UpdateStateWorker { } } - tracing::info!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed."); + tracing::trace!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed."); Ok(()) } None => { @@ -90,7 +90,7 @@ impl Worker for UpdateStateWorker { } } - tracing::info!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed."); + tracing::trace!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed."); Ok(()) } } diff --git a/docker-compose.yml b/docker-compose.yml index 82dbef6b..e82ac9c1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,8 +14,16 @@ services: - DATA_STORAGE=${DATA_STORAGE:-s3} - AWS_S3_BUCKET_NAME=${AWS_S3_BUCKET_NAME} - QUEUE_PROVIDER=${QUEUE_PROVIDER:-sqs} - - SQS_JOB_PROCESSING_QUEUE_URL=${SQS_JOB_PROCESSING_QUEUE_URL} - - SQS_JOB_VERIFICATION_QUEUE_URL=${SQS_JOB_VERIFICATION_QUEUE_URL} + - SQS_SNOS_JOB_PROCESSING_QUEUE_URL=${SQS_SNOS_JOB_PROCESSING_QUEUE_URL} + - SQS_SNOS_JOB_VERIFICATION_QUEUE_URL=${SQS_SNOS_JOB_VERIFICATION_QUEUE_URL} + - SQS_DATA_SUBMISSION_JOB_PROCESSING_QUEUE_URL=${SQS_DATA_SUBMISSION_JOB_PROCESSING_QUEUE_URL} + - SQS_DATA_SUBMISSION_JOB_VERIFICATION_QUEUE_URL=${SQS_DATA_SUBMISSION_JOB_VERIFICATION_QUEUE_URL} + - 
SQS_PROOF_REGISTRATION_JOB_PROCESSING_QUEUE_URL=${SQS_PROOF_REGISTRATION_JOB_PROCESSING_QUEUE_URL} + - SQS_PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE_URL=${SQS_PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE_URL} + - SQS_PROVING_JOB_PROCESSING_QUEUE_URL=${SQS_PROVING_JOB_PROCESSING_QUEUE_URL} + - SQS_PROVING_JOB_VERIFICATION_QUEUE_URL=${SQS_PROVING_JOB_VERIFICATION_QUEUE_URL} + - SQS_UPDATE_STATE_JOB_PROCESSING_QUEUE_URL=${SQS_UPDATE_STATE_JOB_PROCESSING_QUEUE_URL} + - SQS_UPDATE_STATE_JOB_VERIFICATION_QUEUE_URL=${SQS_UPDATE_STATE_JOB_VERIFICATION_QUEUE_URL} - SQS_JOB_HANDLE_FAILURE_QUEUE_URL=${SQS_JOB_HANDLE_FAILURE_QUEUE_URL} - SQS_WORKER_TRIGGER_QUEUE_URL=${SQS_WORKER_TRIGGER_QUEUE_URL} - ALERTS=${ALERTS:-sns} diff --git a/e2e-tests/src/localstack.rs b/e2e-tests/src/localstack.rs index 405702dd..5252900c 100644 --- a/e2e-tests/src/localstack.rs +++ b/e2e-tests/src/localstack.rs @@ -10,8 +10,11 @@ use orchestrator::config::ProviderConfig; use orchestrator::data_storage::aws_s3::AWSS3; use orchestrator::data_storage::DataStorage; use orchestrator::queue::job_queue::{ - JobQueueMessage, WorkerTriggerMessage, WorkerTriggerType, JOB_HANDLE_FAILURE_QUEUE, JOB_PROCESSING_QUEUE, - JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE, + JobQueueMessage, WorkerTriggerMessage, WorkerTriggerType, DATA_SUBMISSION_JOB_PROCESSING_QUEUE, + DATA_SUBMISSION_JOB_VERIFICATION_QUEUE, JOB_HANDLE_FAILURE_QUEUE, PROOF_REGISTRATION_JOB_PROCESSING_QUEUE, + PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE, PROVING_JOB_PROCESSING_QUEUE, PROVING_JOB_VERIFICATION_QUEUE, + SNOS_JOB_PROCESSING_QUEUE, SNOS_JOB_VERIFICATION_QUEUE, UPDATE_STATE_JOB_PROCESSING_QUEUE, + UPDATE_STATE_JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE, }; use utils::env_utils::get_env_var_or_panic; use utils::settings::env::EnvSettingsProvider; @@ -56,30 +59,31 @@ impl LocalStack { // Creating SQS queues let mut queue_attributes = HashMap::new(); queue_attributes.insert(VisibilityTimeout, "1".into()); - self.sqs_client - .create_queue() - 
.queue_name(JOB_PROCESSING_QUEUE) - .set_attributes(Some(queue_attributes.clone())) - .send() - .await?; - self.sqs_client - .create_queue() - .queue_name(JOB_VERIFICATION_QUEUE) - .set_attributes(Some(queue_attributes.clone())) - .send() - .await?; - self.sqs_client - .create_queue() - .queue_name(JOB_HANDLE_FAILURE_QUEUE) - .set_attributes(Some(queue_attributes.clone())) - .send() - .await?; - self.sqs_client - .create_queue() - .queue_name(WORKER_TRIGGER_QUEUE) - .set_attributes(Some(queue_attributes.clone())) - .send() - .await?; + + let queue_names = vec![ + DATA_SUBMISSION_JOB_PROCESSING_QUEUE, + DATA_SUBMISSION_JOB_VERIFICATION_QUEUE, + PROOF_REGISTRATION_JOB_PROCESSING_QUEUE, + PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE, + PROVING_JOB_PROCESSING_QUEUE, + PROVING_JOB_VERIFICATION_QUEUE, + SNOS_JOB_PROCESSING_QUEUE, + SNOS_JOB_VERIFICATION_QUEUE, + UPDATE_STATE_JOB_PROCESSING_QUEUE, + UPDATE_STATE_JOB_VERIFICATION_QUEUE, + JOB_HANDLE_FAILURE_QUEUE, + WORKER_TRIGGER_QUEUE, + ]; + + for queue_name in queue_names { + self.sqs_client + .create_queue() + .queue_name(queue_name) + .set_attributes(Some(queue_attributes.clone())) + .send() + .await?; + } + println!("🌊 SQS queues creation completed."); Ok(()) diff --git a/e2e-tests/tests.rs b/e2e-tests/tests.rs index 7d184eb0..5e3630ed 100644 --- a/e2e-tests/tests.rs +++ b/e2e-tests/tests.rs @@ -279,7 +279,7 @@ pub async fn put_job_data_in_db_snos(mongo_db: &MongoDbServer, l2_block_number: /// as soon as it is picked up by orchestrator pub async fn put_snos_job_in_processing_queue(local_stack: &LocalStack, id: Uuid) -> color_eyre::Result<()> { let message = JobQueueMessage { id }; - local_stack.put_message_in_queue(message, get_env_var_or_panic("SQS_JOB_PROCESSING_QUEUE_URL")).await?; + local_stack.put_message_in_queue(message, get_env_var_or_panic("SQS_SNOS_JOB_PROCESSING_QUEUE_URL")).await?; Ok(()) }