diff --git a/judger/src/sandbox/limiter/hier.rs b/judger/src/sandbox/limiter/hier.rs index 601ad88..87573b9 100644 --- a/judger/src/sandbox/limiter/hier.rs +++ b/judger/src/sandbox/limiter/hier.rs @@ -1,7 +1,4 @@ use cgroups_rs::*; -use std::time::Duration; - -pub const MONITOR_ACCURACY: Duration = Duration::from_millis(80); pub enum MonitorKind { Cpu, diff --git a/judger/src/sandbox/limiter/mod.rs b/judger/src/sandbox/limiter/mod.rs index 1d6c183..479fcc7 100644 --- a/judger/src/sandbox/limiter/mod.rs +++ b/judger/src/sandbox/limiter/mod.rs @@ -1,3 +1,8 @@ +//! Provide ability to limit resource and retrieve final cpu and memory usage +//! +//! To use this module, you need to create it (provide resource limitation) and mount it, +//! finally, spawn process(it's user's responsibility to ensure the process +//! is spawned within the cgroup) pub(self) mod hier; pub(self) mod stat; pub(self) mod wrapper; @@ -10,6 +15,9 @@ use hier::*; use std::sync::Arc; use tokio::time::*; +const MONITOR_ACCURACY: Duration = Duration::from_millis(80); + +/// Exit reason of the process pub enum ExitReason { TimeOut, MemoryOut, @@ -21,7 +29,7 @@ pub async fn monitor(cgroup: Arc, cpu: Cpu) -> ExitReason { let oom_signal = wrapper.oom(); loop { - sleep(MONITOR_ACCURACY/2).await; + sleep(MONITOR_ACCURACY / 2).await; if let Ok(oom_hint) = oom_signal.try_recv() { log::trace!("oom hint: {}", oom_hint); @@ -46,16 +54,10 @@ impl Drop for Limiter { if let Some(monitor_task) = &self.monitor_task { monitor_task.abort(); } - if MONITER_KIND.heir().v2() { - self.cgroup.kill().expect("cgroup.kill does not exist"); - } else { - // use rustix::process::*; - // pid should not be reused until SIGPIPE send(when Process is Drop) - // therefore, it is safe to try killing the process(only true for nsjail) - - // current implementation of v1 support do nothing, wait for action of cleaning - // up the process on drop - // for pid in self.cgroup.tasks() {} + debug_assert!(self.cgroup.tasks().is_empty()); + 
+ match self.cgroup.tasks().is_empty() { + true => log::warn!("cgroup still have process running"), + false => self.cgroup.delete().expect("cgroup cannot be deleted"), } } } @@ -71,7 +73,7 @@ impl Limiter { .memory_swap_limit(0) .done() .cpu() - .period((MONITOR_ACCURACY/2).as_nanos() as u64) + .period((MONITOR_ACCURACY / 2).as_nanos() as u64) .quota(MONITOR_ACCURACY.as_nanos() as i64) .realtime_period(MONITOR_ACCURACY.as_nanos() as u64) .realtime_runtime(MONITOR_ACCURACY.as_nanos() as i64) @@ -87,12 +89,31 @@ impl Limiter { }) } /// wait for resource to exhaust + /// + /// Please remember that [`Drop::drop`] only optimistic kill(`SIGKILL`) + /// the process inside it, + /// user SHOULD NOT rely on this to kill the process. + /// + /// + /// 2. Actively limit(notify) cpu resource is achieved by polling the cgroup, + /// the delay requires special attention, it is only guaranteed + /// to be below the limitation provided + [`MONITOR_ACCURACY`]. pub async fn wait_exhaust(&mut self) -> ExitReason { - self.monitor_task.take().unwrap().await.unwrap() + let reason = self.monitor_task.take().unwrap().await.unwrap(); + // optimistic kill(`SIGKILL`) the process inside + self.cgroup.kill().expect("cgroup.kill does not exist"); + reason } - /// get the current resource usage - pub async fn stat(self)->(Cpu,Memory){ + /// get the final resource usage + /// + /// Please remember that actively limit(notify) cpu resource is achieved + /// by polling the cgroup, therefore the delay requires special attention, + /// it is only guaranteed to be below the limitation provided + [`MONITOR_ACCURACY`]. 
+ pub async fn stat(self) -> (Cpu, Memory) { + // there should be no process left + debug_assert!(self.cgroup.tasks().is_empty()); + // poll once more to get final stat let wrapper = wrapper::CgroupWrapper::new(&self.cgroup); - (wrapper.cpu(),wrapper.memory()) + (wrapper.cpu(), wrapper.memory()) } } diff --git a/judger/src/sandbox/limiter/stat.rs b/judger/src/sandbox/limiter/stat.rs index bfc8793..a719810 100644 --- a/judger/src/sandbox/limiter/stat.rs +++ b/judger/src/sandbox/limiter/stat.rs @@ -1,3 +1,5 @@ +use cgroups_rs::cpuacct::CpuAcct; + pub struct Memory { pub kernel: u64, pub user: u64, @@ -14,4 +16,46 @@ impl Cpu { pub(super) fn out_of_resources(resource: &Self, stat: Self) -> bool { stat.kernel > resource.kernel || stat.user > resource.user || stat.total > resource.total } + + pub(super) fn from_acct(acct: CpuAcct) -> Self { + Cpu { + kernel: acct.usage_sys, + user: acct.usage_user, + total: acct.usage, + } + } + pub(super) fn from_raw(raw: &str) -> Self { + let mut kernel = u64::MAX; + let mut user = u64::MAX; + let mut total = u64::MAX; + + for (key, value) in raw.split('\n').filter_map(|stmt| stmt.split_once(' ')) { + match key { + "usage_usec" => total = value.parse().unwrap(), + "user_usec" => user = value.parse().unwrap(), + "system_usec" => kernel = value.parse().unwrap(), + _ => {} + }; + } + + Self { + kernel, + user, + total, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + #[test] + /// Test the [`Cpu::from_raw`] function + fn cpu_from_raw() { + let raw = "usage_usec 158972260000\nuser_usec 115998852000\nsystem_usec 42973408000\ncore_sched.force_idle_usec 0\nnr_periods 0\nnr_throttled 0\nthrottled_usec 0\nnr_bursts 0\nburst_usec 0\n"; + let cpu = Cpu::from_raw(raw); + assert_eq!(cpu.kernel, 158972260000); + assert_eq!(cpu.user, 115998852000); + assert_eq!(cpu.total, 42973408000); + } } diff --git a/judger/src/sandbox/limiter/wrapper.rs b/judger/src/sandbox/limiter/wrapper.rs index f5832cb..234a5f5 100644 --- 
a/judger/src/sandbox/limiter/wrapper.rs +++ b/judger/src/sandbox/limiter/wrapper.rs @@ -1,6 +1,7 @@ -use cgroups_rs::{cpu::CpuController, cpuacct::CpuAcctController, memory::MemController, Cgroup}; - +use super::hier::*; use super::stat::*; +use cgroups_rs::{cpu::CpuController, cpuacct::CpuAcctController, memory::MemController, Cgroup}; +use std::ops::Deref; /// newtype wrapper for cgroup form cgroup_rs pub struct CgroupWrapper<'a> { @@ -12,38 +13,17 @@ impl<'a> CgroupWrapper<'a> { Self { cgroup } } pub fn cpu(&self) -> Cpu { - let mut kernel = u64::MAX; - let mut user = u64::MAX; - let mut total = u64::MAX; - - match self.cgroup.controller_of::() { - Some(controller) => { - let usage = controller.cpuacct(); - kernel = usage.usage_sys; - user = usage.usage_user; - total = usage.usage; + match MONITER_KIND.deref() { + MonitorKind::Cpu => { + let controller: &CpuAcctController = self.cgroup.controller_of().unwrap(); + Cpu::from_acct(controller.cpuacct()) } - None => { + MonitorKind::CpuAcct => { let controller: &CpuController = self.cgroup.controller_of().unwrap(); - let raw: &str = &controller.cpu().stat; - - for (key, value) in raw.split('\n').filter_map(|stmt| stmt.split_once(' ')) { - match key { - "usage_usec" => total = value.parse().unwrap(), - "user_usec" => user = value.parse().unwrap(), - "system_usec" => kernel = value.parse().unwrap(), - _ => {} - }; - } + Cpu::from_raw(raw) } } - - Cpu { - kernel, - user, - total, - } } pub fn oom(&self) -> std::sync::mpsc::Receiver { let controller = self.cgroup.controller_of::().unwrap(); diff --git a/judger/src/sandbox/resource.rs b/judger/src/sandbox/resource.rs deleted file mode 100644 index e69de29..0000000 diff --git a/judger/src/sandbox/semaphore.rs b/judger/src/sandbox/semaphore.rs deleted file mode 100644 index 854a50a..0000000 --- a/judger/src/sandbox/semaphore.rs +++ /dev/null @@ -1,137 +0,0 @@ -use std::sync::{ - atomic::{self, Ordering}, - Arc, -}; - -use spin::Mutex; -use tokio::sync::oneshot::*; - -struct 
SemaphoreInner { - permits: atomic::AtomicUsize, - all_permits: usize, - max_wait: usize, - waiters: Mutex>)>>, -} - -#[derive(Clone)] -struct Semaphore(Arc); - -impl Semaphore { - /// Create a new asynchronous semaphore with the given number of permits. - /// - /// asynchronous semaphore is a synchronization primitive that limits the number of concurrent, - /// instead of blocking the thread, yeild to scheduler and wait for the permit. - /// - /// Note that there is no preemption. - pub fn new(all_permits: usize, max_wait: usize) -> Self { - Semaphore(Arc::new(SemaphoreInner { - permits: atomic::AtomicUsize::new(all_permits), - all_permits, - max_wait, - waiters: Mutex::new(Vec::new()), - })) - } - /// get a permit from semaphore - /// - /// It return None if - /// 1. It's impossible to get the permit even no other task is holding the permit - /// 2. The number of waiting task is greater than max_wait - pub async fn get_permit(&self, permit: usize) -> Option { - // FIXME: return Result to differentiate between max_wait_reached and impossible_resource_condition - if permit > self.0.all_permits { - return None; - } - let (tx, rx) = channel::<()>(); - { - let mut waiter = self.0.waiters.lock(); - if waiter.len() >= self.0.max_wait { - return None; - } - waiter.push((permit, Some(tx))); - } - - self.try_wake(); - - rx.await.ok()?; - - Some(Permit { - semaphore: self.clone(), - permit, - }) - } - fn release(&self, permit: usize) { - self.0.permits.fetch_add(permit, Ordering::Relaxed); - self.try_wake(); - } - fn try_wake(&self) { - let mut waiter = self.0.waiters.lock(); - if let Some((permit, ref mut waker)) = waiter.last_mut() { - let mut current = self.0.permits.load(Ordering::Acquire); - loop { - if current < *permit { - return; - } - if let Err(x) = self.0.permits.compare_exchange( - current, - current - *permit, - Ordering::SeqCst, - Ordering::Acquire, - ) { - current = x; - } else { - break; - }; - } - if waker.take().unwrap().send(()).is_err() { - 
log::warn!("Semaphore waiter disconnected"); - } - waiter.pop(); - } - } -} - -pub struct Permit { - semaphore: Semaphore, - permit: usize, -} - -impl Drop for Permit { - fn drop(&mut self) { - self.semaphore.release(self.permit); - } -} - -#[cfg(test)] -mod test { - use super::Semaphore; - #[tokio::test] - /// test max value of permit - async fn get_permit_max() { - let semaphore = Semaphore::new(1024, 1024); - assert!(semaphore.get_permit(1024).await.is_some()); - assert!(semaphore.get_permit(1025).await.is_none()); - } - #[tokio::test] - // test getting permit with ordering - async fn get_permit_unorder() { - let semaphore = Semaphore::new(1024, 1024); - let permit = semaphore.get_permit(1).await.unwrap(); - let permit1 = tokio::spawn(async move { - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - semaphore.get_permit(1024).await - }); - drop(permit); - assert!(permit1.await.unwrap().is_some()); - } - #[tokio::test] - // test `get_permit` return None when max_wait is reached - async fn get_permit_max_wait() { - let semaphore = Semaphore::new(1024, 1); - let semaphore1 = semaphore.clone(); - let _ = semaphore.get_permit(1).await.unwrap(); - let _ = tokio::spawn(async move { - semaphore.get_permit(1024).await.unwrap(); - }); - dbg!(semaphore1.get_permit(1).await.is_none()); - } -}