From 406b5672f2384f554b58a2e856f98bb372b2cbff Mon Sep 17 00:00:00 2001 From: KanadeSiina Date: Wed, 17 Jan 2024 14:32:36 +0000 Subject: [PATCH 1/3] Prepare for judger client --- judger/Cargo.toml | 7 ++ judger/src/client/client.rs | 17 ++++ judger/src/client/error.rs | 27 ++++++ judger/src/client/main.rs | 167 ++++++++++++++++++++++++++++++++++++ 4 files changed, 218 insertions(+) create mode 100644 judger/src/client/client.rs create mode 100644 judger/src/client/error.rs create mode 100644 judger/src/client/main.rs diff --git a/judger/Cargo.toml b/judger/Cargo.toml index 711ca6e..3c8dee4 100644 --- a/judger/Cargo.toml +++ b/judger/Cargo.toml @@ -11,6 +11,9 @@ judge-core = { path = "../judge-core" } # CLI clap = { version = "4.0", features = ["derive"] } +# Client +reqwest = { version = "0.11.23", features = ["json"] } + # Async runtime tokio = { version = "1", features = ["full"] } @@ -50,3 +53,7 @@ path ="src/cli/main.rs" name ="judger-server" path ="src/server/main.rs" +[[bin]] +name ="judger-client" +path ="src/client/main.rs" + diff --git a/judger/src/client/client.rs b/judger/src/client/client.rs new file mode 100644 index 0000000..f1fe643 --- /dev/null +++ b/judger/src/client/client.rs @@ -0,0 +1,17 @@ +pub struct HttpClient { + client: reqwest::Client, + base_url: String, +} + +impl HttpClient { + pub fn new(base_url: String) -> Self { + let client = reqwest::Client::new(); + Self { client, base_url } + } + pub fn _get(&self, path: String) -> reqwest::RequestBuilder { + self.client.get(format!("{}{}", self.base_url, path)) + } + pub fn post(&self, path: String) -> reqwest::RequestBuilder { + self.client.post(format!("{}{}", self.base_url, path)) + } +} diff --git a/judger/src/client/error.rs b/judger/src/client/error.rs new file mode 100644 index 0000000..ef854be --- /dev/null +++ b/judger/src/client/error.rs @@ -0,0 +1,27 @@ +use judge_core::error::JudgeCoreError; + +#[derive(Debug, thiserror::Error)] +pub enum ClientError { + #[error("Internal Error: {0}")] + InternalError(anyhow::Error), + #[error("Pick Error: {0}")] + PickFail(anyhow::Error), + #[error("Report Error: {0}")] + ReportFail(anyhow::Error), + #[error("Reqwest Error: {0}")] + ReqwestError(reqwest::Error), + #[error("Judge Core Error")] + JudgeError(JudgeCoreError), +} + +impl From for ClientError { + fn from(err: reqwest::Error) -> ClientError { + ClientError::ReqwestError(err) + } +} + +impl From for ClientError { + fn from(err: JudgeCoreError) -> ClientError { + ClientError::JudgeError(err) + } +} diff --git a/judger/src/client/main.rs b/judger/src/client/main.rs new file mode 100644 index 0000000..ee7bd7c --- /dev/null +++ b/judger/src/client/main.rs @@ -0,0 +1,167 @@ +mod client; +mod error; +use client::HttpClient; +use error::ClientError; +use judge_core::judge; +use judge_core::{ + compiler::Language, + judge::builder::{JudgeBuilder, JudgeBuilderInput}, + judge::result::JudgeResultInfo, + judge::JudgeConfig, + package::PackageType, +}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use std::{fs, path::PathBuf}; +use tokio::time::interval; + +#[derive(Serialize)] +struct PickBody { + consumer: String, +} +#[derive(Deserialize, Debug)] +struct PickResponse { + task: JudgeTask, +} +#[derive(Serialize)] +struct ReportBody { + consumer: String, + stream_id: String, + verdict_json: String, +} +#[derive(Deserialize, Debug)] +struct ReportResponse { + message: String, +} +#[derive(Deserialize, Debug)] +struct JudgeTask { + #[serde(rename = "submissionUID")] + submission_uid: String, + #[serde(rename = "problemSlug")] + problem_slug: String, + code: String, + language: Language, + #[serde(rename = "redisStreamID")] + redis_stream_id: String, +} + +#[tokio::main] +async fn main() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("debug")).init(); + let mut interval = interval(Duration::from_secs(10)); + let base_url = "http://localhost:8080/api/v1/judge".to_string(); + let client = client::HttpClient::new(base_url); + + loop { + interval.tick().await; + match pick_task(&client).await { + Ok(task) => { + let stream_id = task.redis_stream_id.clone(); + let submission_uid = task.submission_uid.clone(); + log::info!("Received task: {:?}", task); + match run_judge(task) { + Ok(results) => { + let report_response = report_task(&client, &stream_id, results).await; + if report_response.is_err() { + log::debug!("Report failed {:?}", report_response); + return; + } + log::debug!("Submission {:?} report success", submission_uid); + } + Err(e) => log::info!("Error judge task: {:?}", e), + } + } + Err(e) => log::debug!("Error sending request: {:?}", e), + } + } +} + +async fn pick_task(client: &HttpClient) -> Result { + let pick_url = "/task/pick"; + let body = PickBody { + consumer: "".to_string(), + }; + let response = client.post(pick_url.to_string()).json(&body).send().await?; + + match response.status() { + reqwest::StatusCode::OK => Ok(response.json::().await?.task), + _ => Err(ClientError::PickFail(anyhow::anyhow!("Queue is empty"))), + } +} + +async fn report_task( + client: &HttpClient, + stream_id: &str, + results: Vec, +) -> Result<(), ClientError> { + let report_url = "/task/report"; + let body = ReportBody { + consumer: "".to_string(), + stream_id: stream_id.to_owned(), + verdict_json: serde_json::to_string(&results).unwrap(), + }; + let response = client + .post(report_url.to_string()) + .json(&body) + .send() + .await?; + + match response.status() { + reqwest::StatusCode::OK => { + log::debug!( + "Report message: {:?}", + response.json::().await?.message + ); + Ok(()) + } + _ => Err(ClientError::ReportFail(anyhow::anyhow!("Report Failed"))), + } +} + +fn run_judge(task: JudgeTask) -> Result, ClientError> { + let problem_package_dir = PathBuf::from("dev-problem-package"); + let problem_slug = task.problem_slug; + let uuid = uuid::Uuid::new_v4(); + let runtime_path = PathBuf::from("/tmp").join(uuid.to_string()); + let src_file_name = format!("src.{}", task.language.get_extension()); + log::debug!("runtime_path: {:?}", runtime_path); + fs::create_dir_all(runtime_path.clone()).map_err(|e| { + log::debug!("Failed to create runtime dir: {:?}", e); + ClientError::InternalError(anyhow::anyhow!("Failed to create runtime dir")) + })?; + fs::write(runtime_path.clone().join(&src_file_name), task.code.clone()).map_err(|e| { + log::debug!("Failed to write src file: {:?}", e); + ClientError::InternalError(anyhow::anyhow!("Failed to write src file")) + })?; + + let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { + package_type: PackageType::ICPC, + package_path: problem_package_dir.join(problem_slug.clone()), + runtime_path: runtime_path.clone(), + src_language: task.language, + src_path: runtime_path.clone().join(&src_file_name), + }); + if new_builder_result.is_err() { + return Err(ClientError::InternalError(anyhow::anyhow!( + "Failed to new builder result: {:?}", + new_builder_result.err() + ))); + } + let builder = new_builder_result?; + log::debug!("Builder created: {:?}", builder); + let mut results: Vec = vec![]; + for idx in 0..builder.testdata_configs.len() { + let judge_config = JudgeConfig { + test_data: builder.testdata_configs[idx].clone(), + program: builder.program_config.clone(), + checker: builder.checker_config.clone(), + runtime: builder.runtime_config.clone(), + }; + let result = judge::common::run_judge(&judge_config)?; + log::debug!("Judge result: {:?}", result); + results.push(result); + } + + log::debug!("Judge finished"); + Ok(results) +} From 90e55bf60d60dd620721209b67850c9a0411345a Mon Sep 17 00:00:00 2001 From: KanadeSiina Date: Thu, 18 Jan 2024 06:24:26 +0000 Subject: [PATCH 2/3] Add dotenv for config --- .../src/client/environment/.env.development | 2 ++ judger/src/client/environment/mod.rs | 31 +++++++++++++++++++ judger/src/client/main.rs | 9 ++++-- 3 files changed, 39 insertions(+), 3 deletions(-) create mode 100644 judger/src/client/environment/.env.development create mode 100644 judger/src/client/environment/mod.rs diff --git a/judger/src/client/environment/.env.development b/judger/src/client/environment/.env.development new file mode 100644 index 0000000..c85ad28 --- /dev/null +++ b/judger/src/client/environment/.env.development @@ -0,0 +1,2 @@ +BASE_URL="http://localhost:8080/api/v1/judge" +RUST_LOG=DEBUG \ No newline at end of file diff --git a/judger/src/client/environment/mod.rs b/judger/src/client/environment/mod.rs new file mode 100644 index 0000000..152288f --- /dev/null +++ b/judger/src/client/environment/mod.rs @@ -0,0 +1,31 @@ +use structopt::StructOpt; + +#[derive(StructOpt, Debug, Clone)] +#[structopt(name = "judge-client")] +pub struct JudgeClientOpt { + #[structopt(long)] + pub env_path: Option, + + /// Port to listen to + #[structopt(env = "BASE_URL", default_value = "http://localhost:8080/api/v1/judge")] + pub base_url: String, +} + +pub fn load_option() -> JudgeClientOpt { + // First load env_path from Args + let opt = JudgeClientOpt::from_args(); + if let Some(env_path) = opt.env_path { + dotenv::from_path(env_path).ok(); + } else { + dotenv::dotenv().ok(); + } + + // Load opt again with ENV + let opt = JudgeClientOpt::from_args(); + log::debug!("load opt: {:?}", opt); + opt +} + +pub fn setup_logger() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("debug")).init(); +} diff --git a/judger/src/client/main.rs b/judger/src/client/main.rs index ee7bd7c..19d2fb7 100644 --- a/judger/src/client/main.rs +++ b/judger/src/client/main.rs @@ -1,5 +1,7 @@ mod client; +mod environment; mod error; + use client::HttpClient; use error::ClientError; use judge_core::judge; @@ -47,9 +49,10 @@ struct JudgeTask { #[tokio::main] async fn main() { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("debug")).init(); + let opt = environment::load_option(); + environment::setup_logger(); let mut interval = interval(Duration::from_secs(10)); - let base_url = "http://localhost:8080/api/v1/judge".to_string(); + let base_url = opt.base_url; let client = client::HttpClient::new(base_url); loop { @@ -66,7 +69,7 @@ async fn main() { log::debug!("Report failed {:?}", report_response); return; } - log::debug!("Submission {:?} report success", submission_uid); + log::info!("Submission {:?} report success", submission_uid); } Err(e) => log::info!("Error judge task: {:?}", e), } From 0c356b04942abf318e62898ec3402598ad26bfee Mon Sep 17 00:00:00 2001 From: KanadeSiina Date: Thu, 18 Jan 2024 10:18:38 +0000 Subject: [PATCH 3/3] Set interval in env --- judger/src/client/environment/.env.development | 1 + judger/src/client/environment/mod.rs | 3 +++ 2 files changed, 4 insertions(+) diff --git a/judger/src/client/environment/.env.development b/judger/src/client/environment/.env.development index c85ad28..bdbf771 100644 --- a/judger/src/client/environment/.env.development +++ b/judger/src/client/environment/.env.development @@ -1,2 +1,3 @@ BASE_URL="http://localhost:8080/api/v1/judge" +INTERVAL=10 RUST_LOG=DEBUG \ No newline at end of file diff --git a/judger/src/client/environment/mod.rs b/judger/src/client/environment/mod.rs index 152288f..efc681f 100644 --- a/judger/src/client/environment/mod.rs +++ b/judger/src/client/environment/mod.rs @@ -9,6 +9,9 @@ pub struct JudgeClientOpt { /// Port to listen to #[structopt(env = "BASE_URL", default_value = "http://localhost:8080/api/v1/judge")] pub base_url: String, + + #[structopt(env = "INTERVAL", default_value = "10")] + pub interval: i32, } pub fn load_option() -> JudgeClientOpt {