From 6e80ca86aabe0a80aeb11b44a309aa2005ab4784 Mon Sep 17 00:00:00 2001 From: slhmy Date: Sat, 15 Jun 2024 21:57:21 +0800 Subject: [PATCH] Suit new judge report APIs --- judger/src/agent/http.rs | 12 ++- judger/src/agent/platform/mod.rs | 133 +++++++++++++++++++++++++---- judger/src/handler/state.rs | 3 +- judger/src/main.rs | 67 +++++++++++---- judger/src/worker/mod.rs | 141 +++++++++++++++++++------------ 5 files changed, 264 insertions(+), 92 deletions(-) diff --git a/judger/src/agent/http.rs b/judger/src/agent/http.rs index 0b53dcd..27db9ce 100644 --- a/judger/src/agent/http.rs +++ b/judger/src/agent/http.rs @@ -1,3 +1,5 @@ +use reqwest::Url; + pub struct HttpClient { client: reqwest::Client, base_url: String, @@ -9,7 +11,13 @@ impl HttpClient { Self { client, base_url } } - pub fn post(&self, path: String) -> reqwest::RequestBuilder { - self.client.post(format!("{}{}", self.base_url, path)) + pub fn post(&self, path: String) -> Result { + let url = Url::parse(&format!("{}{}", self.base_url, path))?; + Ok(self.client.post(url)) + } + + pub fn put(&self, path: String) -> Result { + let url = Url::parse(&format!("{}{}", self.base_url, path))?; + Ok(self.client.put(url)) } } diff --git a/judger/src/agent/platform/mod.rs b/judger/src/agent/platform/mod.rs index ac7318e..00026f8 100644 --- a/judger/src/agent/platform/mod.rs +++ b/judger/src/agent/platform/mod.rs @@ -1,5 +1,5 @@ use super::http::HttpClient; -use judge_core::{compiler::Language, judge::result::JudgeResultInfo}; +use judge_core::{compiler::Language, judge::result::JudgeVerdict}; pub struct PlatformClient { client: HttpClient, @@ -12,16 +12,41 @@ impl PlatformClient { } } - pub async fn pick_task(&self) -> Result, anyhow::Error> { - pick_task(&self.client).await + pub async fn pick_judge_task(&self) -> Result, anyhow::Error> { + pick_judge_task(&self.client).await } - pub async fn report_task( + pub async fn report_judge_result_count( + &self, + judge_uid: &str, + result_count: usize, + ) -> Result<(), anyhow::Error> { + report_judge_result_count(&self.client, judge_uid, result_count).await + } + + pub async fn report_judge_result( + &self, + judge_uid: &str, + verdict: JudgeVerdict, + time_usage_ms: usize, + memory_usage_bytes: usize, + ) -> Result<(), anyhow::Error> { + report_judge_result( + &self.client, + judge_uid, + verdict, + time_usage_ms, + memory_usage_bytes, + ) + .await + } + + pub async fn report_judge_task( &self, stream_id: &str, - results: Vec, + verdict: JudgeVerdict, ) -> Result<(), anyhow::Error> { - report_task(&self.client, stream_id, results).await + report_task(&self.client, stream_id, verdict).await } } @@ -45,12 +70,16 @@ struct PickTaskResponse { task: JudgeTask, } -async fn pick_task(client: &HttpClient) -> Result, anyhow::Error> { +async fn pick_judge_task(client: &HttpClient) -> Result, anyhow::Error> { let pick_url = "api/v1/judge/task/pick"; let body = PickTaskBody { consumer: "".to_string(), }; - let response = client.post(pick_url.to_string()).json(&body).send().await?; + let response = client + .post(pick_url.to_string())? + .json(&body) + .send() + .await?; match response.status() { reqwest::StatusCode::OK => Ok(Some(response.json::().await?.task)), @@ -66,29 +95,97 @@ async fn pick_task(client: &HttpClient) -> Result, anyhow::Err } #[derive(Serialize)] -struct ReportTaskBody { +struct ReportJudgeResultCountBody { + #[serde(rename = "judgeUID")] + judge_uid: String, + #[serde(rename = "resultCount")] + result_count: usize, +} + +async fn report_judge_result_count( + client: &HttpClient, + judge_uid: &str, + result_count: usize, +) -> Result<(), anyhow::Error> { + let report_url = "api/v1/judge/task/report/result-count"; + let body = ReportJudgeResultCountBody { + judge_uid: judge_uid.to_owned(), + result_count, + }; + let response = client + .put(report_url.to_string())? + .json(&body) + .send() + .await?; + + match response.status() { + reqwest::StatusCode::OK => Ok(()), + _ => Err(anyhow::anyhow!("Report JudgeResultCount Failed")), + } +} + +#[derive(Serialize)] +struct ReportJudgeResultBody { + #[serde(rename = "judgeUID")] + judge_uid: String, + verdict: JudgeVerdict, + #[serde(rename = "timeUsageMS")] + time_usage_ms: usize, + #[serde(rename = "memoryUsageBytes")] + memory_usage_bytes: usize, +} + +async fn report_judge_result( + client: &HttpClient, + judge_uid: &str, + verdict: JudgeVerdict, + time_usage_ms: usize, + memory_usage_bytes: usize, +) -> Result<(), anyhow::Error> { + let report_url = "api/v1/judge/task/report/result"; + let body = ReportJudgeResultBody { + judge_uid: judge_uid.to_owned(), + verdict, + time_usage_ms, + memory_usage_bytes, + }; + let response = client + .post(report_url.to_string())? + .json(&body) + .send() + .await?; + + match response.status() { + reqwest::StatusCode::OK => Ok(()), + _ => Err(anyhow::anyhow!("Report JudgeResult Failed")), + } +} + +#[derive(Serialize)] +struct ReportJudgeTaskBody { consumer: String, - stream_id: String, - verdict_json: String, + #[serde(rename = "redisStreamID")] + redis_stream_id: String, + verdict: JudgeVerdict, } #[derive(Deserialize, Debug)] -struct ReportTaskResponse { +struct ReportJudgeTaskResponse { message: String, } async fn report_task( client: &HttpClient, stream_id: &str, - results: Vec, + verdict: JudgeVerdict, ) -> Result<(), anyhow::Error> { let report_url = "api/v1/judge/task/report"; - let body = ReportTaskBody { + let body = ReportJudgeTaskBody { consumer: "".to_string(), - stream_id: stream_id.to_owned(), - verdict_json: serde_json::to_string(&results).unwrap(), + redis_stream_id: stream_id.to_owned(), + verdict, }; let response = client - .post(report_url.to_string()) + .put(report_url.to_string())? .json(&body) .send() .await?; @@ -97,7 +194,7 @@ async fn report_task( reqwest::StatusCode::OK => { log::debug!( "Report message: {:?}", - response.json::().await?.message + response.json::().await?.message ); Ok(()) } diff --git a/judger/src/handler/state.rs b/judger/src/handler/state.rs index 87e7d3e..8287192 100644 --- a/judger/src/handler/state.rs +++ b/judger/src/handler/state.rs @@ -1,7 +1,6 @@ -use std::sync::RwLock; - use actix_web::{get, web, HttpResponse}; use lazy_static::lazy_static; +use std::sync::RwLock; use crate::error::ServiceError; diff --git a/judger/src/main.rs b/judger/src/main.rs index fa36cfc..c6db332 100644 --- a/judger/src/main.rs +++ b/judger/src/main.rs @@ -8,10 +8,14 @@ mod worker; extern crate serde_derive; extern crate lazy_static; -use std::{fs, path::PathBuf}; +use std::{fs, path::PathBuf, time::Duration}; use actix_web::{App, HttpServer}; use agent::{platform, rclone::RcloneClient}; +use judge_core::judge::{ + result::{JudgeResultInfo, JudgeVerdict}, + JudgeConfig, +}; use option::JudgerCommad; use worker::JudgeWorker; @@ -49,14 +53,17 @@ async fn main() -> std::io::Result<()> { problem_slug, language, src_path, - } => judge( - maybe_rclone_client, - opt.problem_package_bucket, - opt.problem_package_dir, - problem_slug, - language, - src_path, - ), + } => { + judge( + maybe_rclone_client, + opt.problem_package_bucket, + opt.problem_package_dir, + problem_slug, + language, + src_path, + ) + .await + } } } @@ -95,7 +102,7 @@ async fn serve( .await } -fn judge( +async fn judge( maybe_rclone_client: Option, problem_package_bucket: String, problem_package_dir: PathBuf, @@ -126,13 +133,43 @@ fn judge( } }; - match worker.run_judge(problem_slug, language, code) { - Ok(result) => { - println!("{:?}", result); + let prepare_result = worker.prepare_judge(problem_slug.clone(), language, code.clone()); + if prepare_result.is_err() { + log::error!("Failed to prepare judge: {:?}", prepare_result.err()); + return Ok(()); + } + let judge = prepare_result.unwrap(); + + let mut verdict = JudgeVerdict::Accepted; + for idx in 0..judge.testdata_configs.len() { + let judge_config = JudgeConfig { + test_data: judge.testdata_configs[idx].clone(), + program: judge.program_config.clone(), + checker: judge.checker_config.clone(), + runtime: judge.runtime_config.clone(), + }; + + let judge_result = worker.run_judge(judge_config); + let mut result = JudgeResultInfo { + verdict: JudgeVerdict::SystemError, + time_usage: Duration::from_secs(0), + memory_usage_bytes: 0, + exit_status: -1, + checker_exit_status: -1, + }; + match judge_result { + Ok(r) => { + result = r; + } + Err(e) => { + log::debug!("Failed to run judge: {:?}", e); + } } - Err(e) => { - log::error!("Failed to judge task: {:?}", e); + if result.verdict != JudgeVerdict::Accepted { + verdict = result.verdict; + break; } } + println!("{:?}", verdict); Ok(()) } diff --git a/judger/src/worker/mod.rs b/judger/src/worker/mod.rs index c595eff..099310c 100644 --- a/judger/src/worker/mod.rs +++ b/judger/src/worker/mod.rs @@ -61,50 +61,111 @@ impl JudgeWorker { let mut interval = interval(Duration::from_secs(self.interval_sec)); loop { interval.tick().await; - match platform_client.pick_task().await { + match platform_client.pick_judge_task().await { Ok(maybe_task) => { if maybe_task.is_none() { continue; } let task = maybe_task.unwrap(); log::info!("Received task: {:?}", task); - match self.run_judge( + + // TODO: handle failure for set_busy here & return the task to the queue + let _ = state::set_busy(); + + let prepare_result = self.prepare_judge( task.problem_slug.clone(), task.language, task.code.clone(), - ) { - Ok(results) => { - let report_response = platform_client - .report_task(&task.redis_stream_id.clone(), results) - .await; - if report_response.is_err() { - log::debug!( - "Report failed with error: {:?}", - report_response.err() - ); - return; + ); + if let Err(e) = prepare_result { + log::debug!("Failed to prepare judge: {:?}", e); + let mut verdict = JudgeVerdict::SystemError; + if let JudgeCoreError::CompileError(_) = e { + verdict = JudgeVerdict::CompileError; + } + let _ = platform_client + .report_judge_task(&task.redis_stream_id.clone(), verdict) + .await + .map_err(|e| { + log::debug!("Failed to report judge task: {:?}", e); + }); + continue; + } + let judge: JudgeBuilder = prepare_result.unwrap(); + let _ = platform_client + .report_judge_result_count(&task.judge_uid, judge.testdata_configs.len()) + .await + .map_err(|e| { + log::warn!("Failed to report judge result count: {:?}", e); + }); + + let mut verdict = JudgeVerdict::Accepted; + for idx in 0..judge.testdata_configs.len() { + let judge_config = JudgeConfig { + test_data: judge.testdata_configs[idx].clone(), + program: judge.program_config.clone(), + checker: judge.checker_config.clone(), + runtime: judge.runtime_config.clone(), + }; + + let judge_result = self.run_judge(judge_config); + let mut result = JudgeResultInfo { + verdict: JudgeVerdict::SystemError, + time_usage: Duration::from_secs(0), + memory_usage_bytes: 0, + exit_status: -1, + checker_exit_status: -1, + }; + match judge_result { + Ok(r) => { + result = r; + } + Err(e) => { + log::debug!("Failed to run judge: {:?}", e); } - log::info!("judge {:?} report success", task.judge_uid.clone()); } - Err(e) => log::info!("Error judge task: {:?}", e), + + let _ = platform_client + .report_judge_result( + &task.judge_uid, + result.verdict.clone(), + result.time_usage.as_millis() as usize, + result.memory_usage_bytes as usize, + ) + .await + .map_err(|e| { + log::warn!("Failed to report judge result count: {:?}", e); + }); + if result.verdict != JudgeVerdict::Accepted { + verdict = result.verdict; + break; + } } + + let _ = platform_client + .report_judge_task(&task.redis_stream_id.clone(), verdict) + .await + .map_err(|e| { + log::debug!("Failed to report judge task: {:?}", e); + }); + + state::set_idle() } Err(e) => log::debug!("Error sending request: {:?}", e), } } } - pub fn run_judge( + pub fn prepare_judge( &self, problem_slug: String, language: Language, code: String, - ) -> Result, anyhow::Error> { + ) -> Result { if let Some(rclone_client) = self.maybe_rclone_client.as_ref() { rclone_client.sync_bucket(&self.package_bucket, &self.package_dir)?; } - state::set_busy()?; let problem_package_dir = self.package_dir.join(problem_slug); let uuid = uuid::Uuid::new_v4(); @@ -120,49 +181,19 @@ impl JudgeWorker { anyhow::anyhow!("Failed to write src file") })?; - let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { + let builder = JudgeBuilder::new(JudgeBuilderInput { package_type: PackageType::ICPC, package_path: problem_package_dir, runtime_path: runtime_path.clone(), src_language: language, src_path: runtime_path.clone().join(&src_file_name), - }); - if let Err(e) = new_builder_result { - state::set_idle(); - if let JudgeCoreError::CompileError(_) = e { - return Ok(vec![ - JudgeResultInfo { - verdict: JudgeVerdict::CompileError, - time_usage: Duration::new(0, 0), - memory_usage_bytes: -1, - exit_status: -1, - checker_exit_status: -1, - }; - 1 - ]); - } - return Err(anyhow::anyhow!("Failed to create builder: {:?}", e)); - } - let builder = new_builder_result.expect("builder creater error"); + })?; log::debug!("Builder created: {:?}", builder); - let mut results: Vec = vec![]; - for idx in 0..builder.testdata_configs.len() { - let judge_config = JudgeConfig { - test_data: builder.testdata_configs[idx].clone(), - program: builder.program_config.clone(), - checker: builder.checker_config.clone(), - runtime: builder.runtime_config.clone(), - }; - let result = judge::common::run_judge(&judge_config).map_err(|e| { - state::set_idle(); - anyhow::anyhow!("Failed to run judge: {:?}", e) - })?; - log::debug!("Judge result: {:?}", result); - results.push(result); - } + Ok(builder) + } - log::debug!("Judge finished"); - state::set_idle(); - Ok(results) + pub fn run_judge(&self, judge_config: JudgeConfig) -> Result { + judge::common::run_judge(&judge_config) + .map_err(|e| anyhow::anyhow!("Failed to run judge: {:?}", e)) } }