diff --git a/judger/src/filesystem/adapter/error.rs b/judger/src/filesystem/adapter/error.rs index 2df5faa..d56395a 100644 --- a/judger/src/filesystem/adapter/error.rs +++ b/judger/src/filesystem/adapter/error.rs @@ -1,3 +1,10 @@ +/// Error occurred in the filesystem adapter. +/// +/// It's only used to manage the error in a centralized way. +/// +/// User shouldn't rely on this error to as value in another error, +/// and should always call [`Into::>::into`] +/// immediately after the error is returned. #[derive(thiserror::Error, Debug)] pub enum FuseError { #[error("not a readable file")] diff --git a/judger/src/filesystem/adapter/fuse.rs b/judger/src/filesystem/adapter/fuse.rs index 0dea778..cc9016a 100644 --- a/judger/src/filesystem/adapter/fuse.rs +++ b/judger/src/filesystem/adapter/fuse.rs @@ -5,9 +5,9 @@ use spin::Mutex; use tokio::io::{AsyncRead, AsyncSeek}; use tokio::sync::Mutex as AsyncMutex; -use crate::filesystem::entry::{Entry, TarTree, BLOCKSIZE}; +use crate::filesystem::entry::{Entry, BLOCKSIZE}; use crate::filesystem::resource::Resource; -use crate::filesystem::table::to_internal_path; +use crate::filesystem::table::{to_internal_path, AdjTable}; use super::{error::FuseError, handle::HandleTable, reply::*}; use fuse3::{ @@ -15,13 +15,16 @@ use fuse3::{ Result as FuseResult, *, }; +/// A asynchorized stream from vector type VecStream = tokio_stream::Iter>; + +// filesystem is an adapter, it should not contain any business logic. pub struct Filesystem where F: AsyncRead + AsyncSeek + Unpin + Send + 'static, { handle_table: HandleTable>>, - tree: Mutex>, + tree: Mutex>>, resource: Arc, } @@ -30,15 +33,16 @@ where F: AsyncRead + AsyncSeek + Unpin + Send + Sync + 'static, { /// Create a new filesystem - pub(super) fn new(tree: TarTree, fs_size: u64) -> Self { + pub(super) fn new(tree: AdjTable>, fs_size: u64) -> Self { Self { handle_table: HandleTable::new(), tree: Mutex::new(tree), resource: Arc::new(Resource::new(fs_size)), } } - /// Mount the filesystem to a path - pub async fn mount_with_path( + /// Mount the filesystem to a path, + /// return a raw handle from `libfuse` + pub async fn raw_mount_with_path( self, path: impl AsRef + Clone, ) -> std::io::Result { @@ -53,13 +57,13 @@ where .mount_with_unprivileged(self, path.as_ref()) .await } - /// Insert a file by path + /// Insert a file by path before actual mounts. pub fn insert_by_path(&self, path: impl AsRef, content: Vec) { let mut tree = self.tree.lock(); tree.insert_by_path( to_internal_path(path.as_ref()), || Entry::Directory, - Entry::new_file_with_content(content), + Entry::new_file_with_vec(content), ); } } diff --git a/judger/src/filesystem/adapter/handle.rs b/judger/src/filesystem/adapter/handle.rs index 2a0e8d6..124c2c1 100644 --- a/judger/src/filesystem/adapter/handle.rs +++ b/judger/src/filesystem/adapter/handle.rs @@ -5,6 +5,8 @@ use std::{ use spin::Mutex; +pub type FileHandle = u64; +/// Lookup table for file handles pub struct HandleTable { handle_generator: AtomicU64, table: Mutex>>, @@ -19,7 +21,7 @@ impl HandleTable { } } /// Add an entry to the table - pub fn add(&self, entry: E) -> u64 { + pub fn add(&self, entry: E) -> FileHandle { let handle = self .handle_generator .fetch_add(1, std::sync::atomic::Ordering::AcqRel); @@ -28,12 +30,12 @@ impl HandleTable { handle } /// Get an entry from the table - pub fn get(&self, handle: u64) -> Option> { + pub fn get(&self, handle: FileHandle) -> Option> { log::trace!("get handle: {}", handle); self.table.lock().get(&handle).cloned() } /// Remove an entry from the table - pub fn remove(&self, handle: u64) -> Option> { + pub fn remove(&self, handle: FileHandle) -> Option> { log::trace!("deallocate handle: {}", handle); self.table.lock().remove(&handle) } diff --git a/judger/src/filesystem/adapter/mod.rs b/judger/src/filesystem/adapter/mod.rs index c2d2e37..60a098d 100644 --- a/judger/src/filesystem/adapter/mod.rs +++ b/judger/src/filesystem/adapter/mod.rs @@ -24,7 +24,7 @@ mod test { log::info!("mounting test tarball in .temp ..."); let template = Template::new("plugins/rlua-54.lang").await.unwrap(); let filesystem = template.as_filesystem(1024 * 1024 * 1024); - let mut mount_handle = filesystem.mount_with_path("./.temp/5").await.unwrap(); + let mut mount_handle = filesystem.raw_mount_with_path("./.temp/5").await.unwrap(); let handle = &mut mount_handle; tokio::select! { diff --git a/judger/src/filesystem/adapter/template.rs b/judger/src/filesystem/adapter/template.rs index 28e7c54..4adb9d4 100644 --- a/judger/src/filesystem/adapter/template.rs +++ b/judger/src/filesystem/adapter/template.rs @@ -5,11 +5,14 @@ use tokio::{ io::{AsyncRead, AsyncSeek}, }; -use crate::filesystem::entry::TarTree; +use crate::filesystem::{ + entry::{Entry, TarTree}, + table::AdjTable, +}; use super::fuse::Filesystem; -pub struct Template(TarTree) +pub struct Template(AdjTable>) where F: AsyncRead + AsyncSeek + Unpin + Send + 'static; @@ -23,7 +26,7 @@ where } /// read a file by path pub async fn read_by_path(&self, path: impl AsRef) -> Option> { - self.0.read_by_path(path).await + self.read_by_path(path).await } } @@ -31,6 +34,6 @@ impl Template { /// Create a new template from a tar file pub async fn new(path: impl AsRef + Clone) -> std::io::Result { let tree = TarTree::new(path).await?; - Ok(Self(tree)) + Ok(Self(tree.0)) } } diff --git a/judger/src/filesystem/dev.md b/judger/src/filesystem/dev.md new file mode 100644 index 0000000..e981d9b --- /dev/null +++ b/judger/src/filesystem/dev.md @@ -0,0 +1,118 @@ +## Module Layout + +- `table.rs`: adjacency table + - Tree data structure on vector + - Inode allocation by `MIN_ID + index` + - Not `MT-Safe` +- `handle.rs`: Mount Handle + - NewType wrapper for dropping +- `adapter` module: adapter between internal data structure(tree-like) and `libfuse` + - `error.rs`: a centralized way to handle error + - `fuse.rs`: adaptor between internal data structure and `libfuse` + - `reply.rs`: collection of constructor for replay from `libfuse` + - `handle.rs`: file handle table + - `template.rs`: A NewType wrapper to force user explicitly clone(`as_filesystem`) filesystem +- `entry` module: collection of single file + - `tar.rs`: a NewType wrapper for `Tree` + - `ro.rs`: read only normal file(mapped from tar ball) + - `rw.rs`: read/write normal file(in memory) +- `resource.rs`: Resource counter, much like `semaphore` +- `mkdtemp.rs`: a safe wrapper around `libc::mkdtemp` + +## Prerequisite knowledge + +### Filesystem in Userspace + +> FUSE, or Filesystem in Userspace, is a software interface that allows non-privileged users to create their own file systems in Linux without modifying the kernel. It acts as a bridge between the kernel's virtual filesystem layer and user-space programs. + +Traditionally, we have to develop a dedicated kernel module for a filesystem. + +FUSE workaround this problem by providing connection, to set up a FUSE, program need to... + +1. acquire a FUSE connection(similar to unix socket). +2. poll the socket until a connection(similar to a new connection on tcp socket) +3. read that connection to acquire an OPCODE +4. follow OPCODE to parse the request + +example of OPCODE: `READ`, `LOOKUP`, `OPEN`, `RMDIR`... + +[list](https://github.com/libfuse/libfuse/blob/6476b1c3ccde2fc4e8755858c96debf55aa0574b/lib/fuse_lowlevel.c#L2619) of OPCODE + +In this project, we use `fuse3`, which is a wrapper over `libfuse-sys`. + +To get started, you can follow [main.rs](https://github.com/Sherlock-Holo/fuse3/blob/master/examples/src/memfs/main.rs) from `fuse3`'s example. + +#### `INODE` + +`INODE` is an id generate by filesystem, program providing that can set `INODE` to whatever you want, but make sure it's unique for same file(dictionary). + +You can get inode with `stat` +``` +❯ stat . + File: . + Size: 128 Blocks: 0 IO Block: 4096 directory +Device: 2ah/42d Inode: 13553287 Links: 1 +Access: (0775/drwxrwxr-x) Uid: ( 1000/ eason) Gid: ( 1000/ eason) +Access: 2024-04-28 19:44:43.208376257 +0800 +Modify: 2024-05-20 21:39:42.300855512 +0800 +Change: 2024-05-20 21:39:42.300855512 +0800 + Birth: 2024-04-28 19:44:43.208376257 +0800 +``` + +Note that zero `INODE` means unspecified(null in C's language). + +#### `File Handle` + +In the context of filesystem of libc, you might be familiar with `file descriptor`, `file descriptor` is a `uint64` generate secquetially by kernel(unique for each process). + +`File handle` is similar to `file descriptor`, but it sit between kernel and FUSE provider, generated by FUSE provider and unique for each FUSE connection. + +When a file open, FUSE provider generate a `uint64`, and file handle is pass as parameter for later operations. + +FUSE provider should implement a way to retrieve file's session by `file handle`. + +> file's session includes `bytes readed`, `open flag`... + +Note that zero `File Handle` means unspecified(null in C's language), generating a 0 `File Handle` is not allowed. + +#### OPCODE `READDIR` + +> Read directory. + +similar to what `ls -a` provide, list dictionary including `.` and `..` + +parameters: +1. parent `INODE`(could be unspecified, unspecified is root) +2. offset +3. size + +return: list of file(dictionary) + +example(ignore the fact that many file is missing): + +``` +❯ ls / +❯ ls -a / +. .. boot etc lib lib64 media +``` + +| offset | size | output | +| ---- | ---- | --- | +| 0 | 1 | `.` | +| 2 | 3 | `etc`, `lib`, `lib64` | + +#### OPCODE `OPEN` + +> Open a file. Open flags (with the exception of O_CREAT, O_EXCL, O_NOCTTY and O_TRUNC) are available in flags + +parameters: +1. file `INODE`(could be unspecified, unspecified is root) +2. flag + +return: File Handle + +`O_CREAT` should be handle by kernel instead. + +### mkdtemp + +> See `man mkdtemp` diff --git a/judger/src/filesystem/entry/mod.rs b/judger/src/filesystem/entry/mod.rs index 055f979..d107e1e 100644 --- a/judger/src/filesystem/entry/mod.rs +++ b/judger/src/filesystem/entry/mod.rs @@ -5,10 +5,10 @@ mod tar; use self::{ro::TarBlock, rw::MemBlock}; use bytes::Bytes; use fuse3::FileType; -use std::{ffi::OsString, ops::Deref, sync::Arc}; +use std::{ffi::OsString, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncSeek}, - sync::{Mutex, OwnedMutexGuard}, + sync::Mutex, }; use super::resource::Resource; @@ -25,12 +25,6 @@ pub trait FuseWriteTrait { async fn write(&mut self, offset: u64, data: &[u8]) -> std::io::Result; } -async fn clone_arc(arc: &Arc>) -> Arc> { - let inner = arc.deref(); - let lock = inner.lock().await; - Arc::new(Mutex::new(lock.deref().clone())) -} - /// Entry in the filesystem /// /// cloning the entry would clone file state @@ -66,13 +60,16 @@ impl Entry where F: AsyncRead + AsyncSeek + Unpin + Send + 'static, { + /// create a new file entry with empty content pub fn new_file() -> Self { Self::MemFile(MemBlock::default()) } - pub fn new_file_with_content(content: Vec) -> Self { + /// create a new file entry with content + pub fn new_file_with_vec(content: Vec) -> Self { Self::MemFile(MemBlock::new(content)) } - pub fn kind(&self) -> FileType { + /// get kind of the file + pub(super) fn kind(&self) -> FileType { match self { Self::SymLink(_) => FileType::Symlink, Self::HardLink(_) => FileType::RegularFile, @@ -81,6 +78,7 @@ where Self::MemFile(_) => FileType::RegularFile, } } + /// get size of the file pub fn get_size(&self) -> u64 { match self { Self::SymLink(x) => x.len() as u64, diff --git a/judger/src/filesystem/entry/ro.rs b/judger/src/filesystem/entry/ro.rs index 7ad570a..541ffe4 100644 --- a/judger/src/filesystem/entry/ro.rs +++ b/judger/src/filesystem/entry/ro.rs @@ -86,7 +86,7 @@ where { async fn read(&mut self, offset: u64, size: u32) -> std::io::Result { let size = size.min(self.size - self.cursor) as usize; - let size=size.min(BLOCKSIZE*MAX_READ_BLK); + let size = size.min(BLOCKSIZE * MAX_READ_BLK); let mut lock = self.file.lock().await; let seek_from = self.get_seek_from(offset).ok_or(io::Error::new( diff --git a/judger/src/filesystem/entry/tar.rs b/judger/src/filesystem/entry/tar.rs index a8abcab..d7bcc2e 100644 --- a/judger/src/filesystem/entry/tar.rs +++ b/judger/src/filesystem/entry/tar.rs @@ -1,11 +1,4 @@ -use std::{ - ffi::OsString, - io::Read, - ops::{Deref, DerefMut}, - os::unix::ffi::OsStringExt, - path::Path, - sync::Arc, -}; +use std::{ffi::OsString, io::Read, os::unix::ffi::OsStringExt, path::Path, sync::Arc}; #[cfg(test)] use std::io::Cursor; @@ -22,7 +15,7 @@ use crate::filesystem::table::{to_internal_path, AdjTable}; use super::{ro::TarBlock, Entry}; -pub struct TarTree(AdjTable>) +pub struct TarTree(pub AdjTable>) where F: AsyncRead + AsyncSeek + Unpin + Send + 'static; @@ -35,26 +28,6 @@ where } } -impl DerefMut for TarTree -where - F: AsyncRead + AsyncSeek + Unpin + Send + 'static, -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl Deref for TarTree -where - F: AsyncRead + AsyncSeek + Unpin + Send + 'static, -{ - type Target = AdjTable>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl Default for TarTree where F: AsyncRead + AsyncSeek + Unpin + Send + 'static, @@ -142,6 +115,7 @@ mod test { macro_rules! assert_kind { ($tree:expr,$path:expr, $kind:ident) => {{ let node = $tree + .0 .get_by_path(to_internal_path(Path::new($path))) .unwrap(); let entry = node; diff --git a/judger/src/filesystem/handle.rs b/judger/src/filesystem/handle.rs index 227a638..ca048cd 100644 --- a/judger/src/filesystem/handle.rs +++ b/judger/src/filesystem/handle.rs @@ -29,7 +29,7 @@ where { pub async fn mount(self) -> std::io::Result { let mountpoint = MkdTemp::new(); - let handle = self.mount_with_path(mountpoint.get_path()).await?; + let handle = self.raw_mount_with_path(mountpoint.get_path()).await?; Ok(MountHandle(Some(handle), Some(mountpoint))) } } diff --git a/judger/src/filesystem/resource.rs b/judger/src/filesystem/resource.rs index e86f618..4d7a0d7 100644 --- a/judger/src/filesystem/resource.rs +++ b/judger/src/filesystem/resource.rs @@ -1,8 +1,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; - /// A resource counter /// -/// unlike `Semaphore`, the resource is not reusable +/// unlike [`tokio::sync::Semaphore`], the resource is not reusable pub struct Resource(AtomicU64); impl Resource { diff --git a/judger/src/filesystem/table.rs b/judger/src/filesystem/table.rs index 5c43f3c..6be08c4 100644 --- a/judger/src/filesystem/table.rs +++ b/judger/src/filesystem/table.rs @@ -29,6 +29,8 @@ struct Node { /// the ability to allocate id up to [`MAX_ID_CAPACITY`] /// /// The table has ability to store a multiple disconnected tree +/// +/// Note that cloning the table would actually clone the WHOLE tree #[derive(Clone)] pub struct AdjTable { by_id: Vec>, diff --git a/judger/src/language/builder.rs b/judger/src/language/builder.rs index e727a1d..d53f3e5 100644 --- a/judger/src/language/builder.rs +++ b/judger/src/language/builder.rs @@ -1,4 +1,6 @@ -use grpc::judger::{JudgeResponse, JudgerCode}; +use grpc::judger::{ + exec_result as execute_response, ExecResult as ExecuteResponse, JudgeResponse, JudgerCode, Log, +}; use super::stage::{AssertionMode, StatusCode}; @@ -36,10 +38,40 @@ impl From for JudgeResponse { } pub struct ExecuteResult { + pub status: StatusCode, pub time: u64, pub memory: u64, pub output: Vec, - pub code: i32, +} + +impl From for ExecuteResponse { + fn from(value: ExecuteResult) -> Self { + macro_rules! execute_log { + ($msg:expr) => { + execute_response::Result::Log(Log { + level: 4, + msg: $msg.to_string(), + }) + }; + } + let result = match value.status { + StatusCode::Accepted => execute_response::Result::Output(value.output), + StatusCode::WrongAnswer => execute_log!("Wrong Answer"), + StatusCode::RuntimeError => { + execute_log!("Runtime Error, maybe program return non-zero code") + } + StatusCode::TimeLimitExceeded | StatusCode::RealTimeLimitExceeded => { + execute_log!("Time Limit Exceeded") + } + StatusCode::MemoryLimitExceeded => execute_log!("Memory Limit Exceeded"), + StatusCode::OutputLimitExceeded => execute_log!("Output Limit Exceeded"), + StatusCode::CompileError => execute_log!("Compile Error"), + _ => execute_log!("System Error"), + }; + ExecuteResponse { + result: Some(result), + } + } } pub struct JudgeArgBuilder { diff --git a/judger/src/language/plugin.rs b/judger/src/language/plugin.rs index 5bd91d7..119837a 100644 --- a/judger/src/language/plugin.rs +++ b/judger/src/language/plugin.rs @@ -145,7 +145,6 @@ where ) -> Pin> + Send>> { let compiler = trys!(self.as_compiler(args.source).await); let maybe_runner = trys!(compiler.compile().await); - log::debug!("runner created"); let mut runner = trys!(maybe_runner, Ok(JudgeResult::compile_error())); let mem_cpu = (args.mem, args.cpu); @@ -153,36 +152,30 @@ where let mut io = args.input.into_iter().zip(args.output.into_iter()); Box::pin(try_stream! { while let Some((input,output))=io.next(){ - let judger = runner.run(mem_cpu.clone(), input).await?; - let status = judger.get_result(&output, mode); - log::trace!("status: {:?}", status); - - let stat = judger.stat(); - yield JudgeResult { - status, - time: stat.cpu.total, - memory: stat.memory.total, - }; - if status!=StatusCode::Accepted{ + let judger = runner.judge(mem_cpu.clone(), input).await?; + + yield judger.get_result(&output, mode); + if judger.get_code(&output, mode)!=StatusCode::Accepted{ break; } } }) } - pub async fn execute(&self, args: ExecuteArgs) -> Result> { + pub async fn execute(&self, args: ExecuteArgs) -> Result { let compiler = self.as_compiler(args.source).await?; - Ok(match compiler.compile().await? { + let maybe_runner = compiler.compile().await?; + match maybe_runner { Some(mut runner) => { - let judger = runner.run((args.mem, args.cpu), args.input).await?; - - todo!("stream output"); - - let stat = judger.stat(); - - Some(todo!()) + let executor = runner.stream((args.mem, args.cpu), args.input).await?; + Ok(executor.get_result()) } - None => None, - }) + None => Ok(ExecuteResult { + status: StatusCode::CompileError, + time: 0, + memory: 0, + output: Vec::new(), + }), + } } pub fn get_memory_reserved(&self, mem: u64) -> u64 { self.spec.get_memory_reserved_size(mem) diff --git a/judger/src/language/spec.rs b/judger/src/language/spec.rs index e8ee10a..d7f4891 100644 --- a/judger/src/language/spec.rs +++ b/judger/src/language/spec.rs @@ -273,7 +273,7 @@ impl Default for RawJudge { memory_multiplier: Some(1.0), cpu_multiplier: Some(1.0), walltime: Some(360e9 as u64), - output: Some(1024*1024*16), + output: Some(1024 * 1024 * 16), } } } diff --git a/judger/src/language/stage/compile.rs b/judger/src/language/stage/compile.rs index d192fc0..86aef0c 100644 --- a/judger/src/language/stage/compile.rs +++ b/judger/src/language/stage/compile.rs @@ -10,6 +10,11 @@ use crate::{ use super::Runner; +/// First stage of language processing, compile the source code +/// +/// Note that by compile, we doesn't mean the traditional compile process +/// it could be any process that prepare the code to be ready for execution, +/// or do nothing(like python) pub struct Compiler { spec: Arc, handle: MountHandle, @@ -37,6 +42,7 @@ impl Compiler { } } +/// Context for compile stage struct CompileCtx { spec: Arc, path: PathBuf, diff --git a/judger/src/language/stage/judge.rs b/judger/src/language/stage/judge.rs index 07205fa..805a930 100644 --- a/judger/src/language/stage/judge.rs +++ b/judger/src/language/stage/judge.rs @@ -1,12 +1,13 @@ use std::sync::Arc; use crate::{ - language::spec::Spec, + language::{spec::Spec, JudgeResult}, sandbox::{Corpse, MonitorKind, Stat}, }; use super::{AssertionMode, StatusCode}; +/// The third stage of language processing, compare the output pub struct Judger { spec: Arc, corpse: Corpse, @@ -90,11 +91,11 @@ impl Judger { StatusCode::Accepted } - pub fn get_result(&self, output: &[u8], mode: AssertionMode) -> StatusCode { + pub fn get_code(&self, output: &[u8], mode: AssertionMode) -> StatusCode { match self.corpse.status() { Ok(status) => match status.success() { true => self.assert_output(output, mode), - false => StatusCode::WrongAnswer, + false => StatusCode::RuntimeError, }, Err(reason) => match reason { MonitorKind::Cpu => StatusCode::TimeLimitExceeded, @@ -104,4 +105,13 @@ impl Judger { }, } } + pub fn get_result(&self, output: &[u8], mode: AssertionMode) -> JudgeResult { + let status = self.get_code(output, mode); + let stat = self.stat(); + JudgeResult { + status, + time: stat.cpu.total, + memory: stat.memory.total, + } + } } diff --git a/judger/src/language/stage/mod.rs b/judger/src/language/stage/mod.rs index 52987dd..4bd2961 100644 --- a/judger/src/language/stage/mod.rs +++ b/judger/src/language/stage/mod.rs @@ -1,11 +1,17 @@ +//! collection of steps for judge and execute + mod compile; mod judge; mod run; +mod stream; pub use compile::Compiler; use grpc::{judger::JudgeMatchRule, judger::JudgerCode}; pub use run::Runner; +/// internal status code, use to decouple the grpc status code +/// +/// Status code is commonly use in OJ, it include example such as: AC, WA... #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum StatusCode { Accepted, @@ -19,10 +25,20 @@ pub enum StatusCode { SystemError, } +/// internal assertion mode, use to decouple the grpc status code +/// +/// Assertion mode reperesent the way to compare the output #[derive(Clone, Copy)] pub enum AssertionMode { + /// Skip single space and newline + /// + /// `a b`, and `a\nb\n` are the same SkipSpace, + /// Skip continous space and newline + /// + /// `a b`, `ab ` and `ab` are the same SkipContinousSpace, + /// Exact match Exact, } @@ -45,7 +61,6 @@ impl From for AssertionMode { impl From for JudgerCode { fn from(value: StatusCode) -> Self { - match value { StatusCode::Accepted => Self::Ac, StatusCode::WrongAnswer => Self::Wa, diff --git a/judger/src/language/stage/run.rs b/judger/src/language/stage/run.rs index 46421f4..2474969 100644 --- a/judger/src/language/stage/run.rs +++ b/judger/src/language/stage/run.rs @@ -7,8 +7,9 @@ use crate::{ Result, }; -use super::judge::Judger; +use super::{judge::Judger, stream::Streamer}; +/// Second stage of the language process, run the compiled code pub struct Runner { filesystem: MountHandle, spec: Arc, @@ -18,7 +19,7 @@ impl Runner { pub fn new(filesystem: MountHandle, spec: Arc) -> Self { Self { filesystem, spec } } - pub async fn run(&mut self, (mem, cpu): (u64, u64), input: Vec) -> Result { + pub async fn judge(&mut self, (mem, cpu): (u64, u64), input: Vec) -> Result { let ctx = RunCtx { spec: self.spec.clone(), path: self.filesystem.get_path().to_path_buf(), @@ -28,6 +29,16 @@ impl Runner { let corpse = process.wait(input).await?; Ok(Judger::new(self.spec.clone(), corpse)) } + pub async fn stream(&mut self, (mem, cpu): (u64, u64), input: Vec) -> Result { + let ctx = RunCtx { + spec: self.spec.clone(), + path: self.filesystem.get_path().to_path_buf(), + limit: self.spec.get_judge_limit(cpu, mem), + }; + let process = Process::new(ctx)?; + let corpse = process.wait(input).await?; + Ok(Streamer::new(self.spec.clone(), corpse)) + } } struct RunCtx { diff --git a/judger/src/language/stage/stream.rs b/judger/src/language/stage/stream.rs new file mode 100644 index 0000000..99c1bd6 --- /dev/null +++ b/judger/src/language/stage/stream.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use crate::{ + language::{ + spec::{self, Spec}, + ExecuteResult, + }, + sandbox::{Corpse, MonitorKind}, +}; + +use super::StatusCode; + +/// Third stage of language processing, stream execution result +pub struct Streamer { + spec: Arc, + corpse: Corpse, +} + +impl Streamer { + pub fn new(spec: Arc, corpse: Corpse) -> Self { + Self { spec, corpse } + } + pub fn get_code(&self) -> StatusCode { + match self.corpse.status() { + Ok(status) => match status.success() { + true => StatusCode::Accepted, + false => StatusCode::RuntimeError, + }, + Err(reason) => match reason { + MonitorKind::Cpu => StatusCode::TimeLimitExceeded, + MonitorKind::Memory => StatusCode::MemoryLimitExceeded, + MonitorKind::Output => StatusCode::OutputLimitExceeded, + MonitorKind::Walltime => StatusCode::RealTimeLimitExceeded, + }, + } + } + pub fn get_result(&self) -> ExecuteResult { + let stat = self.corpse.stat(); + ExecuteResult { + status: self.get_code(), + time: stat.cpu.total, + memory: stat.memory.total, + output: self.corpse.stdout().to_vec(), + } + } +} diff --git a/judger/src/main.rs b/judger/src/main.rs index a12896f..3eec9fc 100644 --- a/judger/src/main.rs +++ b/judger/src/main.rs @@ -26,7 +26,6 @@ async fn main() { default_panic(info); std::process::exit(1); })); - #[cfg(debug_assertions)] log::warn!("running debug build"); diff --git a/judger/src/sandbox/dev.md b/judger/src/sandbox/dev.md new file mode 100644 index 0000000..f4fc389 --- /dev/null +++ b/judger/src/sandbox/dev.md @@ -0,0 +1,15 @@ +## Module Layout + +## Prerequisite knowledge + +### Control Group(linux) + +> In Linux, control groups (cgroups for short) act like a resource manager for your system. It lets you organize processes into groups and set limits on how much CPU, memory, network bandwidth, or other resources they can use. + +> cgroup is abbr for control group + +In practice, linux kernel expose cgroup's interface by vfs. + +To get started, you can follow [it article](https://access.redhat.com/documentation/zh-tw/red_hat_enterprise_linux/6/html/resource_management_guide/sec-creating_cgroups) from red hat to create one. + +In this project, we use `cgroups_rs`, which is an abstraction over underlying vfs. diff --git a/judger/src/sandbox/monitor/mem_cpu.rs b/judger/src/sandbox/monitor/mem_cpu.rs index 2bce248..2643d8c 100644 --- a/judger/src/sandbox/monitor/mem_cpu.rs +++ b/judger/src/sandbox/monitor/mem_cpu.rs @@ -46,14 +46,14 @@ impl Drop for Monitor { // FIXME: use explicit control flow // currently is controlled by dropping order, and it can be broken // if one of the thread panics - match self.cgroup.v2(){ + match self.cgroup.v2() { true => { self.cgroup.kill().expect("cgroup.kill does not exist"); self.cgroup.delete().unwrap(); - }, + } false => { self.cgroup.set_release_agent("").unwrap(); - }, + } } } } diff --git a/judger/src/server.rs b/judger/src/server.rs index 9e03f63..b4806b3 100644 --- a/judger/src/server.rs +++ b/judger/src/server.rs @@ -10,7 +10,7 @@ use uuid::Uuid; use crate::{ error::{ClientError, Error}, - language::{JudgeArgBuilder, Map}, + language::{ExecuteArgBuilder, JudgeArgBuilder, Map}, CONFIG, }; @@ -121,7 +121,7 @@ impl Judger for Server { })) } - type ExecStream = Pin> + Send>>; + type ExecStream = tokio_stream::Once>; async fn exec( &self, @@ -129,6 +129,41 @@ impl Judger for Server { ) -> Result, tonic::Status> { let payload = check_secret(req)?; - todo!() + let memory = payload.memory; + let cpu = payload.time; + + let source = payload.code; + let input = payload.input; + + let uuid = + Uuid::from_str(&payload.lang_uid).map_err(|_| ClientError::InvaildLanguageUuid)?; + + let plugin = self + .plugins + .get(&uuid) + .ok_or(ClientError::InvaildLanguageUuid)?; + + let resource: u32 = plugin + .get_memory_reserved(payload.memory) + .try_into() + .map_err(|_| Error::Platform)?; + + let permit = self + .semaphore + .clone() + .acquire_many_owned(resource) + .await + .map_err(|_| ClientError::ImpossibleMemoryRequirement)?; + + let args = ExecuteArgBuilder::new() + .cpu(cpu) + .mem(memory) + .source(source) + .input(input) + .build(); + + let result = plugin.execute(args).await?; + drop(permit); + Ok(Response::new(tokio_stream::once(Ok(result.into())))) } }