From 10231736dd86c3101527acfd6cf9f421597b2d8d Mon Sep 17 00:00:00 2001 From: slhmy Date: Sat, 23 Mar 2024 16:24:42 +0800 Subject: [PATCH] Refactor judger code --- data/.gitignore | 3 +- ...minio.conf.example => default-rclone.conf} | 4 +- .../server/environment => }/.env.development | 0 judger/Cargo.toml | 10 +- .../client/httpclient.rs => agent/http.rs} | 4 +- judger/src/agent/mod.rs | 3 + judger/src/agent/platform/mod.rs | 99 ++++++++++ judger/src/agent/rclone.rs | 41 ++++ judger/src/cli/main.rs | 108 ----------- judger/src/{server/environment => env}/mod.rs | 21 ++- judger/src/{server => }/error.rs | 2 - .../src/{server/service => handler}/greet.rs | 0 judger/src/{server/service => handler}/mod.rs | 6 - .../src/{server/service => handler}/state.rs | 0 judger/src/lib.rs | 1 - judger/src/main.rs | 52 ++++++ judger/src/server/client/mod.rs | 175 ------------------ judger/src/server/main.rs | 47 ----- judger/src/server/service/judge.rs | 112 ----------- judger/src/service/error.rs | 29 --- judger/src/service/mod.rs | 2 - .../service/package_manager/description.rs | 100 ---------- judger/src/service/package_manager/mod.rs | 78 -------- judger/src/service/package_manager/package.rs | 51 ----- judger/src/worker/mod.rs | 135 ++++++++++++++ judger/tests/package_test.rs | 48 ----- judger/tests/temp/judge-pd.json | 7 - 27 files changed, 349 insertions(+), 789 deletions(-) rename data/{rclone-minio.conf.example => default-rclone.conf} (62%) rename judger/{src/server/environment => }/.env.development (100%) rename judger/src/{server/client/httpclient.rs => agent/http.rs} (72%) create mode 100644 judger/src/agent/mod.rs create mode 100644 judger/src/agent/platform/mod.rs create mode 100644 judger/src/agent/rclone.rs delete mode 100644 judger/src/cli/main.rs rename judger/src/{server/environment => env}/mod.rs (55%) rename judger/src/{server => }/error.rs (97%) rename judger/src/{server/service => handler}/greet.rs (100%) rename judger/src/{server/service => handler}/mod.rs (80%) rename judger/src/{server/service => handler}/state.rs (100%) delete mode 100644 judger/src/lib.rs create mode 100644 judger/src/main.rs delete mode 100644 judger/src/server/client/mod.rs delete mode 100644 judger/src/server/main.rs delete mode 100644 judger/src/server/service/judge.rs delete mode 100644 judger/src/service/error.rs delete mode 100644 judger/src/service/mod.rs delete mode 100644 judger/src/service/package_manager/description.rs delete mode 100644 judger/src/service/package_manager/mod.rs delete mode 100644 judger/src/service/package_manager/package.rs create mode 100644 judger/src/worker/mod.rs delete mode 100644 judger/tests/package_test.rs delete mode 100644 judger/tests/temp/judge-pd.json diff --git a/data/.gitignore b/data/.gitignore index 28ceaa7..a4fd041 100644 --- a/data/.gitignore +++ b/data/.gitignore @@ -1,2 +1 @@ -rclone-minio.conf -rclone-problem-package/ +problem-package/ diff --git a/data/rclone-minio.conf.example b/data/default-rclone.conf similarity index 62% rename from data/rclone-minio.conf.example rename to data/default-rclone.conf index 8f688fc..2919342 100644 --- a/data/rclone-minio.conf.example +++ b/data/default-rclone.conf @@ -2,8 +2,8 @@ type = s3 provider = Minio env_auth = false -access_key_id = YOUR_ACCESS_KEY -secret_access_key = YOUR_SECRET_KEY +access_key_id = minio-root-user +secret_access_key = minio-root-password endpoint = http://127.0.0.1:9000 location_constraint = acl = private \ No newline at end of file diff --git a/judger/src/server/environment/.env.development b/judger/.env.development similarity index 100% rename from judger/src/server/environment/.env.development rename to judger/.env.development diff --git a/judger/Cargo.toml b/judger/Cargo.toml index 009119d..e0e3e4e 100644 --- a/judger/Cargo.toml +++ b/judger/Cargo.toml @@ -43,12 +43,4 @@ chrono = { version = "0.4", features = ["serde"] } anyhow = "1" thiserror = "1" -uuid = { version = "1.4", features = ["serde", "v4"] } - -[[bin]] -name ="judger-cli" -path ="src/cli/main.rs" - -[[bin]] -name ="judger-server" -path ="src/server/main.rs" +uuid = { version = "1.4", features = ["serde", "v4"] } \ No newline at end of file diff --git a/judger/src/server/client/httpclient.rs b/judger/src/agent/http.rs similarity index 72% rename from judger/src/server/client/httpclient.rs rename to judger/src/agent/http.rs index f1fe643..0b53dcd 100644 --- a/judger/src/server/client/httpclient.rs +++ b/judger/src/agent/http.rs @@ -8,9 +8,7 @@ impl HttpClient { let client = reqwest::Client::new(); Self { client, base_url } } - pub fn _get(&self, path: String) -> reqwest::RequestBuilder { - self.client.get(format!("{}{}", self.base_url, path)) - } + pub fn post(&self, path: String) -> reqwest::RequestBuilder { self.client.post(format!("{}{}", self.base_url, path)) } diff --git a/judger/src/agent/mod.rs b/judger/src/agent/mod.rs new file mode 100644 index 0000000..0488229 --- /dev/null +++ b/judger/src/agent/mod.rs @@ -0,0 +1,3 @@ +pub mod http; +pub mod rclone; +pub mod platform; diff --git a/judger/src/agent/platform/mod.rs b/judger/src/agent/platform/mod.rs new file mode 100644 index 0000000..179d061 --- /dev/null +++ b/judger/src/agent/platform/mod.rs @@ -0,0 +1,99 @@ +use super::http::HttpClient; +use judge_core::{compiler::Language, judge::result::JudgeResultInfo}; + +pub struct PlatformClient { + client: HttpClient, +} + +impl PlatformClient { + pub fn new(base_url: String) -> Self { + Self { + client: HttpClient::new(base_url), + } + } + + pub async fn pick_task(&self) -> Result { + pick_task(&self.client).await + } + + pub async fn report_task( + &self, + stream_id: &str, + results: Vec, + ) -> Result<(), anyhow::Error> { + report_task(&self.client, stream_id, results).await + } +} + +#[derive(Deserialize, Debug)] +pub struct JudgeTask { + #[serde(rename = "submissionUID")] + pub submission_uid: String, + #[serde(rename = "problemSlug")] + pub problem_slug: String, + pub code: String, + pub language: Language, + #[serde(rename = "redisStreamID")] + pub redis_stream_id: String, +} +#[derive(Serialize)] +struct PickTaskBody { + consumer: String, +} +#[derive(Deserialize, Debug)] +struct PickTaskResponse { + task: JudgeTask, +} + +async fn pick_task(client: &HttpClient) -> Result { + let pick_url = "/api/v1/judge/task/pick"; + let body = PickTaskBody { + consumer: "".to_string(), + }; + let response = client.post(pick_url.to_string()).json(&body).send().await?; + + match response.status() { + reqwest::StatusCode::OK => Ok(response.json::().await?.task), + _ => Err(anyhow::anyhow!("Queue is empty")), + } +} + +#[derive(Serialize)] +struct ReportTaskBody { + consumer: String, + stream_id: String, + verdict_json: String, +} +#[derive(Deserialize, Debug)] +struct ReportTaskResponse { + message: String, +} + +async fn report_task( + client: &HttpClient, + stream_id: &str, + results: Vec, +) -> Result<(), anyhow::Error> { + let report_url = "/api/v1/judge/task/report"; + let body = ReportTaskBody { + consumer: "".to_string(), + stream_id: stream_id.to_owned(), + verdict_json: serde_json::to_string(&results).unwrap(), + }; + let response = client + .post(report_url.to_string()) + .json(&body) + .send() + .await?; + + match response.status() { + reqwest::StatusCode::OK => { + log::debug!( + "Report message: {:?}", + response.json::().await?.message + ); + Ok(()) + } + _ => Err(anyhow::anyhow!("Report Failed")), + } +} diff --git a/judger/src/agent/rclone.rs b/judger/src/agent/rclone.rs new file mode 100644 index 0000000..35d4d79 --- /dev/null +++ b/judger/src/agent/rclone.rs @@ -0,0 +1,41 @@ +use anyhow::{self, Error}; +use std::path::PathBuf; +use std::process::Command; + +pub struct RcloneClient { + config_path: PathBuf, +} + +impl RcloneClient { + pub fn new(config_path: PathBuf) -> Self { + Self { config_path } + } + + pub fn is_avaliable(&self) -> bool { + let status = Command::new("rclone") + .arg("--config") + .arg(format!("{}", self.config_path.to_string_lossy())) + .arg("ls") + .arg("minio:") + .status() + .expect("Failed to rclone"); + + status.success() + } + + pub fn sync_bucket(&self, bucket_name: &str, target_dir: &PathBuf) -> Result<(), Error> { + let status = Command::new("rclone") + .arg("--config") + .arg(format!("{}", self.config_path.to_string_lossy())) + .arg("sync") + .arg(format!("minio:{}", bucket_name)) + .arg(format!("{}", target_dir.to_string_lossy())) + .status() + .expect("Failed to rclone"); + if status.success() { + Ok(()) + } else { + Err(anyhow::anyhow!("rclone sync failed, please check config.")) + } + } +} diff --git a/judger/src/cli/main.rs b/judger/src/cli/main.rs deleted file mode 100644 index d020e70..0000000 --- a/judger/src/cli/main.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::path::PathBuf; - -use clap::{Parser, Subcommand}; -use judge_core::{ - compiler::{Compiler, Language}, - judge::builder::{JudgeBuilder, JudgeBuilderInput}, - judge::{common::run_judge, JudgeConfig}, - package::PackageType, -}; - -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -struct Cli { - #[command(subcommand)] - command: Option, -} - -#[derive(Subcommand)] -enum Commands { - /// Compile a single file source code into an executable - Compile { - /// Path of the src file - #[arg(short, long)] - source: String, - /// Path to place the compiled executable - #[arg(short, long)] - target: String, - #[arg(short, long)] - /// Supported are: rust | cpp | python - language: Language, - }, - /// Run a batch of judges with specified problem package and src input - BatchJudge { - /// Path of the testing src file - #[arg(short, long)] - source: PathBuf, - /// Supported are: rust | cpp | python - #[arg(short = 'l', long)] - source_language: Language, - /// Path of the problem package to run the judge - #[arg(short, long)] - package: PathBuf, - /// Supported are: icpc - #[arg(short = 't', long)] - package_type: PackageType, - /// Path to run and store the runtime files - #[arg(short, long, default_value = "/tmp")] - runtime_path: PathBuf, - }, -} - -fn main() { - // TODO: use some flags to control weather log is printed - // let _ = env_logger::builder() - // .filter_level(log::LevelFilter::Debug) - // .try_init(); - let cli = Cli::parse(); - match cli.command { - Some(Commands::Compile { - source, - target, - language, - }) => { - let compiler = Compiler::new(language, vec!["-std=c++17".to_string()]); - let output = compiler.compile(&PathBuf::from(source), &PathBuf::from(target)); - println!("{:?}", output) - } - Some(Commands::BatchJudge { - source, - source_language, - package, - package_type, - runtime_path, - }) => { - let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { - package_type, - package_path: package, - runtime_path, - src_language: source_language, - src_path: source, - }); - if new_builder_result.is_err() { - println!( - "Failed to new builder result: {:?}", - new_builder_result.err() - ); - return; - } - let builder = new_builder_result.unwrap(); - println!("Builder created: {:?}", builder); - for idx in 0..builder.testdata_configs.len() { - let judge_config = JudgeConfig { - test_data: builder.testdata_configs[idx].clone(), - program: builder.program_config.clone(), - checker: builder.checker_config.clone(), - runtime: builder.runtime_config.clone(), - }; - let result = run_judge(&judge_config); - println!("Judge result: {:?}", result); - } - - println!("BatchJudge finished") - } - None => { - println!("Please specify a COMMAND, use --help to see more") - } - } -} diff --git a/judger/src/server/environment/mod.rs b/judger/src/env/mod.rs similarity index 55% rename from judger/src/server/environment/mod.rs rename to judger/src/env/mod.rs index acdcc4c..99db343 100644 --- a/judger/src/server/environment/mod.rs +++ b/judger/src/env/mod.rs @@ -3,8 +3,9 @@ use std::path::PathBuf; use structopt::StructOpt; #[derive(StructOpt, Debug, Clone)] -#[structopt(name = "judge-server")] +#[structopt(name = "judger")] pub struct JudgeServerOpt { + /// For loading Opt from .env file #[structopt(long)] pub env_path: Option, @@ -12,14 +13,20 @@ pub struct JudgeServerOpt { #[structopt(env = "PORT", default_value = "8080")] pub port: u16, - #[structopt(long, default_value = "data/dev-problem-package")] + // TODO: make rclone optional + #[structopt(long, default_value = "data/default-rclone.conf")] + pub rclone_config: PathBuf, + #[structopt(long, default_value = "oj-lab-problem-package")] + pub problem_package_bucket: String, + /// Where to store problem package + #[structopt(long, default_value = "data/problem-package")] pub problem_package_dir: PathBuf, - #[structopt(env = "BASE_URL", default_value = "http://localhost:8080/api/v1/judge")] - pub base_url: String, - - #[structopt(env = "INTERVAL", default_value = "10")] - pub interval: i32, + #[structopt(env = "PLATFORM_URI", default_value = "http://localhost:8080/")] + pub platform_uri: String, + /// Interval to fetch task in seconds + #[structopt(env = "FETCH_TASK_INTERVAL", default_value = "10")] + pub fetch_task_interval: u64, } pub fn load_option() -> JudgeServerOpt { diff --git a/judger/src/server/error.rs b/judger/src/error.rs similarity index 97% rename from judger/src/server/error.rs rename to judger/src/error.rs index e9de4bc..7da2114 100644 --- a/judger/src/server/error.rs +++ b/judger/src/error.rs @@ -2,7 +2,6 @@ use actix_web::{HttpResponse, ResponseError}; use judge_core::error::JudgeCoreError; -use judger::service::error::JudgeServiceError; #[derive(Debug, thiserror::Error)] pub enum ServiceError { @@ -23,7 +22,6 @@ pub enum ServiceError { #[derive(Debug)] pub enum ClientError { InternalError(anyhow::Error), - PackageError(JudgeServiceError), } #[derive(Serialize)] diff --git a/judger/src/server/service/greet.rs b/judger/src/handler/greet.rs similarity index 100% rename from judger/src/server/service/greet.rs rename to judger/src/handler/greet.rs diff --git a/judger/src/server/service/mod.rs b/judger/src/handler/mod.rs similarity index 80% rename from judger/src/server/service/mod.rs rename to judger/src/handler/mod.rs index ae4878b..6eabb17 100644 --- a/judger/src/server/service/mod.rs +++ b/judger/src/handler/mod.rs @@ -1,5 +1,4 @@ mod greet; -mod judge; pub mod state; use actix_web::web; @@ -15,7 +14,6 @@ pub struct ApiDoc; pub fn route(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("/api/v1") - .configure(judge::route) .service(greet::greet) .configure(state::route), ) @@ -25,10 +23,6 @@ pub fn route(cfg: &mut web::ServiceConfig) { utoipa_swagger_ui::Url::new("root", "/api-docs/openapi.json"), ApiDoc::openapi(), ), - ( - utoipa_swagger_ui::Url::new("judge", "/api-docs/judge.json"), - judge::JudgeApiDoc::openapi(), - ), ( utoipa_swagger_ui::Url::new("state", "/api-docs/state.json"), state::StateApiDoc::openapi(), diff --git a/judger/src/server/service/state.rs b/judger/src/handler/state.rs similarity index 100% rename from judger/src/server/service/state.rs rename to judger/src/handler/state.rs diff --git a/judger/src/lib.rs b/judger/src/lib.rs deleted file mode 100644 index 1f278a4..0000000 --- a/judger/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod service; diff --git a/judger/src/main.rs b/judger/src/main.rs new file mode 100644 index 0000000..6a87066 --- /dev/null +++ b/judger/src/main.rs @@ -0,0 +1,52 @@ +mod agent; +mod env; +mod error; +mod handler; +mod worker; + +#[macro_use] +extern crate serde_derive; +extern crate lazy_static; + +use actix_web::{web::Data, App, HttpServer}; +use worker::JudgeWorker; + +#[actix_web::main] // or #[tokio::main] +async fn main() -> std::io::Result<()> { + let opt = env::load_option(); + env::setup_logger(); + + // TODO: Send heartbeat here to a remote host + + let worker = match JudgeWorker::new( + opt.platform_uri, + opt.fetch_task_interval, + opt.rclone_config, + opt.problem_package_bucket.clone(), + opt.problem_package_dir.clone(), + ) { + Ok(maybe_worker) => { + if let Some(worker) = maybe_worker { + worker + } else { + log::error!("Failed to create worker"); + return Ok(()); + } + } + Err(e) => { + log::error!("Failed to create worker: {:?}", e); + return Ok(()); + } + }; + tokio::spawn(async move { worker.run().await }); + + HttpServer::new(move || { + App::new() + .wrap(actix_web::middleware::Logger::default()) + .app_data(Data::new(opt.problem_package_dir.clone())) + .configure(handler::route) + }) + .bind(("0.0.0.0", opt.port))? + .run() + .await +} diff --git a/judger/src/server/client/mod.rs b/judger/src/server/client/mod.rs deleted file mode 100644 index 2600431..0000000 --- a/judger/src/server/client/mod.rs +++ /dev/null @@ -1,175 +0,0 @@ -mod httpclient; -use crate::error::ClientError; -use crate::service::state; -use httpclient::HttpClient; -use judge_core::judge; -use judge_core::{ - compiler::Language, - judge::builder::{JudgeBuilder, JudgeBuilderInput}, - judge::result::JudgeResultInfo, - judge::JudgeConfig, - package::PackageType, -}; -use judger::service::package_manager::package; -use serde::{Deserialize, Serialize}; -use std::time::Duration; -use std::{fs, path::PathBuf}; -use tokio::time::interval; - -#[derive(Serialize)] -struct PickBody { - consumer: String, -} -#[derive(Deserialize, Debug)] -struct PickResponse { - task: JudgeTask, -} -#[derive(Serialize)] -struct ReportBody { - consumer: String, - stream_id: String, - verdict_json: String, -} -#[derive(Deserialize, Debug)] -struct ReportResponse { - message: String, -} -#[derive(Deserialize, Debug)] -struct JudgeTask { - #[serde(rename = "submissionUID")] - submission_uid: String, - #[serde(rename = "problemSlug")] - problem_slug: String, - code: String, - language: Language, - #[serde(rename = "redisStreamID")] - redis_stream_id: String, -} - -pub async fn run_client(base_url: String, interval_sec: u64) { - let mut interval = interval(Duration::from_secs(interval_sec)); - let client = HttpClient::new(base_url); - - loop { - interval.tick().await; - match pick_task(&client).await { - Ok(task) => { - let stream_id = task.redis_stream_id.clone(); - let submission_uid = task.submission_uid.clone(); - log::info!("Received task: {:?}", task); - match run_judge(task) { - Ok(results) => { - let report_response = report_task(&client, &stream_id, results).await; - if report_response.is_err() { - log::debug!("Report failed {:?}", report_response); - return; - } - log::info!("Submission {:?} report success", submission_uid); - } - Err(e) => log::info!("Error judge task: {:?}", e), - } - } - Err(e) => log::debug!("Error sending request: {:?}", e), - } - } -} - -async fn pick_task(client: &HttpClient) -> Result { - let pick_url = "/task/pick"; - let body = PickBody { - consumer: "".to_string(), - }; - let response = client.post(pick_url.to_string()).json(&body).send().await?; - - match response.status() { - reqwest::StatusCode::OK => Ok(response.json::().await?.task), - _ => Err(ClientError::InternalError(anyhow::anyhow!( - "Queue is empty" - ))), - } -} - -async fn report_task( - client: &HttpClient, - stream_id: &str, - results: Vec, -) -> Result<(), ClientError> { - let report_url = "/task/report"; - let body = ReportBody { - consumer: "".to_string(), - stream_id: stream_id.to_owned(), - verdict_json: serde_json::to_string(&results).unwrap(), - }; - let response = client - .post(report_url.to_string()) - .json(&body) - .send() - .await?; - - match response.status() { - reqwest::StatusCode::OK => { - log::debug!( - "Report message: {:?}", - response.json::().await?.message - ); - Ok(()) - } - _ => Err(ClientError::InternalError(anyhow::anyhow!("Report Failed"))), - } -} - -fn run_judge(task: JudgeTask) -> Result, ClientError> { - if let Err(sync_err) = package::sync_package(&PathBuf::from("data"), "oj-lab-problem-package") { - return Err(ClientError::PackageError(sync_err)); - }; - if let Err(set_err) = state::set_busy() { - return Err(ClientError::InternalError(set_err)); - } - let problem_package_dir = PathBuf::from(format!("data/{}", package::PACKAGE_SAVE_DIRNAME)); - let problem_slug = task.problem_slug; - let uuid = uuid::Uuid::new_v4(); - let runtime_path = PathBuf::from("/tmp").join(uuid.to_string()); - let src_file_name = format!("src.{}", task.language.get_extension()); - log::debug!("runtime_path: {:?}", runtime_path); - fs::create_dir_all(runtime_path.clone()).map_err(|e| { - log::debug!("Failed to create runtime dir: {:?}", e); - ClientError::InternalError(anyhow::anyhow!("Failed to create runtime dir")) - })?; - fs::write(runtime_path.clone().join(&src_file_name), task.code.clone()).map_err(|e| { - log::debug!("Failed to write src file: {:?}", e); - ClientError::InternalError(anyhow::anyhow!("Failed to write src file")) - })?; - - let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { - package_type: PackageType::ICPC, - package_path: problem_package_dir.join(problem_slug.clone()), - runtime_path: runtime_path.clone(), - src_language: task.language, - src_path: runtime_path.clone().join(&src_file_name), - }); - if new_builder_result.is_err() { - state::set_idle(); - return Err(ClientError::InternalError(anyhow::anyhow!( - "Failed to new builder result: {:?}", - new_builder_result.err() - ))); - } - let builder = new_builder_result?; - log::debug!("Builder created: {:?}", builder); - let mut results: Vec = vec![]; - for idx in 0..builder.testdata_configs.len() { - let judge_config = JudgeConfig { - test_data: builder.testdata_configs[idx].clone(), - program: builder.program_config.clone(), - checker: builder.checker_config.clone(), - runtime: builder.runtime_config.clone(), - }; - let result = judge::common::run_judge(&judge_config)?; - log::debug!("Judge result: {:?}", result); - results.push(result); - } - - log::debug!("Judge finished"); - state::set_idle(); - Ok(results) -} diff --git a/judger/src/server/main.rs b/judger/src/server/main.rs deleted file mode 100644 index 1588b65..0000000 --- a/judger/src/server/main.rs +++ /dev/null @@ -1,47 +0,0 @@ -mod client; -mod environment; -mod error; -mod service; - -#[macro_use] -extern crate serde_derive; -extern crate lazy_static; - -use actix_web::{web::Data, App, HttpServer}; -use utoipa::OpenApi; - -#[actix_web::main] // or #[tokio::main] -async fn main() -> std::io::Result<()> { - let opt = environment::load_option(); - environment::setup_logger(); - - // Suppose to send heartbeat here to a remote host - // tokio::spawn(async move { - // loop { - // tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - // log::debug!("JudgeSever heartbeat") - // } - // }); - - tokio::spawn(async move { - client::run_client(opt.base_url, opt.interval as u64).await; - }); - - let port = opt.port; - - HttpServer::new(move || { - App::new() - .wrap(actix_web::middleware::Logger::default()) - .app_data(Data::new(opt.problem_package_dir.clone())) - .configure(service::route) - .service( - utoipa_swagger_ui::SwaggerUi::new("/swagger-ui/{_:.*}").urls(vec![( - utoipa_swagger_ui::Url::new("api", "/api-docs/openapi.json"), - service::ApiDoc::openapi(), - )]), - ) - }) - .bind(("0.0.0.0", port))? - .run() - .await -} diff --git a/judger/src/server/service/judge.rs b/judger/src/server/service/judge.rs deleted file mode 100644 index 4b576c1..0000000 --- a/judger/src/server/service/judge.rs +++ /dev/null @@ -1,112 +0,0 @@ -use std::{fs, path::PathBuf}; - -use crate::{error::ServiceError, service::state}; -use actix_web::{post, web, HttpResponse}; - -use judge_core::{ - compiler::Language, - error::JudgeCoreError, - judge::{ - self, - builder::{JudgeBuilder, JudgeBuilderInput}, - result::JudgeResultInfo, - JudgeConfig, - }, - package::PackageType, -}; -use tokio::task::JoinHandle; -use utoipa::ToSchema; - -#[derive(utoipa::OpenApi)] -#[openapi(paths(run_judge), components(schemas(RunJudgeBody)))] -pub struct JudgeApiDoc; - -pub fn route(cfg: &mut web::ServiceConfig) { - cfg.service(web::scope("/judge").service(run_judge)); -} - -// TODO: Remove the first `_` when the segment is actually used -#[derive(Debug, ToSchema, Deserialize)] -pub struct RunJudgeBody { - code: String, - language: Language, -} - -#[utoipa::path( - context_path = "/api/v1/judge", - request_body(content = RunJudgeBody, content_type = "application/json", description = "The info a judge task should refer to"), - responses( - (status = 200, description = "Judge run successfully") - ) -)] -#[post("/{package_slug}")] -pub async fn run_judge( - path: web::Path, - body: web::Json, - problem_package_dir: web::Data, -) -> Result { - state::set_busy().map_err(|e| { - println!("Failed to set busy: {:?}", e); - ServiceError::InternalError(anyhow::anyhow!("Judge server is busy")) - })?; - - let package_slug = path.into_inner(); - log::debug!("receive body: {:?}", body); - - let uuid = uuid::Uuid::new_v4(); - let runtime_path = PathBuf::from("/tmp").join(uuid.to_string()); - let src_file_name = format!("src.{}", body.language.get_extension()); - log::debug!("runtime_path: {:?}", runtime_path); - fs::create_dir_all(runtime_path.clone()).map_err(|e| { - log::debug!("Failed to create runtime dir: {:?}", e); - ServiceError::InternalError(anyhow::anyhow!("Failed to create runtime dir")) - })?; - fs::write(runtime_path.clone().join(&src_file_name), body.code.clone()).map_err(|e| { - log::debug!("Failed to write src file: {:?}", e); - ServiceError::InternalError(anyhow::anyhow!("Failed to write src file")) - })?; - - let handle: JoinHandle, JudgeCoreError>> = - tokio::spawn(async move { - let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { - package_type: PackageType::ICPC, - package_path: problem_package_dir.join(package_slug.clone()), - runtime_path: runtime_path.clone(), - src_language: body.language, - src_path: runtime_path.clone().join(&src_file_name), - }); - if new_builder_result.is_err() { - log::debug!( - "Failed to new builder result: {:?}", - new_builder_result.err() - ); - return Ok(vec![]); - } - let builder = new_builder_result?; - log::debug!("Builder created: {:?}", builder); - let mut results: Vec = vec![]; - for idx in 0..builder.testdata_configs.len() { - let judge_config = JudgeConfig { - test_data: builder.testdata_configs[idx].clone(), - program: builder.program_config.clone(), - checker: builder.checker_config.clone(), - runtime: builder.runtime_config.clone(), - }; - let result = judge::common::run_judge(&judge_config)?; - log::debug!("Judge result: {:?}", result); - results.push(result); - } - - state::set_idle(); - log::debug!("Judge finished"); - Ok(results) - }); - - match handle.await.unwrap() { - Ok(results) => Ok(HttpResponse::Ok().json(results)), - Err(e) => { - log::info!("Failed to await handle: {:?}", e); - Err(ServiceError::InternalError(anyhow::anyhow!("Judge failed"))) - } - } -} diff --git a/judger/src/service/error.rs b/judger/src/service/error.rs deleted file mode 100644 index 11925bd..0000000 --- a/judger/src/service/error.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::io; - -use judge_core::error::JudgeCoreError; - -#[derive(Debug)] -pub enum JudgeServiceError { - JudgeCoreError(JudgeCoreError), - IOError(io::Error), - RcloneError(anyhow::Error), - AnyhowError(anyhow::Error), -} - -impl From for JudgeServiceError { - fn from(error: JudgeCoreError) -> JudgeServiceError { - JudgeServiceError::JudgeCoreError(error) - } -} - -impl From for JudgeServiceError { - fn from(error: io::Error) -> JudgeServiceError { - JudgeServiceError::IOError(error) - } -} - -impl From for JudgeServiceError { - fn from(error: anyhow::Error) -> JudgeServiceError { - JudgeServiceError::AnyhowError(error) - } -} diff --git a/judger/src/service/mod.rs b/judger/src/service/mod.rs deleted file mode 100644 index 215fb2c..0000000 --- a/judger/src/service/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod error; -pub mod package_manager; diff --git a/judger/src/service/package_manager/description.rs b/judger/src/service/package_manager/description.rs deleted file mode 100644 index f7f762b..0000000 --- a/judger/src/service/package_manager/description.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::{ - collections::HashMap, - fs, - path::{Path, PathBuf}, -}; - -use judge_core::{error::JudgeCoreError, package::PackageType}; -use serde_derive::{Deserialize, Serialize}; - -use crate::service::error::JudgeServiceError; - -pub const PACKAGES_DESCRIPTION_FILE_NAME: &str = "judge-pd.json"; - -#[derive(Debug, Serialize, Deserialize)] -pub struct PackageDescription { - pub name: String, - pub revision: u32, - pub package_type: PackageType, -} - -impl PackageDescription { - pub fn new(name: String, package_type: PackageType) -> Result { - Ok(Self { - name, - revision: 0, - package_type, - }) - } -} - -pub struct StoragedPackageDescriptionMap { - pub folder_path: PathBuf, - pub package_description_map: HashMap, -} - -impl StoragedPackageDescriptionMap { - pub fn init(folder_path: PathBuf) -> Result { - init_package_description_file(&folder_path)?; - let package_description_map = HashMap::new(); - Ok(Self { - folder_path, - package_description_map, - }) - } - - pub fn load(folder_path: PathBuf) -> Result { - let package_description_map = load_package_description_map(&folder_path)?; - Ok(Self { - folder_path, - package_description_map, - }) - } - - pub fn insert( - &mut self, - package_description: PackageDescription, - ) -> Result<(), JudgeCoreError> { - self.package_description_map - .insert(package_description.name.clone(), package_description); - update_package_description_file(&self.folder_path, &self.package_description_map)?; - Ok(()) - } - - pub fn get(&self, package_name: &str) -> Option<&PackageDescription> { - self.package_description_map.get(package_name) - } -} - -fn load_package_description_map( - folder: &Path, -) -> Result, JudgeCoreError> { - let description_file_content = fs::read_to_string(folder.join(PACKAGES_DESCRIPTION_FILE_NAME))?; - let package_description_map: HashMap = - serde_json::from_str(&description_file_content)?; - Ok(package_description_map) -} - -fn init_package_description_file(folder: &Path) -> Result<(), JudgeCoreError> { - let package_description_map = HashMap::::new(); - let package_description_file_content = serde_json::to_string_pretty(&package_description_map)?; - - fs::write( - folder.join(PACKAGES_DESCRIPTION_FILE_NAME), - package_description_file_content, - )?; - Ok(()) -} - -fn update_package_description_file( - folder: &Path, - package_description_map: &HashMap, -) -> Result<(), JudgeCoreError> { - let package_description_file_content = serde_json::to_string_pretty(package_description_map)?; - - fs::write( - folder.join(PACKAGES_DESCRIPTION_FILE_NAME), - package_description_file_content, - )?; - Ok(()) -} diff --git a/judger/src/service/package_manager/mod.rs b/judger/src/service/package_manager/mod.rs deleted file mode 100644 index 6440d6a..0000000 --- a/judger/src/service/package_manager/mod.rs +++ /dev/null @@ -1,78 +0,0 @@ -pub mod description; -pub mod package; - -use std::path::PathBuf; - -use judge_core::package::PackageType; - -use crate::service::error::JudgeServiceError; - -use self::description::StoragedPackageDescriptionMap; - -pub struct PackageManager { - pub folder_path: PathBuf, - pub package_description_map: StoragedPackageDescriptionMap, -} - -impl PackageManager { - pub fn new(folder_path: PathBuf) -> Result { - if folder_path.exists() && folder_path.is_file() { - return Err(JudgeServiceError::AnyhowError(anyhow::anyhow!( - "Package folder '{}' appears to be a file.", - folder_path.display() - ))); - } - - if !folder_path.exists() { - std::fs::create_dir_all(&folder_path)?; - } - - let description_file_path = folder_path.join(description::PACKAGES_DESCRIPTION_FILE_NAME); - if description_file_path.exists() && description_file_path.is_dir() { - return Err(JudgeServiceError::AnyhowError(anyhow::anyhow!( - "Description file '{}' appears to be a folder.", - folder_path.display() - ))); - } - let package_description_map = if !description_file_path.exists() { - StoragedPackageDescriptionMap::init(folder_path.clone())? - } else { - StoragedPackageDescriptionMap::load(folder_path.clone())? - }; - - Ok(Self { - folder_path, - package_description_map, - }) - } - - pub fn import_package( - &mut self, - package_name: String, - package_type: PackageType, - ) -> Result<(), JudgeServiceError> { - let package_description = self.package_description_map.get(&package_name); - if package_description.is_some() { - return Err(JudgeServiceError::AnyhowError(anyhow::anyhow!( - "Package '{}' already exists.", - package_name - ))); - } - - if package_type - .get_package_agent(self.folder_path.join(&package_name))? - .validate() - { - let package_description = - description::PackageDescription::new(package_name, package_type)?; - self.package_description_map.insert(package_description)?; - } else { - return Err(JudgeServiceError::AnyhowError(anyhow::anyhow!( - "Package '{}' is not valid.", - package_name - ))); - } - - Ok(()) - } -} diff --git a/judger/src/service/package_manager/package.rs b/judger/src/service/package_manager/package.rs deleted file mode 100644 index 3fed7f1..0000000 --- a/judger/src/service/package_manager/package.rs +++ /dev/null @@ -1,51 +0,0 @@ -use crate::service::error::JudgeServiceError; -use anyhow; -use std::path::Path; -use std::process::Command; - -fn check_rclone(config_path: &Path) -> Result<(), JudgeServiceError> { - let mut cmd = Command::new("rclone"); - let status = cmd - .arg("--config") - .arg(config_path) - .arg("ls") - .arg("minio:") - .status() - .expect("Failed to rclone"); - if status.success() { - Ok(()) - } else { - Err(JudgeServiceError::RcloneError(anyhow::anyhow!( - "rclone failed, please check config." - ))) - } -} - -pub const PACKAGE_SAVE_DIRNAME: &str = "rclone-problem-package"; -pub const RCLONE_CONFIG_FILE: &str = "rclone-minio.conf"; - -pub fn sync_package(data_dir: &Path, bucket_name: &str) -> Result<(), JudgeServiceError> { - let config_path = data_dir.join(RCLONE_CONFIG_FILE); - let package_path = data_dir.join(PACKAGE_SAVE_DIRNAME); - let check_res = check_rclone(&config_path); - if check_res.as_ref().is_err() { - return check_res; - } - println!("{:?}", data_dir); - let mut cmd = Command::new("rclone"); - let status = cmd - .arg("--config") - .arg(format!("{}", config_path.to_string_lossy())) - .arg("sync") - .arg(format!("minio:{}", bucket_name)) - .arg(format!("{}", package_path.to_string_lossy())) - .status() - .expect("Failed to rclone"); - if status.success() { - Ok(()) - } else { - Err(JudgeServiceError::RcloneError(anyhow::anyhow!( - "rclone sync failed, please check config." - ))) - } -} diff --git a/judger/src/worker/mod.rs b/judger/src/worker/mod.rs new file mode 100644 index 0000000..2e987e6 --- /dev/null +++ b/judger/src/worker/mod.rs @@ -0,0 +1,135 @@ +use crate::agent::platform::{JudgeTask, PlatformClient}; +use crate::agent::rclone::RcloneClient; +use crate::handler::state; +use anyhow::Error; +use judge_core::judge; +use judge_core::{ + judge::builder::{JudgeBuilder, JudgeBuilderInput}, + judge::result::JudgeResultInfo, + judge::JudgeConfig, + package::PackageType, +}; +use std::time::Duration; +use std::{fs, path::PathBuf}; +use tokio::time::interval; + +pub struct JudgeWorker { + platform_client: PlatformClient, + interval_sec: u64, + rclone_client: RcloneClient, + package_bucket: String, + package_dir: PathBuf, +} + +impl JudgeWorker { + pub fn new( + platform_uri: String, + interval_sec: u64, + rclone_config: PathBuf, + package_bucket: String, + package_dir: PathBuf, + ) -> Result, Error> { + let platform_client = PlatformClient::new(platform_uri); + let rclone_client = RcloneClient::new(rclone_config); + if !rclone_client.is_avaliable() { + Err(anyhow::anyhow!("Rclone is not avaliable"))?; + } + Ok(Some(Self { + platform_client, + rclone_client, + interval_sec, + package_bucket, + package_dir, + })) + } + + pub async fn run(&self) { + let _ = self.rclone_client + .sync_bucket(&self.package_bucket, &self.package_dir) + .map_err(|e| log::debug!("Failed to sync bucket: {:?}", e)); + + let mut interval = interval(Duration::from_secs(self.interval_sec)); + loop { + interval.tick().await; + match self.platform_client.pick_task().await { + Ok(task) => { + log::info!("Received task: {:?}", task); + match self.run_judge(&task) { + Ok(results) => { + let report_response = self + .platform_client + .report_task(&task.redis_stream_id.clone(), results) + .await; + if report_response.is_err() { + log::debug!( + "Report failed with error: {:?}", + report_response.err() + ); + return; + } + log::info!( + "Submission {:?} report success", + task.submission_uid.clone() + ); + } + Err(e) => log::info!("Error judge task: {:?}", e), + } + } + Err(e) => log::debug!("Error sending request: {:?}", e), + } + } + } + + fn run_judge(&self, task: &JudgeTask) -> Result, anyhow::Error> { + self.rclone_client + .sync_bucket(&self.package_bucket, &self.package_dir)?; + state::set_busy()?; + let problem_package_dir = self.package_dir.join(&task.problem_slug); + + let uuid = uuid::Uuid::new_v4(); + let runtime_path = PathBuf::from("/tmp").join(uuid.to_string()); + let src_file_name = format!("src.{}", task.language.get_extension()); + log::debug!("runtime_path: {:?}", runtime_path); + fs::create_dir_all(runtime_path.clone()).map_err(|e| { + log::debug!("Failed to create runtime dir: {:?}", e); + anyhow::anyhow!("Failed to create runtime dir") + })?; + fs::write(runtime_path.clone().join(&src_file_name), task.code.clone()).map_err(|e| { + log::debug!("Failed to write src file: {:?}", e); + anyhow::anyhow!("Failed to write src file") + })?; + + let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { + package_type: PackageType::ICPC, + package_path: problem_package_dir, + runtime_path: runtime_path.clone(), + src_language: task.language, + src_path: runtime_path.clone().join(&src_file_name), + }) + .map_err(|e| { + state::set_idle(); + anyhow::anyhow!("Failed to new builder result: {:?}", e) + }); + let builder = new_builder_result?; + log::debug!("Builder created: {:?}", builder); + let mut results: Vec = vec![]; + for idx in 0..builder.testdata_configs.len() { + let judge_config = JudgeConfig { + test_data: builder.testdata_configs[idx].clone(), + program: builder.program_config.clone(), + checker: builder.checker_config.clone(), + runtime: builder.runtime_config.clone(), + }; + let result = judge::common::run_judge(&judge_config).map_err(|e| { + state::set_idle(); + anyhow::anyhow!("Failed to run judge: {:?}", e) + })?; + log::debug!("Judge result: {:?}", result); + results.push(result); + } + + log::debug!("Judge finished"); + state::set_idle(); + Ok(results) + } +} diff --git a/judger/tests/package_test.rs b/judger/tests/package_test.rs deleted file mode 100644 index 3c8c72b..0000000 --- a/judger/tests/package_test.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::path::PathBuf; - -use judge_core::package::PackageType; -use judger::service::package_manager::description::{ - PackageDescription, StoragedPackageDescriptionMap, -}; - -const TEST_TEMP_PATH: &str = "tests/temp"; - -#[test] -fn test_storaged_package_description_map() { - let folder = PathBuf::from(TEST_TEMP_PATH); - let mut package_description_map = StoragedPackageDescriptionMap::init(folder.clone()).unwrap(); - - let package_description = PackageDescription { - name: "test".to_string(), - revision: 1, - package_type: PackageType::ICPC, - }; - package_description_map.insert(package_description).unwrap(); - - let package_description_map = StoragedPackageDescriptionMap::load(folder).unwrap(); - assert_eq!(package_description_map.package_description_map.len(), 1); - assert_eq!( - package_description_map - .package_description_map - .get("test") - .unwrap() - .name, - "test" - ); - assert_eq!( - package_description_map - .package_description_map - .get("test") - .unwrap() - .revision, - 1 - ); - assert_eq!( - package_description_map - .package_description_map - .get("test") - .unwrap() - .package_type, - PackageType::ICPC - ); -} diff --git a/judger/tests/temp/judge-pd.json b/judger/tests/temp/judge-pd.json deleted file mode 100644 index 8ac8c98..0000000 --- a/judger/tests/temp/judge-pd.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "test": { - "name": "test", - "revision": 1, - "package_type": "ICPC" - } -} \ No newline at end of file