From 021998ae6568f7a27a3e93e2b282ebfde24e2a16 Mon Sep 17 00:00:00 2001 From: KanadeSiina Date: Thu, 18 Jan 2024 19:38:04 +0800 Subject: [PATCH] Judger client (#136) * Prepare for judger client * Add dotenv for config * Set interval in env --- judger/Cargo.toml | 7 + judger/src/client/client.rs | 17 ++ .../src/client/environment/.env.development | 3 + judger/src/client/environment/mod.rs | 34 ++++ judger/src/client/error.rs | 27 +++ judger/src/client/main.rs | 170 ++++++++++++++++++ 6 files changed, 258 insertions(+) create mode 100644 judger/src/client/client.rs create mode 100644 judger/src/client/environment/.env.development create mode 100644 judger/src/client/environment/mod.rs create mode 100644 judger/src/client/error.rs create mode 100644 judger/src/client/main.rs diff --git a/judger/Cargo.toml b/judger/Cargo.toml index c9e3344..6af1cc3 100644 --- a/judger/Cargo.toml +++ b/judger/Cargo.toml @@ -11,6 +11,9 @@ judge-core = { path = "../judge-core" } # CLI clap = { version = "4.0", features = ["derive"] } +# Client +reqwest = { version = "0.11.23", features = ["json"] } + # Async runtime tokio = { version = "1", features = ["full"] } @@ -50,3 +53,7 @@ path ="src/cli/main.rs" name ="judger-server" path ="src/server/main.rs" +[[bin]] +name ="judger-client" +path ="src/client/main.rs" + diff --git a/judger/src/client/client.rs b/judger/src/client/client.rs new file mode 100644 index 0000000..f1fe643 --- /dev/null +++ b/judger/src/client/client.rs @@ -0,0 +1,17 @@ +pub struct HttpClient { + client: reqwest::Client, + base_url: String, +} + +impl HttpClient { + pub fn new(base_url: String) -> Self { + let client = reqwest::Client::new(); + Self { client, base_url } + } + pub fn _get(&self, path: String) -> reqwest::RequestBuilder { + self.client.get(format!("{}{}", self.base_url, path)) + } + pub fn post(&self, path: String) -> reqwest::RequestBuilder { + self.client.post(format!("{}{}", self.base_url, path)) + } +} diff --git a/judger/src/client/environment/.env.development b/judger/src/client/environment/.env.development new file mode 100644 index 0000000..bdbf771 --- /dev/null +++ b/judger/src/client/environment/.env.development @@ -0,0 +1,3 @@ +BASE_URL="http://localhost:8080/api/v1/judge" +INTERVAL=10 +RUST_LOG=DEBUG \ No newline at end of file diff --git a/judger/src/client/environment/mod.rs b/judger/src/client/environment/mod.rs new file mode 100644 index 0000000..efc681f --- /dev/null +++ b/judger/src/client/environment/mod.rs @@ -0,0 +1,34 @@ +use structopt::StructOpt; + +#[derive(StructOpt, Debug, Clone)] +#[structopt(name = "judge-client")] +pub struct JudgeClientOpt { + #[structopt(long)] + pub env_path: Option, + + /// Port to listen to + #[structopt(env = "BASE_URL", default_value = "http://localhost:8080/api/v1/judge")] + pub base_url: String, + + #[structopt(env = "INTERVAL", default_value = "10")] + pub interval: i32, +} + +pub fn load_option() -> JudgeClientOpt { + // First load env_path from Args + let opt = JudgeClientOpt::from_args(); + if let Some(env_path) = opt.env_path { + dotenv::from_path(env_path).ok(); + } else { + dotenv::dotenv().ok(); + } + + // Load opt again with ENV + let opt = JudgeClientOpt::from_args(); + log::debug!("load opt: {:?}", opt); + opt +} + +pub fn setup_logger() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("debug")).init(); +} diff --git a/judger/src/client/error.rs b/judger/src/client/error.rs new file mode 100644 index 0000000..ef854be --- /dev/null +++ b/judger/src/client/error.rs @@ -0,0 +1,27 @@ +use judge_core::error::JudgeCoreError; + +#[derive(Debug, thiserror::Error)] +pub enum ClientError { + #[error("Internal Error: {0}")] + InternalError(anyhow::Error), + #[error("Pick Error: {0}")] + PickFail(anyhow::Error), + #[error("Report Error: {0}")] + ReportFail(anyhow::Error), + #[error("Reqwest Error: {0}")] + ReqwestError(reqwest::Error), + #[error("Judge Core Error")] + JudgeError(JudgeCoreError), +} + +impl From for ClientError { + fn from(err: reqwest::Error) -> ClientError { + ClientError::ReqwestError(err) + } +} + +impl From for ClientError { + fn from(err: JudgeCoreError) -> ClientError { + ClientError::JudgeError(err) + } +} diff --git a/judger/src/client/main.rs b/judger/src/client/main.rs new file mode 100644 index 0000000..19d2fb7 --- /dev/null +++ b/judger/src/client/main.rs @@ -0,0 +1,170 @@ +mod client; +mod environment; +mod error; + +use client::HttpClient; +use error::ClientError; +use judge_core::judge; +use judge_core::{ + compiler::Language, + judge::builder::{JudgeBuilder, JudgeBuilderInput}, + judge::result::JudgeResultInfo, + judge::JudgeConfig, + package::PackageType, +}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use std::{fs, path::PathBuf}; +use tokio::time::interval; + +#[derive(Serialize)] +struct PickBody { + consumer: String, +} +#[derive(Deserialize, Debug)] +struct PickResponse { + task: JudgeTask, +} +#[derive(Serialize)] +struct ReportBody { + consumer: String, + stream_id: String, + verdict_json: String, +} +#[derive(Deserialize, Debug)] +struct ReportResponse { + message: String, +} +#[derive(Deserialize, Debug)] +struct JudgeTask { + #[serde(rename = "submissionUID")] + submission_uid: String, + #[serde(rename = "problemSlug")] + problem_slug: String, + code: String, + language: Language, + #[serde(rename = "redisStreamID")] + redis_stream_id: String, +} + +#[tokio::main] +async fn main() { + let opt = environment::load_option(); + environment::setup_logger(); + let mut interval = interval(Duration::from_secs(10)); + let base_url = opt.base_url; + let client = client::HttpClient::new(base_url); + + loop { + interval.tick().await; + match pick_task(&client).await { + Ok(task) => { + let stream_id = task.redis_stream_id.clone(); + let submission_uid = task.submission_uid.clone(); + log::info!("Received task: {:?}", task); + match run_judge(task) { + Ok(results) => { + let report_response = report_task(&client, &stream_id, results).await; + if report_response.is_err() { + log::debug!("Report failed {:?}", report_response); + return; + } + log::info!("Submission {:?} report success", submission_uid); + } + Err(e) => log::info!("Error judge task: {:?}", e), + } + } + Err(e) => log::debug!("Error sending request: {:?}", e), + } + } +} + +async fn pick_task(client: &HttpClient) -> Result { + let pick_url = "/task/pick"; + let body = PickBody { + consumer: "".to_string(), + }; + let response = client.post(pick_url.to_string()).json(&body).send().await?; + + match response.status() { + reqwest::StatusCode::OK => Ok(response.json::().await?.task), + _ => Err(ClientError::PickFail(anyhow::anyhow!("Queue is empty"))), + } +} + +async fn report_task( + client: &HttpClient, + stream_id: &str, + results: Vec, +) -> Result<(), ClientError> { + let report_url = "/task/report"; + let body = ReportBody { + consumer: "".to_string(), + stream_id: stream_id.to_owned(), + verdict_json: serde_json::to_string(&results).unwrap(), + }; + let response = client + .post(report_url.to_string()) + .json(&body) + .send() + .await?; + + match response.status() { + reqwest::StatusCode::OK => { + log::debug!( + "Report message: {:?}", + response.json::().await?.message + ); + Ok(()) + } + _ => Err(ClientError::ReportFail(anyhow::anyhow!("Report Failed"))), + } +} + +fn run_judge(task: JudgeTask) -> Result, ClientError> { + let problem_package_dir = PathBuf::from("dev-problem-package"); + let problem_slug = task.problem_slug; + let uuid = uuid::Uuid::new_v4(); + let runtime_path = PathBuf::from("/tmp").join(uuid.to_string()); + let src_file_name = format!("src.{}", task.language.get_extension()); + log::debug!("runtime_path: {:?}", runtime_path); + fs::create_dir_all(runtime_path.clone()).map_err(|e| { + log::debug!("Failed to create runtime dir: {:?}", e); + ClientError::InternalError(anyhow::anyhow!("Failed to create runtime dir")) + })?; + fs::write(runtime_path.clone().join(&src_file_name), task.code.clone()).map_err(|e| { + log::debug!("Failed to write src file: {:?}", e); + ClientError::InternalError(anyhow::anyhow!("Failed to write src file")) + })?; + + let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { + package_type: PackageType::ICPC, + package_path: problem_package_dir.join(problem_slug.clone()), + runtime_path: runtime_path.clone(), + src_language: task.language, + src_path: runtime_path.clone().join(&src_file_name), + }); + if new_builder_result.is_err() { + return Err(ClientError::InternalError(anyhow::anyhow!( + "Failed to new builder result: {:?}", + new_builder_result.err() + ))); + } + let builder = new_builder_result?; + log::debug!("Builder created: {:?}", builder); + let mut results: Vec = vec![]; + for idx in 0..builder.testdata_configs.len() { + let judge_config = JudgeConfig { + test_data: builder.testdata_configs[idx].clone(), + program: builder.program_config.clone(), + checker: builder.checker_config.clone(), + runtime: builder.runtime_config.clone(), + }; + let result = judge::common::run_judge(&judge_config)?; + log::debug!("Judge result: {:?}", result); + results.push(result); + } + + log::debug!("Judge finished"); + Ok(results) +}