From e23c9eeb5ba7f7c8b003b002812668f3b0f02ab5 Mon Sep 17 00:00:00 2001 From: slhmy Date: Sat, 23 Mar 2024 16:24:42 +0800 Subject: [PATCH] Refactor judger code --- .../server/environment => }/.env.development | 0 judger/Cargo.toml | 10 +- .../client/httpclient.rs => agent/http.rs} | 4 +- judger/src/agent/mod.rs | 2 + judger/src/agent/rclone.rs | 41 ++++ judger/src/cli/main.rs | 108 --------- judger/src/{server => }/environment/mod.rs | 3 + judger/src/{server => }/error.rs | 2 - .../src/{server/service => handler}/greet.rs | 0 .../src/{server/service => handler}/judge.rs | 2 +- judger/src/{server/service => handler}/mod.rs | 0 .../src/{server/service => handler}/state.rs | 0 judger/src/lib.rs | 1 - judger/src/{server => }/main.rs | 33 ++- judger/src/server/client/mod.rs | 175 --------------- judger/src/service/error.rs | 29 --- judger/src/service/mod.rs | 2 - .../service/package_manager/description.rs | 100 --------- judger/src/service/package_manager/mod.rs | 78 ------- judger/src/service/package_manager/package.rs | 51 ----- judger/src/worker/mod.rs | 206 ++++++++++++++++++ judger/tests/package_test.rs | 48 ---- judger/tests/temp/judge-pd.json | 7 - 23 files changed, 283 insertions(+), 619 deletions(-) rename judger/{src/server/environment => }/.env.development (100%) rename judger/src/{server/client/httpclient.rs => agent/http.rs} (72%) create mode 100644 judger/src/agent/mod.rs create mode 100644 judger/src/agent/rclone.rs delete mode 100644 judger/src/cli/main.rs rename judger/src/{server => }/environment/mod.rs (92%) rename judger/src/{server => }/error.rs (97%) rename judger/src/{server/service => handler}/greet.rs (100%) rename judger/src/{server/service => handler}/judge.rs (98%) rename judger/src/{server/service => handler}/mod.rs (100%) rename judger/src/{server/service => handler}/state.rs (100%) delete mode 100644 judger/src/lib.rs rename judger/src/{server => }/main.rs (60%) delete mode 100644 judger/src/server/client/mod.rs delete mode 100644 judger/src/service/error.rs delete mode 100644 judger/src/service/mod.rs delete mode 100644 judger/src/service/package_manager/description.rs delete mode 100644 judger/src/service/package_manager/mod.rs delete mode 100644 judger/src/service/package_manager/package.rs create mode 100644 judger/src/worker/mod.rs delete mode 100644 judger/tests/package_test.rs delete mode 100644 judger/tests/temp/judge-pd.json diff --git a/judger/src/server/environment/.env.development b/judger/.env.development similarity index 100% rename from judger/src/server/environment/.env.development rename to judger/.env.development diff --git a/judger/Cargo.toml b/judger/Cargo.toml index 009119d..e0e3e4e 100644 --- a/judger/Cargo.toml +++ b/judger/Cargo.toml @@ -43,12 +43,4 @@ chrono = { version = "0.4", features = ["serde"] } anyhow = "1" thiserror = "1" -uuid = { version = "1.4", features = ["serde", "v4"] } - -[[bin]] -name ="judger-cli" -path ="src/cli/main.rs" - -[[bin]] -name ="judger-server" -path ="src/server/main.rs" +uuid = { version = "1.4", features = ["serde", "v4"] } \ No newline at end of file diff --git a/judger/src/server/client/httpclient.rs b/judger/src/agent/http.rs similarity index 72% rename from judger/src/server/client/httpclient.rs rename to judger/src/agent/http.rs index f1fe643..0b53dcd 100644 --- a/judger/src/server/client/httpclient.rs +++ b/judger/src/agent/http.rs @@ -8,9 +8,7 @@ impl HttpClient { let client = reqwest::Client::new(); Self { client, base_url } } - pub fn _get(&self, path: String) -> reqwest::RequestBuilder { - self.client.get(format!("{}{}", self.base_url, path)) - } + pub fn post(&self, path: String) -> reqwest::RequestBuilder { self.client.post(format!("{}{}", self.base_url, path)) } diff --git a/judger/src/agent/mod.rs b/judger/src/agent/mod.rs new file mode 100644 index 0000000..4530e66 --- /dev/null +++ b/judger/src/agent/mod.rs @@ -0,0 +1,2 @@ +pub mod http; +pub mod rclone; diff --git a/judger/src/agent/rclone.rs b/judger/src/agent/rclone.rs new file mode 100644 index 0000000..118d1ec --- /dev/null +++ b/judger/src/agent/rclone.rs @@ -0,0 +1,41 @@ +use anyhow::{self, Error}; +use std::path::PathBuf; +use std::process::Command; + +pub struct RcloneClient { + config_path: PathBuf, +} + +impl RcloneClient { + pub fn new(config_path: PathBuf) -> Self { + Self { config_path } + } + + pub fn is_avaliable(&self) -> bool { + let status = Command::new("rclone") + .arg("--config") + .arg(format!("{}", self.config_path.to_string_lossy())) + .arg("ls") + .arg("minio:") + .status() + .expect("Failed to rclone"); + + status.success() + } + + pub fn sync_bucket(&self, bucket_name: &str, target_dir: PathBuf) -> Result<(), Error> { + let status = Command::new("rclone") + .arg("--config") + .arg(format!("{}", self.config_path.to_string_lossy())) + .arg("sync") + .arg(format!("minio:{}", bucket_name)) + .arg(format!("{}", target_dir.to_string_lossy())) + .status() + .expect("Failed to rclone"); + if status.success() { + Ok(()) + } else { + Err(anyhow::anyhow!("rclone sync failed, please check config.")) + } + } +} diff --git a/judger/src/cli/main.rs b/judger/src/cli/main.rs deleted file mode 100644 index d020e70..0000000 --- a/judger/src/cli/main.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::path::PathBuf; - -use clap::{Parser, Subcommand}; -use judge_core::{ - compiler::{Compiler, Language}, - judge::builder::{JudgeBuilder, JudgeBuilderInput}, - judge::{common::run_judge, JudgeConfig}, - package::PackageType, -}; - -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -struct Cli { - #[command(subcommand)] - command: Option, -} - -#[derive(Subcommand)] -enum Commands { - /// Compile a single file source code into an executable - Compile { - /// Path of the src file - #[arg(short, long)] - source: String, - /// Path to place the compiled executable - #[arg(short, long)] - target: String, - #[arg(short, long)] - /// Supported are: rust | cpp | python - language: Language, - }, - /// Run a batch of judges with specified problem package and src input - BatchJudge { - /// Path of the testing src file - #[arg(short, long)] - source: PathBuf, - /// Supported are: rust | cpp | python - #[arg(short = 'l', long)] - source_language: Language, - /// Path of the problem package to run the judge - #[arg(short, long)] - package: PathBuf, - /// Supported are: icpc - #[arg(short = 't', long)] - package_type: PackageType, - /// Path to run and store the runtime files - #[arg(short, long, default_value = "/tmp")] - runtime_path: PathBuf, - }, -} - -fn main() { - // TODO: use some flags to control weather log is printed - // let _ = env_logger::builder() - // .filter_level(log::LevelFilter::Debug) - // .try_init(); - let cli = Cli::parse(); - match cli.command { - Some(Commands::Compile { - source, - target, - language, - }) => { - let compiler = Compiler::new(language, vec!["-std=c++17".to_string()]); - let output = compiler.compile(&PathBuf::from(source), &PathBuf::from(target)); - println!("{:?}", output) - } - Some(Commands::BatchJudge { - source, - source_language, - package, - package_type, - runtime_path, - }) => { - let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { - package_type, - package_path: package, - runtime_path, - src_language: source_language, - src_path: source, - }); - if new_builder_result.is_err() { - println!( - "Failed to new builder result: {:?}", - new_builder_result.err() - ); - return; - } - let builder = new_builder_result.unwrap(); - println!("Builder created: {:?}", builder); - for idx in 0..builder.testdata_configs.len() { - let judge_config = JudgeConfig { - test_data: builder.testdata_configs[idx].clone(), - program: builder.program_config.clone(), - checker: builder.checker_config.clone(), - runtime: builder.runtime_config.clone(), - }; - let result = run_judge(&judge_config); - println!("Judge result: {:?}", result); - } - - println!("BatchJudge finished") - } - None => { - println!("Please specify a COMMAND, use --help to see more") - } - } -} diff --git a/judger/src/server/environment/mod.rs b/judger/src/environment/mod.rs similarity index 92% rename from judger/src/server/environment/mod.rs rename to judger/src/environment/mod.rs index acdcc4c..dca0029 100644 --- a/judger/src/server/environment/mod.rs +++ b/judger/src/environment/mod.rs @@ -15,6 +15,9 @@ pub struct JudgeServerOpt { #[structopt(long, default_value = "data/dev-problem-package")] pub problem_package_dir: PathBuf, + #[structopt(long, default_value = "data/rclone.conf")] + pub rclone_config: PathBuf, + #[structopt(env = "BASE_URL", default_value = "http://localhost:8080/api/v1/judge")] pub base_url: String, diff --git a/judger/src/server/error.rs b/judger/src/error.rs similarity index 97% rename from judger/src/server/error.rs rename to judger/src/error.rs index e9de4bc..7da2114 100644 --- a/judger/src/server/error.rs +++ b/judger/src/error.rs @@ -2,7 +2,6 @@ use actix_web::{HttpResponse, ResponseError}; use judge_core::error::JudgeCoreError; -use judger::service::error::JudgeServiceError; #[derive(Debug, thiserror::Error)] pub enum ServiceError { @@ -23,7 +22,6 @@ pub enum ServiceError { #[derive(Debug)] pub enum ClientError { InternalError(anyhow::Error), - PackageError(JudgeServiceError), } #[derive(Serialize)] diff --git a/judger/src/server/service/greet.rs b/judger/src/handler/greet.rs similarity index 100% rename from judger/src/server/service/greet.rs rename to judger/src/handler/greet.rs diff --git a/judger/src/server/service/judge.rs b/judger/src/handler/judge.rs similarity index 98% rename from judger/src/server/service/judge.rs rename to judger/src/handler/judge.rs index 4b576c1..acfc9d8 100644 --- a/judger/src/server/service/judge.rs +++ b/judger/src/handler/judge.rs @@ -1,6 +1,6 @@ use std::{fs, path::PathBuf}; -use crate::{error::ServiceError, service::state}; +use crate::{error::ServiceError, handler::state}; use actix_web::{post, web, HttpResponse}; use judge_core::{ diff --git a/judger/src/server/service/mod.rs b/judger/src/handler/mod.rs similarity index 100% rename from judger/src/server/service/mod.rs rename to judger/src/handler/mod.rs diff --git a/judger/src/server/service/state.rs b/judger/src/handler/state.rs similarity index 100% rename from judger/src/server/service/state.rs rename to judger/src/handler/state.rs diff --git a/judger/src/lib.rs b/judger/src/lib.rs deleted file mode 100644 index 1f278a4..0000000 --- a/judger/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod service; diff --git a/judger/src/server/main.rs b/judger/src/main.rs similarity index 60% rename from judger/src/server/main.rs rename to judger/src/main.rs index 1588b65..36aa920 100644 --- a/judger/src/server/main.rs +++ b/judger/src/main.rs @@ -1,7 +1,8 @@ -mod client; +mod agent; mod environment; mod error; -mod service; +mod handler; +mod worker; #[macro_use] extern crate serde_derive; @@ -23,8 +24,30 @@ async fn main() -> std::io::Result<()> { // } // }); + let new_worker_result = worker::JudgeWorker::new( + opt.base_url, + opt.interval as u64, + opt.rclone_config, + opt.problem_package_dir.clone(), + ); + + let worker = match new_worker_result { + Ok(maybe_worker) => { + if let Some(worker) = maybe_worker { + worker + } else { + log::error!("Failed to create worker"); + return Ok(()); + } + } + Err(e) => { + log::error!("Failed to create worker: {:?}", e); + return Ok(()); + } + }; + tokio::spawn(async move { - client::run_client(opt.base_url, opt.interval as u64).await; + worker.run().await; }); let port = opt.port; @@ -33,11 +56,11 @@ async fn main() -> std::io::Result<()> { App::new() .wrap(actix_web::middleware::Logger::default()) .app_data(Data::new(opt.problem_package_dir.clone())) - .configure(service::route) + .configure(handler::route) .service( utoipa_swagger_ui::SwaggerUi::new("/swagger-ui/{_:.*}").urls(vec![( utoipa_swagger_ui::Url::new("api", "/api-docs/openapi.json"), - service::ApiDoc::openapi(), + handler::ApiDoc::openapi(), )]), ) }) diff --git a/judger/src/server/client/mod.rs b/judger/src/server/client/mod.rs deleted file mode 100644 index 2600431..0000000 --- a/judger/src/server/client/mod.rs +++ /dev/null @@ -1,175 +0,0 @@ -mod httpclient; -use crate::error::ClientError; -use crate::service::state; -use httpclient::HttpClient; -use judge_core::judge; -use judge_core::{ - compiler::Language, - judge::builder::{JudgeBuilder, JudgeBuilderInput}, - judge::result::JudgeResultInfo, - judge::JudgeConfig, - package::PackageType, -}; -use judger::service::package_manager::package; -use serde::{Deserialize, Serialize}; -use std::time::Duration; -use std::{fs, path::PathBuf}; -use tokio::time::interval; - -#[derive(Serialize)] -struct PickBody { - consumer: String, -} -#[derive(Deserialize, Debug)] -struct PickResponse { - task: JudgeTask, -} -#[derive(Serialize)] -struct ReportBody { - consumer: String, - stream_id: String, - verdict_json: String, -} -#[derive(Deserialize, Debug)] -struct ReportResponse { - message: String, -} -#[derive(Deserialize, Debug)] -struct JudgeTask { - #[serde(rename = "submissionUID")] - submission_uid: String, - #[serde(rename = "problemSlug")] - problem_slug: String, - code: String, - language: Language, - #[serde(rename = "redisStreamID")] - redis_stream_id: String, -} - -pub async fn run_client(base_url: String, interval_sec: u64) { - let mut interval = interval(Duration::from_secs(interval_sec)); - let client = HttpClient::new(base_url); - - loop { - interval.tick().await; - match pick_task(&client).await { - Ok(task) => { - let stream_id = task.redis_stream_id.clone(); - let submission_uid = task.submission_uid.clone(); - log::info!("Received task: {:?}", task); - match run_judge(task) { - Ok(results) => { - let report_response = report_task(&client, &stream_id, results).await; - if report_response.is_err() { - log::debug!("Report failed {:?}", report_response); - return; - } - log::info!("Submission {:?} report success", submission_uid); - } - Err(e) => log::info!("Error judge task: {:?}", e), - } - } - Err(e) => log::debug!("Error sending request: {:?}", e), - } - } -} - -async fn pick_task(client: &HttpClient) -> Result { - let pick_url = "/task/pick"; - let body = PickBody { - consumer: "".to_string(), - }; - let response = client.post(pick_url.to_string()).json(&body).send().await?; - - match response.status() { - reqwest::StatusCode::OK => Ok(response.json::().await?.task), - _ => Err(ClientError::InternalError(anyhow::anyhow!( - "Queue is empty" - ))), - } -} - -async fn report_task( - client: &HttpClient, - stream_id: &str, - results: Vec, -) -> Result<(), ClientError> { - let report_url = "/task/report"; - let body = ReportBody { - consumer: "".to_string(), - stream_id: stream_id.to_owned(), - verdict_json: serde_json::to_string(&results).unwrap(), - }; - let response = client - .post(report_url.to_string()) - .json(&body) - .send() - .await?; - - match response.status() { - reqwest::StatusCode::OK => { - log::debug!( - "Report message: {:?}", - response.json::().await?.message - ); - Ok(()) - } - _ => Err(ClientError::InternalError(anyhow::anyhow!("Report Failed"))), - } -} - -fn run_judge(task: JudgeTask) -> Result, ClientError> { - if let Err(sync_err) = package::sync_package(&PathBuf::from("data"), "oj-lab-problem-package") { - return Err(ClientError::PackageError(sync_err)); - }; - if let Err(set_err) = state::set_busy() { - return Err(ClientError::InternalError(set_err)); - } - let problem_package_dir = PathBuf::from(format!("data/{}", package::PACKAGE_SAVE_DIRNAME)); - let problem_slug = task.problem_slug; - let uuid = uuid::Uuid::new_v4(); - let runtime_path = PathBuf::from("/tmp").join(uuid.to_string()); - let src_file_name = format!("src.{}", task.language.get_extension()); - log::debug!("runtime_path: {:?}", runtime_path); - fs::create_dir_all(runtime_path.clone()).map_err(|e| { - log::debug!("Failed to create runtime dir: {:?}", e); - ClientError::InternalError(anyhow::anyhow!("Failed to create runtime dir")) - })?; - fs::write(runtime_path.clone().join(&src_file_name), task.code.clone()).map_err(|e| { - log::debug!("Failed to write src file: {:?}", e); - ClientError::InternalError(anyhow::anyhow!("Failed to write src file")) - })?; - - let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { - package_type: PackageType::ICPC, - package_path: problem_package_dir.join(problem_slug.clone()), - runtime_path: runtime_path.clone(), - src_language: task.language, - src_path: runtime_path.clone().join(&src_file_name), - }); - if new_builder_result.is_err() { - state::set_idle(); - return Err(ClientError::InternalError(anyhow::anyhow!( - "Failed to new builder result: {:?}", - new_builder_result.err() - ))); - } - let builder = new_builder_result?; - log::debug!("Builder created: {:?}", builder); - let mut results: Vec = vec![]; - for idx in 0..builder.testdata_configs.len() { - let judge_config = JudgeConfig { - test_data: builder.testdata_configs[idx].clone(), - program: builder.program_config.clone(), - checker: builder.checker_config.clone(), - runtime: builder.runtime_config.clone(), - }; - let result = judge::common::run_judge(&judge_config)?; - log::debug!("Judge result: {:?}", result); - results.push(result); - } - - log::debug!("Judge finished"); - state::set_idle(); - Ok(results) -} diff --git a/judger/src/service/error.rs b/judger/src/service/error.rs deleted file mode 100644 index 11925bd..0000000 --- a/judger/src/service/error.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::io; - -use judge_core::error::JudgeCoreError; - -#[derive(Debug)] -pub enum JudgeServiceError { - JudgeCoreError(JudgeCoreError), - IOError(io::Error), - RcloneError(anyhow::Error), - AnyhowError(anyhow::Error), -} - -impl From for JudgeServiceError { - fn from(error: JudgeCoreError) -> JudgeServiceError { - JudgeServiceError::JudgeCoreError(error) - } -} - -impl From for JudgeServiceError { - fn from(error: io::Error) -> JudgeServiceError { - JudgeServiceError::IOError(error) - } -} - -impl From for JudgeServiceError { - fn from(error: anyhow::Error) -> JudgeServiceError { - JudgeServiceError::AnyhowError(error) - } -} diff --git a/judger/src/service/mod.rs b/judger/src/service/mod.rs deleted file mode 100644 index 215fb2c..0000000 --- a/judger/src/service/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod error; -pub mod package_manager; diff --git a/judger/src/service/package_manager/description.rs b/judger/src/service/package_manager/description.rs deleted file mode 100644 index f7f762b..0000000 --- a/judger/src/service/package_manager/description.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::{ - collections::HashMap, - fs, - path::{Path, PathBuf}, -}; - -use judge_core::{error::JudgeCoreError, package::PackageType}; -use serde_derive::{Deserialize, Serialize}; - -use crate::service::error::JudgeServiceError; - -pub const PACKAGES_DESCRIPTION_FILE_NAME: &str = "judge-pd.json"; - -#[derive(Debug, Serialize, Deserialize)] -pub struct PackageDescription { - pub name: String, - pub revision: u32, - pub package_type: PackageType, -} - -impl PackageDescription { - pub fn new(name: String, package_type: PackageType) -> Result { - Ok(Self { - name, - revision: 0, - package_type, - }) - } -} - -pub struct StoragedPackageDescriptionMap { - pub folder_path: PathBuf, - pub package_description_map: HashMap, -} - -impl StoragedPackageDescriptionMap { - pub fn init(folder_path: PathBuf) -> Result { - init_package_description_file(&folder_path)?; - let package_description_map = HashMap::new(); - Ok(Self { - folder_path, - package_description_map, - }) - } - - pub fn load(folder_path: PathBuf) -> Result { - let package_description_map = load_package_description_map(&folder_path)?; - Ok(Self { - folder_path, - package_description_map, - }) - } - - pub fn insert( - &mut self, - package_description: PackageDescription, - ) -> Result<(), JudgeCoreError> { - self.package_description_map - .insert(package_description.name.clone(), package_description); - update_package_description_file(&self.folder_path, &self.package_description_map)?; - Ok(()) - } - - pub fn get(&self, package_name: &str) -> Option<&PackageDescription> { - self.package_description_map.get(package_name) - } -} - -fn load_package_description_map( - folder: &Path, -) -> Result, JudgeCoreError> { - let description_file_content = fs::read_to_string(folder.join(PACKAGES_DESCRIPTION_FILE_NAME))?; - let package_description_map: HashMap = - serde_json::from_str(&description_file_content)?; - Ok(package_description_map) -} - -fn init_package_description_file(folder: &Path) -> Result<(), JudgeCoreError> { - let package_description_map = HashMap::::new(); - let package_description_file_content = serde_json::to_string_pretty(&package_description_map)?; - - fs::write( - folder.join(PACKAGES_DESCRIPTION_FILE_NAME), - package_description_file_content, - )?; - Ok(()) -} - -fn update_package_description_file( - folder: &Path, - package_description_map: &HashMap, -) -> Result<(), JudgeCoreError> { - let package_description_file_content = serde_json::to_string_pretty(package_description_map)?; - - fs::write( - folder.join(PACKAGES_DESCRIPTION_FILE_NAME), - package_description_file_content, - )?; - Ok(()) -} diff --git a/judger/src/service/package_manager/mod.rs b/judger/src/service/package_manager/mod.rs deleted file mode 100644 index 6440d6a..0000000 --- a/judger/src/service/package_manager/mod.rs +++ /dev/null @@ -1,78 +0,0 @@ -pub mod description; -pub mod package; - -use std::path::PathBuf; - -use judge_core::package::PackageType; - -use crate::service::error::JudgeServiceError; - -use self::description::StoragedPackageDescriptionMap; - -pub struct PackageManager { - pub folder_path: PathBuf, - pub package_description_map: StoragedPackageDescriptionMap, -} - -impl PackageManager { - pub fn new(folder_path: PathBuf) -> Result { - if folder_path.exists() && folder_path.is_file() { - return Err(JudgeServiceError::AnyhowError(anyhow::anyhow!( - "Package folder '{}' appears to be a file.", - folder_path.display() - ))); - } - - if !folder_path.exists() { - std::fs::create_dir_all(&folder_path)?; - } - - let description_file_path = folder_path.join(description::PACKAGES_DESCRIPTION_FILE_NAME); - if description_file_path.exists() && description_file_path.is_dir() { - return Err(JudgeServiceError::AnyhowError(anyhow::anyhow!( - "Description file '{}' appears to be a folder.", - folder_path.display() - ))); - } - let package_description_map = if !description_file_path.exists() { - StoragedPackageDescriptionMap::init(folder_path.clone())? - } else { - StoragedPackageDescriptionMap::load(folder_path.clone())? - }; - - Ok(Self { - folder_path, - package_description_map, - }) - } - - pub fn import_package( - &mut self, - package_name: String, - package_type: PackageType, - ) -> Result<(), JudgeServiceError> { - let package_description = self.package_description_map.get(&package_name); - if package_description.is_some() { - return Err(JudgeServiceError::AnyhowError(anyhow::anyhow!( - "Package '{}' already exists.", - package_name - ))); - } - - if package_type - .get_package_agent(self.folder_path.join(&package_name))? - .validate() - { - let package_description = - description::PackageDescription::new(package_name, package_type)?; - self.package_description_map.insert(package_description)?; - } else { - return Err(JudgeServiceError::AnyhowError(anyhow::anyhow!( - "Package '{}' is not valid.", - package_name - ))); - } - - Ok(()) - } -} diff --git a/judger/src/service/package_manager/package.rs b/judger/src/service/package_manager/package.rs deleted file mode 100644 index 3fed7f1..0000000 --- a/judger/src/service/package_manager/package.rs +++ /dev/null @@ -1,51 +0,0 @@ -use crate::service::error::JudgeServiceError; -use anyhow; -use std::path::Path; -use std::process::Command; - -fn check_rclone(config_path: &Path) -> Result<(), JudgeServiceError> { - let mut cmd = Command::new("rclone"); - let status = cmd - .arg("--config") - .arg(config_path) - .arg("ls") - .arg("minio:") - .status() - .expect("Failed to rclone"); - if status.success() { - Ok(()) - } else { - Err(JudgeServiceError::RcloneError(anyhow::anyhow!( - "rclone failed, please check config." - ))) - } -} - -pub const PACKAGE_SAVE_DIRNAME: &str = "rclone-problem-package"; -pub const RCLONE_CONFIG_FILE: &str = "rclone-minio.conf"; - -pub fn sync_package(data_dir: &Path, bucket_name: &str) -> Result<(), JudgeServiceError> { - let config_path = data_dir.join(RCLONE_CONFIG_FILE); - let package_path = data_dir.join(PACKAGE_SAVE_DIRNAME); - let check_res = check_rclone(&config_path); - if check_res.as_ref().is_err() { - return check_res; - } - println!("{:?}", data_dir); - let mut cmd = Command::new("rclone"); - let status = cmd - .arg("--config") - .arg(format!("{}", config_path.to_string_lossy())) - .arg("sync") - .arg(format!("minio:{}", bucket_name)) - .arg(format!("{}", package_path.to_string_lossy())) - .status() - .expect("Failed to rclone"); - if status.success() { - Ok(()) - } else { - Err(JudgeServiceError::RcloneError(anyhow::anyhow!( - "rclone sync failed, please check config." - ))) - } -} diff --git a/judger/src/worker/mod.rs b/judger/src/worker/mod.rs new file mode 100644 index 0000000..36d5508 --- /dev/null +++ b/judger/src/worker/mod.rs @@ -0,0 +1,206 @@ +use crate::agent::http::HttpClient; +use crate::agent::rclone::RcloneClient; +use crate::error::ClientError; +use crate::handler::state; +use anyhow::Error; +use judge_core::judge; +use judge_core::{ + compiler::Language, + judge::builder::{JudgeBuilder, JudgeBuilderInput}, + judge::result::JudgeResultInfo, + judge::JudgeConfig, + package::PackageType, +}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use std::{fs, path::PathBuf}; +use tokio::time::interval; + +#[derive(Serialize)] +struct PickBody { + consumer: String, +} +#[derive(Deserialize, Debug)] +struct PickResponse { + task: JudgeTask, +} +#[derive(Serialize)] +struct ReportBody { + consumer: String, + stream_id: String, + verdict_json: String, +} +#[derive(Deserialize, Debug)] +struct ReportResponse { + message: String, +} +#[derive(Deserialize, Debug)] +struct JudgeTask { + #[serde(rename = "submissionUID")] + submission_uid: String, + #[serde(rename = "problemSlug")] + problem_slug: String, + code: String, + language: Language, + #[serde(rename = "redisStreamID")] + redis_stream_id: String, +} + +pub struct JudgeWorker { + platform_client: HttpClient, + interval_sec: u64, + rclone_client: RcloneClient, + package_dir: PathBuf, +} + +impl JudgeWorker { + pub fn new( + base_url: String, + interval_sec: u64, + rclone_config: PathBuf, + package_dir: PathBuf, + ) -> Result, Error> { + let platform_client = HttpClient::new(base_url); + let rclone_client = RcloneClient::new(rclone_config); + if rclone_client.is_avaliable() { + Err(anyhow::anyhow!("Rclone is not avaliable"))?; + } + Ok(Some(Self { + platform_client, + rclone_client, + interval_sec, + package_dir, + })) + } + + pub async fn run(&self) { + let mut interval = interval(Duration::from_secs(self.interval_sec)); + loop { + interval.tick().await; + match pick_task(&self.platform_client).await { + Ok(task) => { + let stream_id = task.redis_stream_id.clone(); + let submission_uid = task.submission_uid.clone(); + log::info!("Received task: {:?}", task); + match self.run_judge(task) { + Ok(results) => { + let report_response = + report_task(&self.platform_client, &stream_id, results).await; + if report_response.is_err() { + log::debug!("Report failed {:?}", report_response); + return; + } + log::info!("Submission {:?} report success", submission_uid); + } + Err(e) => log::info!("Error judge task: {:?}", e), + } + } + Err(e) => log::debug!("Error sending request: {:?}", e), + } + } + } + + fn run_judge(&self, task: JudgeTask) -> Result, ClientError> { + if let Err(sync_err) = self + .rclone_client + .sync_bucket("oj-lab-problem-package", PathBuf::from("data")) + { + return Err(ClientError::InternalError(sync_err)); + }; + if let Err(set_err) = state::set_busy() { + return Err(ClientError::InternalError(set_err)); + } + let problem_slug = task.problem_slug; + let problem_package_dir = self.package_dir.join(problem_slug.clone()); + + let uuid = uuid::Uuid::new_v4(); + let runtime_path = PathBuf::from("/tmp").join(uuid.to_string()); + let src_file_name = format!("src.{}", task.language.get_extension()); + log::debug!("runtime_path: {:?}", runtime_path); + fs::create_dir_all(runtime_path.clone()).map_err(|e| { + log::debug!("Failed to create runtime dir: {:?}", e); + ClientError::InternalError(anyhow::anyhow!("Failed to create runtime dir")) + })?; + fs::write(runtime_path.clone().join(&src_file_name), task.code.clone()).map_err(|e| { + log::debug!("Failed to write src file: {:?}", e); + ClientError::InternalError(anyhow::anyhow!("Failed to write src file")) + })?; + + let new_builder_result = JudgeBuilder::new(JudgeBuilderInput { + package_type: PackageType::ICPC, + package_path: problem_package_dir, + runtime_path: runtime_path.clone(), + src_language: task.language, + src_path: runtime_path.clone().join(&src_file_name), + }); + if new_builder_result.is_err() { + state::set_idle(); + return Err(ClientError::InternalError(anyhow::anyhow!( + "Failed to new builder result: {:?}", + new_builder_result.err() + ))); + } + let builder = new_builder_result?; + log::debug!("Builder created: {:?}", builder); + let mut results: Vec = vec![]; + for idx in 0..builder.testdata_configs.len() { + let judge_config = JudgeConfig { + test_data: builder.testdata_configs[idx].clone(), + program: builder.program_config.clone(), + checker: builder.checker_config.clone(), + runtime: builder.runtime_config.clone(), + }; + let result = judge::common::run_judge(&judge_config)?; + log::debug!("Judge result: {:?}", result); + results.push(result); + } + + log::debug!("Judge finished"); + state::set_idle(); + Ok(results) + } +} + +async fn pick_task(client: &HttpClient) -> Result { + let pick_url = "/task/pick"; + let body = PickBody { + consumer: "".to_string(), + }; + let response = client.post(pick_url.to_string()).json(&body).send().await?; + + match response.status() { + reqwest::StatusCode::OK => Ok(response.json::().await?.task), + _ => Err(ClientError::InternalError(anyhow::anyhow!( + "Queue is empty" + ))), + } +} + +async fn report_task( + client: &HttpClient, + stream_id: &str, + results: Vec, +) -> Result<(), ClientError> { + let report_url = "/task/report"; + let body = ReportBody { + consumer: "".to_string(), + stream_id: stream_id.to_owned(), + verdict_json: serde_json::to_string(&results).unwrap(), + }; + let response = client + .post(report_url.to_string()) + .json(&body) + .send() + .await?; + + match response.status() { + reqwest::StatusCode::OK => { + log::debug!( + "Report message: {:?}", + response.json::().await?.message + ); + Ok(()) + } + _ => Err(ClientError::InternalError(anyhow::anyhow!("Report Failed"))), + } +} diff --git a/judger/tests/package_test.rs b/judger/tests/package_test.rs deleted file mode 100644 index 3c8c72b..0000000 --- a/judger/tests/package_test.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::path::PathBuf; - -use judge_core::package::PackageType; -use judger::service::package_manager::description::{ - PackageDescription, StoragedPackageDescriptionMap, -}; - -const TEST_TEMP_PATH: &str = "tests/temp"; - -#[test] -fn test_storaged_package_description_map() { - let folder = PathBuf::from(TEST_TEMP_PATH); - let mut package_description_map = StoragedPackageDescriptionMap::init(folder.clone()).unwrap(); - - let package_description = PackageDescription { - name: "test".to_string(), - revision: 1, - package_type: PackageType::ICPC, - }; - package_description_map.insert(package_description).unwrap(); - - let package_description_map = StoragedPackageDescriptionMap::load(folder).unwrap(); - assert_eq!(package_description_map.package_description_map.len(), 1); - assert_eq!( - package_description_map - .package_description_map - .get("test") - .unwrap() - .name, - "test" - ); - assert_eq!( - package_description_map - .package_description_map - .get("test") - .unwrap() - .revision, - 1 - ); - assert_eq!( - package_description_map - .package_description_map - .get("test") - .unwrap() - .package_type, - PackageType::ICPC - ); -} diff --git a/judger/tests/temp/judge-pd.json b/judger/tests/temp/judge-pd.json deleted file mode 100644 index 8ac8c98..0000000 --- a/judger/tests/temp/judge-pd.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "test": { - "name": "test", - "revision": 1, - "package_type": "ICPC" - } -} \ No newline at end of file