From d0bf6561d1657aa922f27685a24eee53b962fbd2 Mon Sep 17 00:00:00 2001 From: Eason <30045503+Eason0729@users.noreply.github.com> Date: Fri, 10 May 2024 17:04:03 +0800 Subject: [PATCH] feat(Judger): :construction: draft crate::sandbox::Context implementer in language model --- judger/src/filesystem/adapter/fuse.rs | 106 ++++++++++-- judger/src/filesystem/adapter/mod.rs | 14 +- judger/src/filesystem/adapter/reply.rs | 2 +- judger/src/filesystem/adapter/template.rs | 10 +- judger/src/filesystem/entry/mod.rs | 17 +- judger/src/filesystem/entry/ro.rs | 15 +- judger/src/filesystem/entry/rw.rs | 2 +- judger/src/filesystem/entry/tar.rs | 4 + judger/src/filesystem/mkdtemp.rs | 33 ++++ judger/src/filesystem/mod.rs | 4 +- judger/src/filesystem/table.rs | 14 -- judger/src/language/config.rs | 21 ++- judger/src/language/daemon.rs | 13 +- judger/src/language/mod.rs | 1 + judger/src/language/plugin.rs | 96 +++++++++++ judger/src/main.rs | 3 - judger/src/sandbox/mod.rs | 13 +- judger/src/semaphore.rs | 199 ---------------------- 18 files changed, 293 insertions(+), 274 deletions(-) create mode 100644 judger/src/filesystem/mkdtemp.rs create mode 100644 judger/src/language/plugin.rs delete mode 100644 judger/src/semaphore.rs diff --git a/judger/src/filesystem/adapter/fuse.rs b/judger/src/filesystem/adapter/fuse.rs index a39eecc..ee2ed6b 100644 --- a/judger/src/filesystem/adapter/fuse.rs +++ b/judger/src/filesystem/adapter/fuse.rs @@ -3,12 +3,10 @@ use std::{ffi::OsStr, num::NonZeroU32, path::Path, sync::Arc}; use futures_core::Future; use spin::Mutex; use tokio::io::{AsyncRead, AsyncSeek}; -use tokio::sync::Mutex as AsyncMutex; +use tokio::sync::{Mutex as AsyncMutex, OwnedSemaphorePermit}; -use crate::{ - filesystem::{resource::Resource, Entry, TarTree, BLOCKSIZE}, - semaphore::Permit, -}; +use crate::filesystem::entry::{Entry, TarTree, BLOCKSIZE}; +use crate::filesystem::resource::Resource; use super::{error::FuseError, handle::HandleTable, reply::*}; use fuse3::{ @@ -24,19 +22,17 @@ where handle_table: HandleTable>>, tree: Mutex>, resource: Arc, - _permit: Permit, } impl Filesystem where F: AsyncRead + AsyncSeek + Unpin + Send + Sync + 'static, { - pub(super) fn new(tree: TarTree, permit: Permit) -> Self { + pub(super) fn new(tree: TarTree, permit: u64) -> Self { Self { handle_table: HandleTable::new(), tree: Mutex::new(tree), - resource: Arc::new(Resource::new(permit.count())), - _permit: permit, + resource: Arc::new(Resource::new(permit)), } } pub async fn mount(self, path: impl AsRef + Clone) -> std::io::Result { @@ -81,7 +77,9 @@ where async move { let tree = self.tree.lock(); let parent_node = tree.get(parent as usize).ok_or(FuseError::InvaildIno)?; - let node = parent_node.get_by_component(name).ok_or(FuseError::InvalidPath)?; + let node = parent_node + .get_by_component(name) + .ok_or(FuseError::InvalidPath)?; // FIXME: unsure about the inode Ok(reply_entry(&req, node.get_value(), node.get_id() as u64)) } @@ -89,6 +87,20 @@ where fn forget(&self, _: Request, inode: u64, _: u64) -> impl Future + Send { async {} } + fn release( + &self, + req: Request, + inode: u64, + fh: u64, + flags: u32, + lock_owner: u64, + flush: bool, + ) -> impl Future> + Send { + async move { + self.handle_table.remove(fh); + Ok(()) + } + } fn statfs( &self, _: Request, @@ -412,3 +424,77 @@ where } } } + +#[cfg(test)] +mod test { + use std::{ + ffi::OsStr, + sync::atomic::{AtomicU64, Ordering}, + }; + + use fuse3::{ + raw::{Filesystem as _, Request}, + Errno, + }; + use tokio::fs::File; + + use crate::filesystem::adapter::Template; + + use super::Filesystem; + + const UNIQUE_COUNTER: AtomicU64 = AtomicU64::new(0); + + async fn nested_tar() -> Filesystem { + let template = Template::new("test/nested.tar").await.unwrap(); + template.as_filesystem(1024 * 1024) + } + fn spawn_request() -> Request { + Request { + unique: UNIQUE_COUNTER.fetch_add(1, Ordering::AcqRel), + uid: 1000, + gid: 1000, + pid: 2, + } + } + + #[tokio::test] + async fn lookup() { + let fs = nested_tar().await; + assert_eq!( + fs.lookup(spawn_request(), 1, OsStr::new("nest")) + .await + .unwrap() + .attr + .ino, + 2 + ); + assert_eq!( + fs.lookup(spawn_request(), 1, OsStr::new("o.txt")) + .await + .unwrap() + .attr + .ino, + 5 + ); + assert_eq!( + fs.lookup(spawn_request(), 2, OsStr::new("a.txt")) + .await + .unwrap() + .attr + .ino, + 3 + ); + assert_eq!( + fs.lookup(spawn_request(), 2, OsStr::new("o.txt")) + .await + .unwrap_err(), + Errno::new_not_exist() + ); + assert_eq!( + fs.lookup(spawn_request(), 100, OsStr::new("o.txt")) + .await + .unwrap_err(), + libc::ENOENT.into() + ) + } +} diff --git a/judger/src/filesystem/adapter/mod.rs b/judger/src/filesystem/adapter/mod.rs index 4935643..f7e8e20 100644 --- a/judger/src/filesystem/adapter/mod.rs +++ b/judger/src/filesystem/adapter/mod.rs @@ -10,7 +10,7 @@ pub use template::Template; #[cfg(test)] mod test { use super::*; - use crate::semaphore::Semaphore; + // use crate::semaphore::Semaphore; use env_logger::*; #[tokio::test] @@ -22,17 +22,9 @@ mod test { .ok(); log::info!("mounting test tarball in .temp ..."); - let global_resource = Semaphore::new(4096 * 1024 * 1024, 1); let template = Template::new("test/nested.tar").await.unwrap(); - let filesystem = template - .as_filesystem( - global_resource - .get_permit(1024 * 1024 * 1024) - .await - .unwrap(), - ) - .await; - let mut mount_handle = filesystem.mount("./.temp/18").await.unwrap(); + let filesystem = template.as_filesystem(1024 * 1024 * 1024); + let mut mount_handle = filesystem.mount("./.temp/1").await.unwrap(); let handle = &mut mount_handle; tokio::select! { diff --git a/judger/src/filesystem/adapter/reply.rs b/judger/src/filesystem/adapter/reply.rs index 957472d..b5c8cc7 100644 --- a/judger/src/filesystem/adapter/reply.rs +++ b/judger/src/filesystem/adapter/reply.rs @@ -6,7 +6,7 @@ use fuse3::{ }; use tokio::io::{AsyncRead, AsyncSeek}; -use crate::filesystem::{Entry, BLOCKSIZE}; +use crate::filesystem::{entry::Entry, entry::BLOCKSIZE}; const TTL: Duration = Duration::from_secs(1); diff --git a/judger/src/filesystem/adapter/template.rs b/judger/src/filesystem/adapter/template.rs index b4d17f0..ecd752b 100644 --- a/judger/src/filesystem/adapter/template.rs +++ b/judger/src/filesystem/adapter/template.rs @@ -5,10 +5,7 @@ use tokio::{ io::{AsyncRead, AsyncSeek}, }; -use crate::{ - filesystem::{table::DeepClone, TarTree}, - semaphore::Permit, -}; +use crate::filesystem::entry::TarTree; use super::fuse::Filesystem; @@ -26,9 +23,12 @@ where pub fn new_inner(tree: TarTree) -> Self { Self { tree } } - pub async fn as_filesystem(&self, permit: Permit) -> Filesystem { + pub fn as_filesystem(&self, permit: u64) -> Filesystem { Filesystem::new(self.tree.clone(), permit) } + pub async fn read_by_path(&self, path: impl AsRef) -> Option> { + self.tree.read_by_path(path).await + } } impl Template { diff --git a/judger/src/filesystem/entry/mod.rs b/judger/src/filesystem/entry/mod.rs index 0568b27..53104ef 100644 --- a/judger/src/filesystem/entry/mod.rs +++ b/judger/src/filesystem/entry/mod.rs @@ -1,11 +1,6 @@ mod ro; mod rw; mod tar; -pub mod prelude { - pub use super::tar::TarTree; - pub use super::Entry; - pub use super::MEMBLOCK_BLOCKSIZE as BLOCKSIZE; -} use self::{ro::TarBlock, rw::MemBlock}; use bytes::Bytes; @@ -16,9 +11,10 @@ use tokio::{ sync::{Mutex, OwnedMutexGuard}, }; -use super::{resource::Resource, table::DeepClone}; +use super::resource::Resource; -pub const MEMBLOCK_BLOCKSIZE: usize = 4096; +pub use tar::TarTree; +pub const BLOCKSIZE: usize = 4096; pub trait FuseReadTrait { async fn read(&mut self, offset: u64, size: u32) -> std::io::Result; @@ -97,6 +93,13 @@ where _ => None, } } + pub async fn read_all(&self) -> Option> { + match self { + Self::TarFile(block) => Some(block.read_all().await.expect("tar ball corrupted")), + Self::MemFile(block) => None, + _ => None, + } + } pub async fn write( self_: Arc>, offset: u64, diff --git a/judger/src/filesystem/entry/ro.rs b/judger/src/filesystem/entry/ro.rs index b6c71e6..d4aebc3 100644 --- a/judger/src/filesystem/entry/ro.rs +++ b/judger/src/filesystem/entry/ro.rs @@ -50,14 +50,13 @@ where pub fn get_size(&self) -> u32 { self.size } - // pub async fn read_all(&self) -> std::io::Result> { - // // let mut buf = Vec::with_capacity(self.size as usize); - // // let mut block = self.clone(); - // // block.seek(SeekFrom::Start(0)).await?; - // // block.read_to_end(&mut buf).await?; - // // Ok(buf) - // todo!() - // } + pub async fn read_all(&self) -> std::io::Result> { + let mut lock = self.file.lock().await; + lock.seek(SeekFrom::Start(self.start)).await?; + let mut buf = vec![0_u8; self.size as usize]; + lock.read_exact(&mut buf).await?; + Ok(buf) + } #[cfg(test)] fn from_raw(file: F, start: u64, size: u32) -> Self { Self { diff --git a/judger/src/filesystem/entry/rw.rs b/judger/src/filesystem/entry/rw.rs index de13703..1b3c32d 100644 --- a/judger/src/filesystem/entry/rw.rs +++ b/judger/src/filesystem/entry/rw.rs @@ -1,7 +1,7 @@ use std::{io, ops::Deref, sync::Arc}; use tokio::sync::Mutex; -use super::{FuseReadTrait, FuseWriteTrait, MEMBLOCK_BLOCKSIZE}; +use super::{FuseReadTrait, FuseWriteTrait, BLOCKSIZE}; /// A block in memory /// diff --git a/judger/src/filesystem/entry/tar.rs b/judger/src/filesystem/entry/tar.rs index 0be0b09..a8abcab 100644 --- a/judger/src/filesystem/entry/tar.rs +++ b/judger/src/filesystem/entry/tar.rs @@ -70,6 +70,10 @@ impl TarTree where F: AsyncRead + AsyncSeek + Unpin + Send + 'static, { + pub async fn read_by_path(&self, path: impl AsRef) -> Option> { + let node = self.0.get_by_path(to_internal_path(path.as_ref()))?; + Some(node.get_value().read_all().await.unwrap()) + } async fn parse_entry( &mut self, entry: tar::Entry<'_, R>, diff --git a/judger/src/filesystem/mkdtemp.rs b/judger/src/filesystem/mkdtemp.rs new file mode 100644 index 0000000..f353a7c --- /dev/null +++ b/judger/src/filesystem/mkdtemp.rs @@ -0,0 +1,33 @@ +use std::{ + ffi::{CStr, CString, OsStr}, + os::unix::ffi::OsStrExt, + path::{Path, PathBuf}, +}; + +use tokio::fs::remove_dir; + +pub struct MkdTemp(PathBuf); + +impl Drop for MkdTemp { + fn drop(&mut self) { + tokio::spawn(remove_dir(self.0.clone())); + } +} + +impl MkdTemp { + pub fn new() -> Self { + Self(unsafe { Self::new_inner("mdoj-XXXXXX") }) + } + pub unsafe fn new_inner(template: &str) -> PathBuf { + let template = CString::new(template).unwrap(); + let tmp_ptr = libc::mkdtemp(template.as_ptr() as *mut _); + let tmp_path = CStr::from_ptr(tmp_ptr); + let str_path = OsStr::from_bytes(tmp_path.to_bytes()); + drop(template); + // libc::free(tmp_ptr as *mut _); + PathBuf::from(str_path) + } + pub fn get_path(&self) -> &Path { + self.0.as_path() + } +} diff --git a/judger/src/filesystem/mod.rs b/judger/src/filesystem/mod.rs index 90c8e28..4dcfc33 100644 --- a/judger/src/filesystem/mod.rs +++ b/judger/src/filesystem/mod.rs @@ -3,7 +3,9 @@ mod adapter; mod entry; mod error; +mod mkdtemp; mod resource; mod table; -pub use entry::prelude::*; +pub use adapter::{Filesystem, Template}; +pub use fuse3::raw::MountHandle; diff --git a/judger/src/filesystem/table.rs b/judger/src/filesystem/table.rs index fdcdd1e..2550ebf 100644 --- a/judger/src/filesystem/table.rs +++ b/judger/src/filesystem/table.rs @@ -16,10 +16,6 @@ pub fn to_internal_path<'a>(path: &'a Path) -> impl Iterator + 'a // .collect::>() } -pub trait DeepClone { - async fn deep_clone(&self) -> Self; -} - #[derive(Clone)] struct Node { parent_idx: usize, @@ -27,16 +23,6 @@ struct Node { children: BTreeMap, // FIXME: use BtreeMap } -impl DeepClone for Node { - async fn deep_clone(&self) -> Self { - Self { - parent_idx: self.parent_idx, - value: self.value.deep_clone().await, - children: self.children.iter().map(|(k, v)| (k.clone(), *v)).collect(), - } - } -} - #[derive(Clone)] pub struct AdjTable { by_id: Vec>, diff --git a/judger/src/language/config.rs b/judger/src/language/config.rs index f2352c6..08cbf78 100644 --- a/judger/src/language/config.rs +++ b/judger/src/language/config.rs @@ -1,23 +1,28 @@ -use std::{ffi::OsString, time::Duration}; +use std::{ffi::OsString, path::Path, time::Duration}; use serde::Deserialize; -use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::{ + fs::read_dir, + io::{AsyncRead, AsyncReadExt}, +}; use uuid::Uuid; use crate::sandbox::{Cpu, Memory}; -pub struct Config { +async fn load_plugin(path: impl AsRef) { + let dir_list = read_dir(path).await; +} + +pub struct Spec { pub compile_limit: (Cpu, Memory, u64, Duration), pub judge_limit: (Cpu, Memory, u64, Duration), pub compile_command: Vec, pub judge_command: Vec, } -impl Config { - async fn from_reader(mut reader: impl AsyncRead + Unpin) -> Self { - let mut buf = String::new(); - reader.read_to_string(&mut buf).await.unwrap(); - let mut raw: Raw = toml::from_str(&buf).unwrap(); +impl Spec { + pub fn from_str(content: &str) -> Self { + let mut raw: Raw = toml::from_str(content).unwrap(); raw.compile.fill(); raw.judge.fill(); diff --git a/judger/src/language/daemon.rs b/judger/src/language/daemon.rs index 931fdd8..67432f1 100644 --- a/judger/src/language/daemon.rs +++ b/judger/src/language/daemon.rs @@ -1,10 +1,10 @@ use std::collections::BTreeMap; +use tokio::{fs::File, sync::Semaphore}; use uuid::Uuid; use super::config::*; -use crate::semaphore::Semaphore; -use crate::CONFIG; +use crate::{filesystem::Template, CONFIG}; static PLUGIN_PATH: &str = "./plugins"; /// max queue judging task @@ -12,12 +12,17 @@ const MAX_QUEUE: usize = 10; pub struct Daemon { semaphore: Semaphore, - templates: BTreeMap, + templates: BTreeMap, +} + +struct Plugin { + config: Spec, + template: Template, } impl Daemon { pub fn new() -> Self { - let semaphore = Semaphore::new(CONFIG.memory, MAX_QUEUE); + let semaphore = Semaphore::new(todo!()); let mut templates = BTreeMap::new(); todo!("Load plugins"); // design a loader struct diff --git a/judger/src/language/mod.rs b/judger/src/language/mod.rs index 863bfb9..589f46c 100644 --- a/judger/src/language/mod.rs +++ b/judger/src/language/mod.rs @@ -1,2 +1,3 @@ mod config; mod daemon; +mod plugin; diff --git a/judger/src/language/plugin.rs b/judger/src/language/plugin.rs new file mode 100644 index 0000000..0764025 --- /dev/null +++ b/judger/src/language/plugin.rs @@ -0,0 +1,96 @@ +use std::{ + ffi::OsStr, + marker::PhantomData, + path::{Path, PathBuf}, + sync::Arc, +}; + +use rustix::path::Arg; +use tokio::fs::{read_dir, File}; + +use crate::{ + filesystem::{Filesystem, Template}, + sandbox::{Context as SandboxCtx, Filesystem as SandboxFS, Limit}, +}; + +use super::config::Spec; + +static EXTENSION: &str = "lang"; + +pub async fn load_plugins(path: impl AsRef) -> std::io::Result> { + let mut plugins = Vec::new(); + let mut dir_list = read_dir(path).await?; + while let Some(entry) = dir_list.next_entry().await? { + let path = entry.path(); + let ext = path.extension(); + if path.is_file() && ext.is_some() && ext.unwrap() == EXTENSION { + let plugin = Plugin::new(path).await?; + plugins.push(plugin); + } + } + Ok(plugins) +} + +pub struct Plugin { + spec: Spec, + template: Template, +} + +impl Plugin { + pub async fn new(path: impl AsRef + Clone) -> std::io::Result { + let template = Template::new(path.clone()).await?; + let spec_source = template.read_by_path("spec.toml").await.expect(&format!( + "sepc.toml not found in plugin {}", + path.as_ref().display() + )); + let spec = Spec::from_str(&spec_source.to_string_lossy()); + + Ok(Self { spec, template }) + } + pub async fn as_runner(self: Arc) -> PluginRunner { + PluginRunner { + source: self.clone(), + filesystem: self.template.as_filesystem(0), + _stage: PhantomData, + } + } +} + +pub struct Compile; +pub struct Execute; + +pub struct PluginRunner { + source: Arc, + filesystem: Filesystem, + _stage: PhantomData, +} + +impl SandboxCtx for PluginRunner { + type FS = PathBuf; + + fn create_fs(&mut self) -> Self::FS { + todo!() + } + + fn get_args(&mut self) -> impl Iterator { + self.source + .spec + .compile_command + .iter() + .map(|arg| arg.as_ref()) + } +} + +impl Limit for PluginRunner { + fn get_cpu(&mut self) -> crate::sandbox::Cpu { + todo!() + } + + fn get_memory(&mut self) -> crate::sandbox::Memory { + todo!() + } + + fn get_output(&mut self) -> u64 { + todo!() + } +} diff --git a/judger/src/main.rs b/judger/src/main.rs index 53cfe9f..d00a56d 100644 --- a/judger/src/main.rs +++ b/judger/src/main.rs @@ -2,9 +2,6 @@ mod config; mod filesystem; mod language; mod sandbox; -mod semaphore; - -use std::sync::Arc; pub use config::CONFIG; diff --git a/judger/src/sandbox/mod.rs b/judger/src/sandbox/mod.rs index 572c688..27fe20c 100644 --- a/judger/src/sandbox/mod.rs +++ b/judger/src/sandbox/mod.rs @@ -2,7 +2,11 @@ mod error; mod monitor; mod process; -use std::{ffi::OsStr, path::Path, time::Duration}; +use std::{ + ffi::OsStr, + path::{Path, PathBuf}, + time::Duration, +}; pub use self::monitor::{Cpu, Memory}; pub use error::Error; @@ -26,7 +30,12 @@ pub trait Limit { pub trait Filesystem { fn mount(&mut self) -> impl AsRef + Send; - fn get_size(&mut self) -> u64; +} + +impl Filesystem for PathBuf { + fn mount(&mut self) -> impl AsRef + Send { + self.as_path().iter() + } } impl Limit for (Cpu, Memory, u64, Duration) { diff --git a/judger/src/semaphore.rs b/judger/src/semaphore.rs deleted file mode 100644 index cf79f86..0000000 --- a/judger/src/semaphore.rs +++ /dev/null @@ -1,199 +0,0 @@ -use std::{ - fmt::Debug, - sync::{ - atomic::{self, Ordering}, - Arc, - }, -}; - -use spin::Mutex; -use tokio::sync::oneshot::*; - -#[derive(Debug, thiserror::Error, PartialEq)] -pub enum Error { - #[error("Max wait reached")] - MaxWaitReached, - #[error("Impossible to get the permit")] - ImpossibleResourceCondition, -} - -// impl From for CrateError { -// fn from(value: Error) -> CrateError { -// match value { -// Error::MaxWaitReached => CrateError::Insufficient("queuing quota"), -// Error::ImpossibleResourceCondition => CrateError::Insufficient("memory"), -// } -// } -// } - -struct SemaphoreInner { - permits: atomic::AtomicU64, - all_permits: u64, - max_wait: usize, - waiters: Mutex>)>>, -} - -#[derive(Clone)] -pub struct Semaphore(Arc); - -impl Semaphore { - /// Create a new asynchronous semaphore with the given number of permits. - /// - /// asynchronous semaphore is a synchronization primitive that limits the number of concurrent, - /// instead of blocking the thread, yeild to scheduler and wait for the permit. - /// - /// Note that there is no preemption. - pub fn new(all_permits: u64, max_wait: usize) -> Self { - Semaphore(Arc::new(SemaphoreInner { - permits: atomic::AtomicU64::new(all_permits), - all_permits, - max_wait, - waiters: Mutex::new(Vec::new()), - })) - } - /// get a permit from semaphore - /// - /// It return None if - /// 1. It's impossible to get the permit even no other task is holding the permit - /// 2. The number of waiting task is greater than max_wait - pub async fn get_permit(&self, permit: u64) -> Result { - // FIXME: return Result to differentiate between max_wait_reached and impossible_resource_condition - if permit > self.0.all_permits { - return Err(Error::ImpossibleResourceCondition); - } - let (tx, rx) = channel::<()>(); - { - let mut waiter = self.0.waiters.lock(); - if waiter.len() >= self.0.max_wait { - return Err(Error::MaxWaitReached); - } - waiter.push((permit, Some(tx))); - } - - self.try_wake(); - - rx.await.ok().expect("Channel closed"); - - Ok(Permit { - semaphore: self.clone(), - permit, - }) - } - fn release(&self, permit: u64) { - self.0.permits.fetch_add(permit, Ordering::Relaxed); - self.try_wake(); - } - fn try_wake(&self) { - let mut waiter = self.0.waiters.lock(); - if let Some((permit, ref mut waker)) = waiter.last_mut() { - let mut current = self.0.permits.load(Ordering::Acquire); - loop { - if current < *permit { - return; - } - if let Err(x) = self.0.permits.compare_exchange( - current, - current - *permit, - Ordering::SeqCst, - Ordering::Acquire, - ) { - current = x; - } else { - break; - }; - } - if waker.take().unwrap().send(()).is_err() { - log::warn!("Semaphore waiter disconnected"); - } - waiter.pop(); - } - } -} - -pub struct Permit { - semaphore: Semaphore, - permit: u64, -} - -impl Permit { - #[inline] - pub fn merge(&mut self, mut other: Permit) { - self.permit += other.permit; - other.permit = 0; - } - pub async fn add(&mut self, permit: u64) -> Result<(), Error> { - let other = self.semaphore.get_permit(permit).await?; - self.merge(other); - Ok(()) - } - pub fn count(&self) -> u64 { - self.permit - } -} - -impl Drop for Permit { - fn drop(&mut self) { - self.semaphore.release(self.permit); - } -} - -impl Debug for Permit { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Permit") - .field("permit", &self.permit) - .finish() - } -} - -impl PartialEq for Permit { - fn eq(&self, other: &Self) -> bool { - self.permit == other.permit - } -} - -#[cfg(test)] -mod test { - use tokio::time; - - use super::*; - #[tokio::test] - /// test [`Semaphore::get_permit`] return [`Err(Error::ImpossibleResourceCondition)`] when max_wait is reached - async fn get_permit_max() { - let semaphore = Semaphore::new(1024, 1024); - assert!(semaphore.get_permit(1024).await.is_ok()); - assert_eq!( - Err(Error::ImpossibleResourceCondition), - semaphore.get_permit(1025).await - ); - } - #[tokio::test] - /// test [`Semaphore::get_permit`] to ensure permit is distributed in order - /// (First come first serve, no matter amount of permit requested) - async fn get_permit_unorder() { - let semaphore = Semaphore::new(1024, 1024); - let permit = semaphore.get_permit(1).await.unwrap(); - let permit1 = tokio::spawn(async move { - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - semaphore.get_permit(1024).await - }); - drop(permit); - assert!(permit1.await.unwrap().is_ok()); - } - #[tokio::test] - /// test [`Semaphore::get_permit`] return [`Err(Error::MaxWaitReached)`] when max_wait is reached - async fn get_permit_max_wait() { - let semaphore = Semaphore::new(1024, 1); - let permit = semaphore.get_permit(1).await.unwrap(); - - let semaphore1 = semaphore.clone(); - - let _ = tokio::spawn(async move { - semaphore.get_permit(1024).await.unwrap(); - }); - - time::sleep(time::Duration::from_millis(4)).await; - assert_eq!(Err(Error::MaxWaitReached), semaphore1.get_permit(1).await); - - drop(permit); - } -}