diff --git a/judger/src/filesystem/adapter/error.rs b/judger/src/filesystem/adapter/error.rs index 239e6ac..2df5faa 100644 --- a/judger/src/filesystem/adapter/error.rs +++ b/judger/src/filesystem/adapter/error.rs @@ -24,6 +24,8 @@ pub enum FuseError { impl From for fuse3::Errno { fn from(value: FuseError) -> Self { + #[cfg(test)] + log::warn!("FUSE driver return result: {}", value); match value { FuseError::IsDir => libc::EISDIR, FuseError::NotDir => libc::ENOTDIR, diff --git a/judger/src/filesystem/adapter/fuse.rs b/judger/src/filesystem/adapter/fuse.rs index 9bb7ca1..fdfaef2 100644 --- a/judger/src/filesystem/adapter/fuse.rs +++ b/judger/src/filesystem/adapter/fuse.rs @@ -3,9 +3,10 @@ use std::{ffi::OsStr, num::NonZeroU32, path::Path, sync::Arc}; use futures_core::Future; use spin::Mutex; use tokio::io::{AsyncRead, AsyncSeek}; +use tokio::sync::Mutex as AsyncMutex; use crate::{ - filesystem::{resource::Resource, TarTree, BLOCKSIZE}, + filesystem::{resource::Resource, Entry, TarTree, BLOCKSIZE}, semaphore::Permit, }; @@ -20,7 +21,7 @@ pub struct Filesystem where F: AsyncRead + AsyncSeek + Unpin + Send + 'static, { - handle_table: HandleTable, + handle_table: HandleTable>>, tree: Mutex>, resource: Arc, _permit: Permit, @@ -80,17 +81,9 @@ where async move { let tree = self.tree.lock(); let node = tree.get(parent as usize).ok_or(FuseError::InvaildIno)?; - log::info!( - "parent name: {}", - String::from_utf8_lossy(node.get_name().as_encoded_bytes()) - ); - log::info!( - "lookup name: {}", - String::from_utf8_lossy(name.as_encoded_bytes()) - ); let entry = node.get_by_component(name).ok_or(FuseError::InvalidPath)?; // FIXME: unsure about the inode - Ok(reply_entry(req, entry.get_value(), parent)) + Ok(reply_entry(&req, entry.get_value(), parent)) } } fn forget(&self, _: Request, inode: u64, _: u64) -> impl Future + Send { @@ -127,7 +120,9 @@ where if node.get_value().kind() != FileType::Directory { return Err(FuseError::NotDir.into()); } - let fh = self.handle_table.add(node.get_id()); + let fh = self + .handle_table + .add(AsyncMutex::new(node.get_value().clone())); Ok(ReplyOpen { fh, flags: 0 }) } } @@ -140,7 +135,12 @@ where async move { let tree = self.tree.lock(); let entry = tree.get(inode as usize).ok_or(FuseError::InvaildIno)?; - let fh = self.handle_table.add(entry.get_id()); + if entry.get_value().kind() == FileType::Directory { + return Err(FuseError::IsDir.into()); + } + let fh = self + .handle_table + .add(AsyncMutex::new(entry.get_value().clone())); Ok(ReplyOpen { fh, flags: 0 }) } } @@ -215,12 +215,14 @@ where let entries = vec![ Ok(dir_entry_plus( + &req, OsStr::new(".").to_os_string(), node.get_value(), node.get_id() as u64, 1, )), Ok(dir_entry_plus( + &req, OsStr::new("..").to_os_string(), parent_node.get_value(), parent_node.get_id() as u64, @@ -234,6 +236,7 @@ where .map(|(offset, inode)| { let node = tree.get(inode).unwrap(); dir_entry_plus( + &req, node.get_name().to_os_string(), node.get_value(), inode as u64, @@ -259,20 +262,13 @@ where size: u32, ) -> impl Future> + Send { async move { - let handle = { - let tree = self.tree.lock(); - let entry = self.handle_table.get(fh).ok_or(FuseError::HandleNotFound)?; - let node = tree.get(entry).ok_or(FuseError::InvaildIno)?; - let entry = node.get_value(); - entry.get_read_handle() - } - .ok_or(FuseError::IsDir)?; - - handle + let entry = self.handle_table.get(fh).ok_or(FuseError::HandleNotFound)?; + let mut lock = entry.lock().await; + Ok(lock .read(offset, size) .await - .map(|data| ReplyData { data }) - .map_err(Into::into) + .ok_or(Into::::into(FuseError::IsDir))? + .map(|data| ReplyData { data })?) } } fn write( @@ -286,17 +282,16 @@ where flags: u32, ) -> impl Future> + Send { async move { - let handle = { - let mut tree = self.tree.lock(); - let entry = self.handle_table.get(fh).ok_or(FuseError::HandleNotFound)?; - let mut node = tree.get_mut(entry).ok_or(FuseError::InvaildIno)?; - let entry = node.get_value(); - entry.get_write_handle() - } - .ok_or(FuseError::IsDir)?; - let resource = self.resource.clone(); - let written = handle.write(offset, data, &resource).await?; - Ok(ReplyWrite { written }) + let entry = self + .handle_table + .get(fh) + .ok_or(FuseError::HandleNotFound) + .unwrap(); + + Ok(Entry::write(entry, offset, data, &self.resource) + .await + .ok_or_else(|| Into::::into(FuseError::IsDir))? + .map(|written| ReplyWrite { written })?) } } fn access( @@ -305,6 +300,7 @@ where inode: u64, mask: u32, ) -> impl Future> + Send { + // FIXME: only allow current user to access async { Ok(()) } } fn fsync( @@ -346,7 +342,6 @@ where } } } - fn interrupt(&self, req: Request, unique: u64) -> impl Future> + Send { async { Ok(()) } } @@ -361,7 +356,59 @@ where let tree = self.tree.lock(); let entry = tree.get(inode as usize).ok_or(FuseError::InvaildIno)?; // FIXME: unsure about the inode - Ok(reply_attr(entry.get_value(), inode)) + Ok(reply_attr(&req, entry.get_value(), inode)) + } + } + fn setattr( + &self, + req: Request, + inode: Inode, + fh: Option, + set_attr: SetAttr, + ) -> impl Future> + Send { + async move { + let tree = self.tree.lock(); + let node = tree.get(inode as usize).ok_or(FuseError::InvaildIno)?; + Ok(reply_attr(&req, node.get_value(), inode)) + } + } + fn create( + &self, + req: Request, + parent: u64, + name: &OsStr, + mode: u32, + flags: u32, + ) -> impl Future> + Send { + async move { + let mut tree = self.tree.lock(); + let mut parent_node = tree.get_mut(parent as usize).ok_or(FuseError::InvaildIno)?; + if parent_node.get_value().kind() != FileType::Directory { + return Err(FuseError::NotDir.into()); + } + let mut node = parent_node.insert(name.to_os_string(), Entry::new_file()); + + // FIXME: append mode + Ok(reply_created(&req, node.get_value())) + } + } + fn mkdir( + &self, + req: Request, + parent: u64, + name: &OsStr, + mode: u32, + umask: u32, + ) -> impl Future> + Send { + async move { + let mut tree = self.tree.lock(); + let mut parent_node = tree.get_mut(parent as usize).ok_or(FuseError::InvaildIno)?; + if parent_node.get_value().kind() != FileType::Directory { + return Err(FuseError::NotDir.into()); + } + let mut node = parent_node.insert(name.to_os_string(), Entry::Directory); + let ino = node.get_id() as u64; + Ok(reply_entry(&req, node.get_value(), ino)) } } } diff --git a/judger/src/filesystem/adapter/handle.rs b/judger/src/filesystem/adapter/handle.rs index 2bfe571..ba27b3e 100644 --- a/judger/src/filesystem/adapter/handle.rs +++ b/judger/src/filesystem/adapter/handle.rs @@ -1,17 +1,20 @@ -use std::{collections::BTreeMap, sync::atomic::AtomicU64}; +use std::{ + collections::BTreeMap, + sync::{atomic::AtomicU64, Arc}, +}; -use spin::RwLock; +use spin::Mutex; -pub struct HandleTable { +pub struct HandleTable { handle_generator: AtomicU64, - table: RwLock>, + table: Mutex>>, } -impl HandleTable { +impl HandleTable { pub fn new() -> Self { Self { handle_generator: AtomicU64::new(1), - table: RwLock::new(BTreeMap::new()), + table: Mutex::new(BTreeMap::new()), } } pub fn add(&self, entry: E) -> u64 { @@ -19,15 +22,15 @@ impl HandleTable { .handle_generator .fetch_add(1, std::sync::atomic::Ordering::AcqRel); log::trace!("allocate handle: {}", handle); - self.table.write().insert(handle, entry); + self.table.lock().insert(handle, Arc::new(entry)); handle } - pub fn get(&self, handle: u64) -> Option { + pub fn get(&self, handle: u64) -> Option> { log::debug!("get handle: {}", handle); - self.table.read().get(&handle).cloned() + self.table.lock().get(&handle).cloned() } - pub fn remove(&self, handle: u64) -> Option { + pub fn remove(&self, handle: u64) -> Option> { log::trace!("deallocate handle: {}", handle); - self.table.write().remove(&handle) + self.table.lock().remove(&handle) } } diff --git a/judger/src/filesystem/adapter/mod.rs b/judger/src/filesystem/adapter/mod.rs index 07c9a92..34d4350 100644 --- a/judger/src/filesystem/adapter/mod.rs +++ b/judger/src/filesystem/adapter/mod.rs @@ -32,7 +32,7 @@ mod test { .unwrap(), ) .await; - let mut mount_handle = filesystem.mount("./.temp/1").await.unwrap(); + let mut mount_handle = filesystem.mount("./.temp/11").await.unwrap(); let handle = &mut mount_handle; tokio::select! { diff --git a/judger/src/filesystem/adapter/reply.rs b/judger/src/filesystem/adapter/reply.rs index 4f93598..957472d 100644 --- a/judger/src/filesystem/adapter/reply.rs +++ b/judger/src/filesystem/adapter/reply.rs @@ -8,7 +8,10 @@ use tokio::io::{AsyncRead, AsyncSeek}; use crate::filesystem::{Entry, BLOCKSIZE}; +const TTL: Duration = Duration::from_secs(1); + pub fn dir_entry_plus( + req: &Request, name: OsString, entry: &Entry, inode: u64, @@ -23,9 +26,9 @@ where kind: entry.kind(), name, offset, - attr: file_attr(entry, inode), - entry_ttl: Duration::from_secs(30), - attr_ttl: Duration::from_secs(30), + attr: file_attr(req, entry, inode), + entry_ttl: TTL, + attr_ttl: TTL, } } @@ -41,28 +44,28 @@ where } } -pub fn reply_attr(entry: &Entry, inode: u64) -> ReplyAttr +pub fn reply_attr(req: &Request, entry: &Entry, inode: u64) -> ReplyAttr where F: AsyncRead + AsyncSeek + Send + Unpin + 'static, { ReplyAttr { - ttl: Duration::from_secs(30), - attr: file_attr(&entry, inode), + ttl: TTL, + attr: file_attr(req, &entry, inode), } } -pub fn reply_entry(request: Request, entry: &Entry, inode: u64) -> ReplyEntry +pub fn reply_entry(req: &Request, entry: &Entry, inode: u64) -> ReplyEntry where F: AsyncRead + AsyncSeek + Send + Unpin + 'static, { ReplyEntry { - ttl: Duration::from_secs(30), - attr: file_attr(&entry, inode), + ttl: TTL, + attr: file_attr(req, &entry, inode), generation: 0, } } -pub fn file_attr(entry: &Entry, inode: u64) -> FileAttr +pub fn file_attr(req: &Request, entry: &Entry, inode: u64) -> FileAttr where F: AsyncRead + AsyncSeek + Send + Unpin + 'static, { @@ -81,9 +84,22 @@ where | libc::S_IRWXO | libc::S_ISVTX) as u16, nlink: 1, - uid: 0, - gid: 0, + uid: req.uid, + gid: req.gid, rdev: 179 << 16 + 02, blksize: BLOCKSIZE as u32, } } + +pub fn reply_created(req: &Request, entry: &Entry) -> ReplyCreated +where + F: AsyncRead + AsyncSeek + Send + Unpin + 'static, +{ + ReplyCreated { + ttl: TTL, + attr: file_attr(req, entry, 0), + generation: 0, + fh: 0, + flags: 0, + } +} diff --git a/judger/src/filesystem/adapter/template.rs b/judger/src/filesystem/adapter/template.rs index 991edb2..b4d17f0 100644 --- a/judger/src/filesystem/adapter/template.rs +++ b/judger/src/filesystem/adapter/template.rs @@ -6,7 +6,7 @@ use tokio::{ }; use crate::{ - filesystem::{adj::DeepClone, TarTree}, + filesystem::{table::DeepClone, TarTree}, semaphore::Permit, }; @@ -27,7 +27,7 @@ where Self { tree } } pub async fn as_filesystem(&self, permit: Permit) -> Filesystem { - Filesystem::new(self.tree.deep_clone().await, permit) + Filesystem::new(self.tree.clone(), permit) } } diff --git a/judger/src/filesystem/entry/mod.rs b/judger/src/filesystem/entry/mod.rs index 310d15e..5c24dae 100644 --- a/judger/src/filesystem/entry/mod.rs +++ b/judger/src/filesystem/entry/mod.rs @@ -1,31 +1,31 @@ -use std::{ffi::OsString, ops::Deref, sync::Arc}; +mod ro; +mod rw; +mod tar; +pub mod prelude { + pub use super::tar::TarTree; + pub use super::Entry; + pub use super::MEMBLOCK_BLOCKSIZE as BLOCKSIZE; +} +use self::{ro::TarBlock, rw::MemBlock}; use bytes::Bytes; use fuse3::FileType; +use std::{ffi::OsString, ops::Deref, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncSeek}, sync::{Mutex, OwnedMutexGuard}, }; -use self::{ - ro::TarBlock, - rw::MemBlock, - wrapper::{FuseRead, FuseWrite}, -}; - -use super::{adj::DeepClone, resource::Resource}; - -mod ro; -mod rw; -mod tar; -mod wrapper; +use super::{table::DeepClone, resource::Resource}; pub const MEMBLOCK_BLOCKSIZE: usize = 4096; -pub mod prelude { - pub use super::tar::TarTree; - pub use super::Entry; - pub use super::MEMBLOCK_BLOCKSIZE as BLOCKSIZE; +pub trait FuseReadTrait { + async fn read(&mut self, offset: u64, size: u32) -> std::io::Result; +} + +pub trait FuseWriteTrait { + async fn write(&mut self, offset: u64, data: &[u8]) -> std::io::Result; } async fn clone_arc(arc: &Arc>) -> Arc> { @@ -34,6 +34,9 @@ async fn clone_arc(arc: &Arc>) -> Arc> { Arc::new(Mutex::new(lock.deref().clone())) } +/// Entry in the filesystem +/// +/// cloning the entry would clone file state #[derive(Debug, Default)] pub enum Entry where @@ -43,21 +46,21 @@ where HardLink(u64), #[default] Directory, - TarFile(Arc>>), - MemFile(Arc>), + TarFile(TarBlock), + MemFile(MemBlock), } -impl DeepClone for Entry +impl Clone for Entry where F: AsyncRead + AsyncSeek + Unpin + Send + 'static, { - async fn deep_clone(&self) -> Self { + fn clone(&self) -> Self { match self { - Self::SymLink(x) => Self::SymLink(x.clone()), - Self::HardLink(x) => Self::HardLink(*x), + Self::SymLink(arg0) => Self::SymLink(arg0.clone()), + Self::HardLink(arg0) => Self::HardLink(arg0.clone()), Self::Directory => Self::Directory, - Self::TarFile(block) => Self::TarFile(clone_arc(block).await), - Self::MemFile(block) => Self::MemFile(clone_arc(block).await), + Self::TarFile(arg0) => Self::TarFile(arg0.clone()), + Self::MemFile(arg0) => Self::MemFile(arg0.clone()), } } } @@ -66,6 +69,9 @@ impl Entry where F: AsyncRead + AsyncSeek + Unpin + Send + 'static, { + pub fn new_file() -> Self { + Self::MemFile(MemBlock::default()) + } pub fn kind(&self) -> FileType { match self { Self::SymLink(_) => FileType::Symlink, @@ -80,90 +86,33 @@ where Self::SymLink(x) => x.len() as u64, Self::HardLink(_) => 0, Self::Directory => 0, - Self::TarFile(_) | Self::MemFile(_) => 1, + Self::TarFile(x) => x.get_size() as u64, + Self::MemFile(x) => x.get_size(), } } - pub fn get_read_handle(&self) -> Option> { + pub async fn read(&mut self, offset: u64, size: u32) -> Option> { match self { - Self::TarFile(block) => Some(ReadHandle::TarFile(block.clone())), - Self::MemFile(block) => Some(ReadHandle::MemFile(block.clone())), + Self::TarFile(block) => Some(Ok(block.read(offset, size).await.unwrap())), + Self::MemFile(block) => Some(block.read(offset, size).await), _ => None, } } - pub fn get_write_handle(&mut self) -> Option> { - match self { - Self::TarFile(block) => { - let tar_block = block.clone(); - let mem_block = Arc::new(Mutex::new(MemBlock::new(Vec::new()))); - *self = Self::MemFile(mem_block.clone()); - Some(WriteHandle::TarFile( - tar_block, - mem_block.try_lock_owned().unwrap(), - )) - } - Self::MemFile(block) => Some(WriteHandle::MemFile(block.clone())), - _ => None, - } - } -} - -pub enum ReadHandle -where - F: AsyncRead + AsyncSeek + Unpin + Send + 'static, -{ - TarFile(Arc>>), - MemFile(Arc>), -} - -impl ReadHandle -where - F: AsyncRead + AsyncSeek + Unpin + Send + 'static, -{ - pub async fn read(&self, offset: u64, size: u32) -> std::io::Result { - match self { - Self::TarFile(block) => { - let mut lock = block.lock().await; - FuseRead(&mut *lock).read(offset, size).await - } - Self::MemFile(block) => { - let mut lock = block.lock().await; - FuseRead(&mut *lock).read(offset, size).await - } - } - } -} - -pub enum WriteHandle -where - F: AsyncRead + AsyncSeek + Unpin + Send + 'static, -{ - TarFile(Arc>>, OwnedMutexGuard), - MemFile(Arc>), -} - -impl WriteHandle -where - F: AsyncRead + AsyncSeek + Unpin + Send + 'static, -{ pub async fn write( - &self, + self_: Arc>, offset: u64, data: &[u8], resource: &Resource, - ) -> std::io::Result { - match self { - Self::TarFile(tar_block, mem_block) => { - // let mut lock = tar_block.lock().await; - // let mem_block=MemBlock::new(lock.read_all().await.unwrap()); + ) -> Option> { + let mut lock = self_.lock().await; + if resource.comsume(data.len() as u32).is_none() { + return Some(Err(std::io::Error::from(std::io::ErrorKind::Other))); + } + match &mut *lock { + Self::MemFile(block) => Some(block.write(offset, data).await), + Self::TarFile(block) => { todo!() } - Self::MemFile(block) => { - resource - .comsume(data.len() as u32) - .ok_or(std::io::Error::from(std::io::ErrorKind::Other))?; - let mut lock = block.lock().await; - FuseWrite(&mut *lock).write(offset, data).await - } + _ => None, } } } diff --git a/judger/src/filesystem/entry/ro.rs b/judger/src/filesystem/entry/ro.rs index 310beba..b6c71e6 100644 --- a/judger/src/filesystem/entry/ro.rs +++ b/judger/src/filesystem/entry/ro.rs @@ -5,39 +5,16 @@ //! //! And we map each type of content to BTreeMap -use crate::filesystem::macro_::{chain_poll, report_poll}; use std::{ - future::Future, io::{self, SeekFrom}, - ops::DerefMut, - pin::{pin, Pin}, sync::Arc, - task::{Context, Poll}, }; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}, - sync::{Mutex, OwnedMutexGuard}, + sync::Mutex, }; -#[derive(Default, Debug)] -enum TarStage { - Reading(OwnedMutexGuard), - Seeking(OwnedMutexGuard), - #[default] - Done, -} - -impl TarStage { - fn take(&mut self) -> Self { - std::mem::take(self) - } - fn take_seeking(&mut self) -> OwnedMutexGuard { - if let Self::Seeking(locked) = self.take() { - return locked; - } - unreachable!("") - } -} +use super::FuseReadTrait; /// A block in tar file, should be readonly /// @@ -52,215 +29,177 @@ where { file: Arc>, start: u64, - size: u64, - cursor: u64, - stage: TarStage, -} - -impl PartialEq for TarBlock -where - F: AsyncRead + AsyncSeek + Unpin, -{ - fn eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.file, &other.file) - && self.start == other.start - && self.size == other.size - && self.cursor == other.cursor - } -} - -impl Clone for TarBlock -where - F: AsyncRead + AsyncSeek + Unpin, -{ - fn clone(&self) -> Self { - Self { - file: self.file.clone(), - start: self.start, - size: self.size, - cursor: self.cursor, - stage: TarStage::Done, - } - } + size: u32, + cursor: u32, } impl TarBlock where F: AsyncRead + AsyncSeek + Unpin + 'static, { - pub fn new(file: Arc>, start: u64, size: u64) -> Self { + pub fn new(file: Arc>, start: u64, size: u32) -> Self { + log::info!("new block: start={}, size={}", start, size); Self { file, start, size, cursor: 0, - stage: TarStage::Done, } } #[inline] - pub fn get_size(&self) -> u64 { + pub fn get_size(&self) -> u32 { self.size } - pub async fn read_all(&self) -> std::io::Result> { - let mut buf = Vec::with_capacity(self.size as usize); - let mut block = self.clone(); - block.seek(SeekFrom::Start(0)).await?; - block.read_to_end(&mut buf).await?; - Ok(buf) - } + // pub async fn read_all(&self) -> std::io::Result> { + // // let mut buf = Vec::with_capacity(self.size as usize); + // // let mut block = self.clone(); + // // block.seek(SeekFrom::Start(0)).await?; + // // block.read_to_end(&mut buf).await?; + // // Ok(buf) + // todo!() + // } #[cfg(test)] - fn from_raw(file: F, start: u64, size: u64) -> Self { + fn from_raw(file: F, start: u64, size: u32) -> Self { Self { file: Arc::new(Mutex::new(file)), start, size, cursor: 0, - stage: TarStage::Done, } } #[inline] - fn get_seek_from(&self) -> SeekFrom { - SeekFrom::Start(self.start + self.cursor) - } - #[inline] - fn check_bound(&self) -> bool { - self.cursor > self.size + fn get_seek_from(&self, offset: u64) -> Option { + if self.cursor > self.size { + None + } else { + Some(SeekFrom::Start(self.start + offset + (self.cursor) as u64)) + } } #[inline] - fn get_remain(&self) -> u64 { + fn get_remain(&self) -> u32 { self.size - self.cursor } } -impl AsyncRead for TarBlock +impl FuseReadTrait for TarBlock where F: AsyncRead + AsyncSeek + Unpin + 'static, { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - if self.check_bound() { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "tar block out of bound", - ))); - } - let original_size = buf.filled().len(); - match &mut self.stage { - TarStage::Reading(ref mut locked) => { - report_poll!(chain_poll!(pin!(locked.deref_mut()).poll_read(cx, buf))); - let read_byte = (buf.filled().len() - original_size) as u64; - match read_byte > self.get_remain() { - true => { - buf.set_filled(original_size + self.get_remain() as usize); - self.cursor += self.get_remain(); - } - false => self.cursor += read_byte, - }; - self.stage.take(); - return Poll::Ready(Ok(())); - } - TarStage::Seeking(ref mut locked) => { - let result = chain_poll!(pin!(locked.deref_mut()).poll_complete(cx)); - let read_byte = report_poll!(result); - self.as_mut().stage = TarStage::Reading(self.stage.take_seeking()); - self.as_mut().cursor = read_byte - self.start; - cx.waker().wake_by_ref(); - } - TarStage::Done => { - let mut locked = chain_poll!(pin!(self.file.clone().lock_owned()).poll(cx)); - if let Err(err) = pin!(locked.deref_mut()).start_seek(self.get_seek_from()) { - return Poll::Ready(Err(err)); - } - self.as_mut().stage = TarStage::Seeking(locked); - cx.waker().wake_by_ref(); - } + async fn read(&mut self, offset: u64, size: u32) -> std::io::Result { + let size = size as usize; + let mut lock = self.file.lock().await; + let seek_from = self.get_seek_from(offset).ok_or(io::Error::new( + io::ErrorKind::UnexpectedEof, + "tar block out of bound", + ))?; + lock.seek(seek_from).await?; + + let mut buf = vec![0_u8; size]; + + let mut readed_byte = 0; + while readed_byte < size { + match lock.read(&mut buf).await { + Err(err) if readed_byte == 0 => return Err(err), + Ok(0) | Err(_) => break, + Ok(x) => readed_byte += x, + }; } - Poll::Pending + readed_byte = readed_byte.min(size); + self.cursor += readed_byte as u32; + + buf.resize(readed_byte, 0_u8); + Ok(bytes::Bytes::from(buf)) } } -impl AsyncSeek for TarBlock +impl PartialEq for TarBlock where - F: AsyncRead + AsyncSeek + Unpin + 'static, + F: AsyncRead + AsyncSeek + Unpin, { - fn start_seek(self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { - let self_ = self.get_mut(); - self_.cursor = match position { - io::SeekFrom::Start(x) => x, - io::SeekFrom::End(x) => (self_.size as i64 + x).try_into().unwrap_or_default(), - io::SeekFrom::Current(x) => (self_.cursor as i64 + x).try_into().unwrap_or_default(), - }; - if self_.check_bound() { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "tar block out of bound", - )); - } - Ok(()) - } - - fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(self.cursor)) + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.file, &other.file) + && self.start == other.start + && self.size == other.size + && self.cursor == other.cursor } } -#[cfg(test)] -mod test { - use std::io::Cursor; - - use tokio::io::BufReader; - - use super::*; - #[tokio::test] - async fn normal_read() { - let underlying = BufReader::new(Cursor::new(b"111hello world111")); - let mut block = TarBlock::from_raw(underlying, 3, 11); - - let mut buf = [0_u8; 11]; - block.read_exact(&mut buf).await.unwrap(); - - assert_eq!(buf, *b"hello world"); - } - #[tokio::test] - async fn end_of_file_read() { - let underlying = BufReader::new(Cursor::new(b"111hello world")); - let mut block = TarBlock::from_raw(underlying, 3, 11); - - let mut buf = [0_u8; 11]; - block.read_exact(&mut buf).await.unwrap(); - - assert_eq!( - block.read_u8().await.unwrap_err().kind(), - io::ErrorKind::UnexpectedEof - ); - } - #[tokio::test] - async fn multi_sequential_read() { - let underlying = BufReader::new(Cursor::new(b"111hello world111")); - let mut block = TarBlock::from_raw(underlying, 3, 11); - - for c in b"hello world" { - assert_eq!(block.read_u8().await.unwrap(), *c); - } - } - #[tokio::test(flavor = "multi_thread", worker_threads = 8)] - async fn multi_reader_read() { - let underlying = BufReader::new(Cursor::new(b"111hello world111")); - let underlying = Arc::new(Mutex::new(underlying)); - let block = TarBlock::new(underlying, 3, 11); - - for _ in 0..30 { - let mut block = block.clone(); - tokio::spawn(async move { - for _ in 0..400 { - let mut buf = [0_u8; 11]; - block.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, *b"hello world"); - } - }); +impl Clone for TarBlock +where + F: AsyncRead + AsyncSeek + Unpin, +{ + fn clone(&self) -> Self { + Self { + file: self.file.clone(), + start: self.start, + size: self.size, + cursor: 0, } } } + +// #[cfg(test)] +// mod test { +// use std::io::Cursor; + +// use tokio::{fs::File, io::BufReader}; + +// use super::*; +// #[tokio::test] +// async fn file_io() { +// let file = File::open("test/single_file.tar").await.unwrap(); +// let mut block = TarBlock::new(Arc::new(Mutex::new(file)), 512, 11); +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); +// assert_eq!(buf, *b"hello world"); +// } +// #[tokio::test] +// async fn normal_read() { +// let underlying = BufReader::new(Cursor::new(b"111hello world111")); +// let mut block = TarBlock::from_raw(underlying, 3, 11); + +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); + +// assert_eq!(buf, *b"hello world"); +// } +// #[tokio::test] +// async fn end_of_file_read() { +// let underlying = BufReader::new(Cursor::new(b"111hello world")); +// let mut block = TarBlock::from_raw(underlying, 3, 11); + +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); + +// assert_eq!( +// block.read_u8().await.unwrap_err().kind(), +// io::ErrorKind::UnexpectedEof +// ); +// } +// #[tokio::test] +// async fn multi_sequential_read() { +// let underlying = BufReader::new(Cursor::new(b"111hello world111")); +// let mut block = TarBlock::from_raw(underlying, 3, 11); + +// for c in b"hello world" { +// assert_eq!(block.read_u8().await.unwrap(), *c); +// } +// } +// #[tokio::test(flavor = "multi_thread", worker_threads = 8)] +// async fn multi_reader_read() { +// let underlying = BufReader::new(Cursor::new(b"111hello world111")); +// let underlying = Arc::new(Mutex::new(underlying)); +// let block = TarBlock::new(underlying, 3, 11); + +// for _ in 0..30 { +// let mut block = block.clone(); +// tokio::spawn(async move { +// for _ in 0..400 { +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); +// assert_eq!(buf, *b"hello world"); +// } +// }); +// } +// } +// } diff --git a/judger/src/filesystem/entry/rw.rs b/judger/src/filesystem/entry/rw.rs index 3319809..de13703 100644 --- a/judger/src/filesystem/entry/rw.rs +++ b/judger/src/filesystem/entry/rw.rs @@ -1,273 +1,282 @@ -use crate::filesystem::macro_::{chain_poll, report_poll}; -use std::{ - future::Future, - io::{self, SeekFrom}, - ops::Deref, - pin::{pin, Pin}, - sync::Arc, - task::{Context, Poll}, -}; -use tokio::{ - io::{AsyncRead, AsyncSeek, AsyncWrite}, - sync::{Mutex, OwnedMutexGuard}, -}; +use std::{io, ops::Deref, sync::Arc}; +use tokio::sync::Mutex; -use super::MEMBLOCK_BLOCKSIZE; - -#[derive(Debug, Default)] -enum MemStage { - Seeking(OwnedMutexGuard>, SeekFrom), - SeekStart(SeekFrom), - Reading(OwnedMutexGuard>), - // Writing(OwnedMutexGuard>), - #[default] - Done, -} - -impl MemStage { - fn take(&mut self) -> Self { - std::mem::take(self) - } -} +use super::{FuseReadTrait, FuseWriteTrait, MEMBLOCK_BLOCKSIZE}; +/// A block in memory +/// +/// Note that [`MemBlock`] behavior like [`tokio::fs::File`], +/// except that it dones't shares the same underlying file session +/// by cloning(Reads, writes, and seeks would **not** affect both +/// [`MemBlock`] instances simultaneously.) #[derive(Default, Debug)] pub struct MemBlock { data: Arc>>, cursor: usize, - stage: MemStage, write_buffer: Vec, } -impl Clone for MemBlock { - fn clone(&self) -> Self { - Self { - data: self.data.clone(), - cursor: self.cursor.clone(), - stage: MemStage::default(), - write_buffer: self.write_buffer.clone(), - } - } -} - impl MemBlock { pub fn new(data: Vec) -> Self { Self { data: Arc::new(Mutex::new(data)), cursor: 0, - stage: MemStage::Done, write_buffer: Vec::new(), } } + pub fn get_size(&self) -> u64 { + self.data.try_lock().map(|x| x.len()).unwrap_or_default() as u64 + } } -impl AsyncRead for MemBlock { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - match self.stage.take() { - MemStage::Reading(locked) => { - if locked.len() < self.cursor { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "mem block out of bound", - ))); - } - let slice = &locked.deref()[self.cursor - ..(self.cursor + MEMBLOCK_BLOCKSIZE.min(buf.remaining())).min(locked.len())]; - self.cursor += slice.len(); - buf.put_slice(slice); - return Poll::Ready(Ok(())); - } - _ => { - let locked = chain_poll!(pin!(self.data.clone().lock_owned()).poll(cx)); - self.as_mut().stage = MemStage::Reading(locked); - cx.waker().wake_by_ref(); - } +impl FuseReadTrait for MemBlock { + async fn read(&mut self, offset: u64, size: u32) -> std::io::Result { + let locked = self.data.lock().await; + if locked.len() < offset as usize { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "mem block out of bound", + )); } - Poll::Pending + let offset = offset as usize; + let slice = &locked.deref()[offset..(offset + size as usize).min(locked.len())]; + Ok(bytes::Bytes::copy_from_slice(slice)) } } - -impl AsyncSeek for MemBlock { - fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { - self.stage = MemStage::SeekStart(position); - Ok(()) - } - - fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.stage.take() { - MemStage::SeekStart(position) => { - let locked = chain_poll!(pin!(self.data.clone().lock_owned()).poll(cx)); - self.stage = MemStage::Seeking(locked, position); - cx.waker().wake_by_ref(); - } - MemStage::Seeking(locked, position) => { - let size = locked.len() as i64; - let new_position = match position { - SeekFrom::Start(x) => x.try_into().unwrap_or_default(), - SeekFrom::End(x) => size.saturating_sub(x), - SeekFrom::Current(x) => (self.cursor as i64).saturating_add(x), - }; - if new_position < 0 { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid seek position", - ))); - } - if new_position > size { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "mem block out of bound", - ))); - } - self.cursor = new_position as usize; - return Poll::Ready(Ok(self.cursor as u64)); - } - _ => { - return Poll::Ready(Ok(self.cursor as u64)); - } +impl FuseWriteTrait for MemBlock { + async fn write(&mut self, offset: u64, data: &[u8]) -> std::io::Result { + let mut locked = self.data.lock().await; + if locked.len() < offset as usize { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "mem block out of bound", + )); } - Poll::Pending + locked.resize(offset as usize, 0); + locked.extend_from_slice(data); + Ok(data.len() as u32) } } -impl AsyncWrite for MemBlock { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.write_buffer.extend_from_slice(&buf); - if self.write_buffer.len() >= MEMBLOCK_BLOCKSIZE { - report_poll!(chain_poll!(self.as_mut().poll_flush(cx))); +impl Clone for MemBlock { + fn clone(&self) -> Self { + Self { + data: self.data.clone(), + cursor: self.cursor.clone(), + write_buffer: self.write_buffer.clone(), } - Poll::Ready(Ok(buf.len())) } +} - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut locked = chain_poll!(pin!(self.data.clone().lock_owned()).poll(cx)); - locked.extend_from_slice(&self.write_buffer); - self.write_buffer.clear(); - Poll::Ready(Ok(())) - } +// impl AsyncRead for MemBlock { +// fn poll_read( +// mut self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// buf: &mut tokio::io::ReadBuf<'_>, +// ) -> Poll> { +// let cursor = self.cursor; +// match &mut self.stage { +// MemStage::Reading(ref mut locked) => { +// if locked.len() < cursor { +// return Poll::Ready(Err(io::Error::new( +// io::ErrorKind::UnexpectedEof, +// "mem block out of bound", +// ))); +// } +// let slice = &locked.deref() +// [cursor..(cursor + MEMBLOCK_BLOCKSIZE.min(buf.remaining())).min(locked.len())]; +// buf.put_slice(slice); +// self.cursor += slice.len(); +// return Poll::Ready(Ok(())); +// } +// _ => { +// let locked = chain_poll!(pin!(self.data.clone().lock_owned()).poll(cx)); +// self.as_mut().stage = MemStage::Reading(locked); +// cx.waker().wake_by_ref(); +// } +// } +// Poll::Pending +// } +// } - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.as_mut().poll_flush(cx) - } -} +// impl AsyncSeek for MemBlock { +// fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { +// self.stage = MemStage::SeekStart(position); +// Ok(()) +// } -#[cfg(test)] -mod test { - use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +// fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +// match &self.stage { +// MemStage::SeekStart(_) => { +// let locked = chain_poll!(pin!(self.data.clone().lock_owned()).poll(cx)); +// self.stage = MemStage::Seeking(locked, self.stage.take_seek_start()); +// cx.waker().wake_by_ref(); +// } +// MemStage::Seeking(ref locked, ref position) => { +// let size = locked.len() as i64; +// let new_position = match position { +// SeekFrom::Start(x) => (*x).try_into().unwrap_or_default(), +// SeekFrom::End(x) => size.saturating_sub(*x), +// SeekFrom::Current(x) => (self.cursor as i64).saturating_add(*x), +// }; +// if new_position < 0 { +// return Poll::Ready(Err(io::Error::new( +// io::ErrorKind::InvalidInput, +// "invalid seek position", +// ))); +// } +// if new_position > size { +// return Poll::Ready(Err(io::Error::new( +// io::ErrorKind::UnexpectedEof, +// "mem block out of bound", +// ))); +// } +// self.cursor = new_position as usize; +// return Poll::Ready(Ok(self.cursor as u64)); +// } +// _ => { +// return Poll::Ready(Ok(self.cursor as u64)); +// } +// } +// Poll::Pending +// } +// } - use super::*; - #[tokio::test] - async fn normal_read() { - let data = b"hello world".to_vec(); - let mut block = MemBlock::new(data); - let mut buf = [0_u8; 11]; - block.read_exact(&mut buf).await.unwrap(); +// impl AsyncWrite for MemBlock { +// fn poll_write( +// mut self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// buf: &[u8], +// ) -> Poll> { +// self.write_buffer.extend_from_slice(&buf); +// if self.write_buffer.len() >= MEMBLOCK_BLOCKSIZE { +// report_poll!(chain_poll!(self.as_mut().poll_flush(cx)), self.stage); +// } +// Poll::Ready(Ok(buf.len())) +// } - assert_eq!(buf, *b"hello world"); - } - #[tokio::test] - async fn end_of_file_read() { - let mut block = MemBlock::new(b"1234".to_vec()); - let mut buf = Vec::new(); - block.read_to_end(&mut buf).await.unwrap(); +// fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +// let mut locked = chain_poll!(pin!(self.data.clone().lock_owned()).poll(cx)); +// locked.extend_from_slice(&self.write_buffer); +// self.write_buffer.clear(); +// Poll::Ready(Ok(())) +// } - assert_eq!(&*buf, b"1234"); - } - #[tokio::test] - async fn start_seek() { - let mut block = MemBlock::new(b"111hello world1111".to_vec()); - block.seek(SeekFrom::Start(3)).await.unwrap(); +// fn poll_shutdown( +// mut self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// ) -> Poll> { +// self.as_mut().poll_flush(cx) +// } +// } - let mut buf = [0_u8; 11]; - block.read_exact(&mut buf).await.unwrap(); +// #[cfg(test)] +// mod test { +// use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; - assert_eq!(buf, *b"hello world"); - } - #[tokio::test] - async fn end_seek() { - let mut block = MemBlock::new(b"111hello world1111".to_vec()); - block.seek(SeekFrom::End(15)).await.unwrap(); +// use super::*; +// #[tokio::test] +// async fn normal_read() { +// let data = b"hello world".to_vec(); +// let mut block = MemBlock::new(data); +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); - let mut buf = [0_u8; 11]; - block.read_exact(&mut buf).await.unwrap(); +// assert_eq!(buf, *b"hello world"); +// } +// #[tokio::test] +// async fn end_of_file_read() { +// let mut block = MemBlock::new(b"1234".to_vec()); +// let mut buf = Vec::new(); +// block.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, *b"hello world"); - } - #[tokio::test] - async fn rel_seek() { - let mut block = MemBlock::new(b"111hello world1111".to_vec()); - for _ in 0..3 { - block.seek(SeekFrom::Current(1)).await.unwrap(); - } +// assert_eq!(&*buf, b"1234"); +// } +// #[tokio::test] +// async fn start_seek() { +// let mut block = MemBlock::new(b"111hello world1111".to_vec()); +// block.seek(SeekFrom::Start(3)).await.unwrap(); - let mut buf = [0_u8; 11]; - block.read_exact(&mut buf).await.unwrap(); +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, *b"hello world"); - } - #[tokio::test] - async fn normal_write() { - let mut block = MemBlock::default(); - block.write_all(b"hello").await.unwrap(); - block.write_all(b" ").await.unwrap(); - block.write_all(b"world").await.unwrap(); +// assert_eq!(buf, *b"hello world"); +// } +// #[tokio::test] +// async fn end_seek() { +// let mut block = MemBlock::new(b"111hello world1111".to_vec()); +// block.seek(SeekFrom::End(15)).await.unwrap(); - assert!(block.read_u8().await.is_err()); +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); - block.flush().await.unwrap(); +// assert_eq!(buf, *b"hello world"); +// } +// #[tokio::test] +// async fn rel_seek() { +// let mut block = MemBlock::new(b"111hello world1111".to_vec()); +// for _ in 0..3 { +// block.seek(SeekFrom::Current(1)).await.unwrap(); +// } - let mut buf = [0_u8; 11]; - block.read_exact(&mut buf).await.unwrap(); +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, *b"hello world"); - } - #[tokio::test] - async fn multi_read() { - let block = MemBlock::new(b"hello world".to_vec()); +// assert_eq!(buf, *b"hello world"); +// } +// #[tokio::test] +// async fn normal_write() { +// let mut block = MemBlock::default(); +// block.write_all(b"hello").await.unwrap(); +// block.write_all(b" ").await.unwrap(); +// block.write_all(b"world").await.unwrap(); - for _ in 0..3000 { - let mut block = block.clone(); - tokio::spawn(async move { - let mut buf = [0_u8; 11]; - block.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, *b"hello world"); - }); - } - } - #[tokio::test] - #[should_panic] - async fn test_take_read() { - let block = MemBlock::new(b"hello world".to_vec()); - let mut buffer = [0; 5]; +// assert!(block.read_u8().await.is_err()); - // read at most five bytes - let mut handle = block.take(5); - handle.read_exact(&mut buffer).await.unwrap(); - assert_eq!(buffer, *b"hello"); +// block.flush().await.unwrap(); - // read the rest - let mut buffer = [0; 6]; - handle.read_exact(&mut buffer).await.unwrap(); - assert_eq!(buffer, *b" world"); - } - #[tokio::test] - async fn test_take_short_read() { - let block = MemBlock::new(b"hello ".to_vec()); - let mut buffer = Vec::new(); +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); - // read at most five bytes - let mut handle = block.take(100); - handle.read_to_end(&mut buffer).await.unwrap(); - assert_eq!(buffer, b"hello "); - } -} +// assert_eq!(buf, *b"hello world"); +// } +// #[tokio::test] +// async fn multi_read() { +// let block = MemBlock::new(b"hello world".to_vec()); + +// for _ in 0..3000 { +// let mut block = block.clone(); +// tokio::spawn(async move { +// let mut buf = [0_u8; 11]; +// block.read_exact(&mut buf).await.unwrap(); +// assert_eq!(buf, *b"hello world"); +// }); +// } +// } +// #[tokio::test] +// #[should_panic] +// async fn test_take_read() { +// let block = MemBlock::new(b"hello world".to_vec()); +// let mut buffer = [0; 5]; + +// // read at most five bytes +// let mut handle = block.take(5); +// handle.read_exact(&mut buffer).await.unwrap(); +// assert_eq!(buffer, *b"hello"); + +// // read the rest +// let mut buffer = [0; 6]; +// handle.read_exact(&mut buffer).await.unwrap(); +// assert_eq!(buffer, *b" world"); +// } +// #[tokio::test] +// async fn test_take_short_read() { +// let block = MemBlock::new(b"hello ".to_vec()); +// let mut buffer = Vec::new(); + +// // read at most five bytes +// let mut handle = block.take(100); +// handle.read_to_end(&mut buffer).await.unwrap(); +// assert_eq!(buffer, b"hello "); +// } +// } diff --git a/judger/src/filesystem/entry/tar.rs b/judger/src/filesystem/entry/tar.rs index 14250ec..0be0b09 100644 --- a/judger/src/filesystem/entry/tar.rs +++ b/judger/src/filesystem/entry/tar.rs @@ -18,7 +18,7 @@ use tokio::{ sync::Mutex, }; -use crate::filesystem::adj::{to_internal_path, AdjTable, DeepClone}; +use crate::filesystem::table::{to_internal_path, AdjTable}; use super::{ro::TarBlock, Entry}; @@ -26,12 +26,12 @@ pub struct TarTree(AdjTable>) where F: AsyncRead + AsyncSeek + Unpin + Send + 'static; -impl DeepClone for TarTree +impl Clone for TarTree where F: AsyncRead + AsyncSeek + Unpin + Send + 'static, { - async fn deep_clone(&self) -> Self { - Self(self.0.deep_clone().await) + fn clone(&self) -> Self { + Self(self.0.clone()) } } @@ -80,11 +80,7 @@ where EntryType::Regular | EntryType::Continuous => { let start = entry.raw_file_position(); let size = entry.size(); - Entry::TarFile(Arc::new(Mutex::new(TarBlock::new( - file.clone(), - start, - size, - )))) + Entry::TarFile(TarBlock::new(file.clone(), start, size as u32)) } EntryType::Symlink => Entry::SymLink(OsString::from_vec( entry.link_name_bytes().unwrap().into_owned(), diff --git a/judger/src/filesystem/entry/wrapper.rs b/judger/src/filesystem/entry/wrapper.rs deleted file mode 100644 index 7aa590b..0000000 --- a/judger/src/filesystem/entry/wrapper.rs +++ /dev/null @@ -1,45 +0,0 @@ -use std::io::SeekFrom; - -use bytes::Bytes; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; - -pub struct FuseRead<'a, W>(pub &'a mut W) -where - W: AsyncRead + AsyncSeek + Clone + Unpin; - -impl<'a, W> FuseRead<'a, W> -where - W: AsyncRead + AsyncSeek + Clone + Unpin, -{ - pub async fn read(&mut self, offset: u64, size: u32) -> std::io::Result { - let mut buf = Vec::with_capacity(size as usize); - self.0.seek(SeekFrom::Start(offset)).await?; - - self.0 - .clone() - .take(size as u64) - .read_to_end(&mut buf) - .await?; - - self.0.seek(SeekFrom::Current(buf.len() as i64)).await?; - - Ok(buf.try_into().unwrap()) - } -} - -pub struct FuseWrite<'a, W>(pub &'a mut W) -where - W: AsyncWrite + AsyncSeek + Clone + Unpin; - -impl<'a, W> FuseWrite<'a, W> -where - W: AsyncWrite + AsyncSeek + Clone + Unpin, -{ - pub async fn write(&mut self, offset: u64, data: &[u8]) -> std::io::Result { - assert!(data.len() < (u32::MAX - 1) as usize); - self.0.seek(SeekFrom::Start(offset)).await?; - - self.0.write_all(data).await?; - Ok(data.len() as u32) - } -} diff --git a/judger/src/filesystem/macro_.rs b/judger/src/filesystem/macro_.rs deleted file mode 100644 index 953391a..0000000 --- a/judger/src/filesystem/macro_.rs +++ /dev/null @@ -1,25 +0,0 @@ -macro_rules! chain_poll { - ($poll:expr) => {{ - let poll_ = $poll; - if poll_.is_pending() { - return Poll::Pending; - } - match poll_ { - Poll::Ready(x) => x, - Poll::Pending => unreachable!(), - } - }}; -} -macro_rules! report_poll { - ($ans:expr) => {{ - if let Err(err) = $ans { - return Poll::Ready(Err(err)); - } - match $ans { - Ok(x) => x, - Err(_) => unreachable!(), - } - }}; -} - -pub(crate) use {chain_poll, report_poll}; diff --git a/judger/src/filesystem/mod.rs b/judger/src/filesystem/mod.rs index bf32947..a2dcc3b 100644 --- a/judger/src/filesystem/mod.rs +++ b/judger/src/filesystem/mod.rs @@ -1,10 +1,9 @@ //! Filesystem module that is mountable(actuall mount and //! is accessible for user in this operation system) mod adapter; -mod adj; +mod table; mod entry; mod error; -mod macro_; mod resource; pub use entry::prelude::*; diff --git a/judger/src/filesystem/resource.rs b/judger/src/filesystem/resource.rs index d19b88e..ff94339 100644 --- a/judger/src/filesystem/resource.rs +++ b/judger/src/filesystem/resource.rs @@ -8,7 +8,7 @@ impl Resource { } pub fn comsume(&self, size: u32) -> Option<()> { let a = self.0.fetch_sub(size as u64, Ordering::AcqRel); - if (a | (1 << 63)) != 0 { + if (a & (1 << 63)) != 0 { None } else { Some(()) diff --git a/judger/src/filesystem/adj.rs b/judger/src/filesystem/table.rs similarity index 92% rename from judger/src/filesystem/adj.rs rename to judger/src/filesystem/table.rs index d2c3957..8d4700d 100644 --- a/judger/src/filesystem/adj.rs +++ b/judger/src/filesystem/table.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{BTreeMap}, ffi::{OsStr, OsString}, path::{Component, Path}, }; @@ -20,10 +20,11 @@ pub trait DeepClone { async fn deep_clone(&self) -> Self; } +#[derive(Clone)] struct Node { parent_idx: usize, value: V, - children: HashMap, + children: BTreeMap, // FIXME: use BtreeMap } impl DeepClone for Node { @@ -36,20 +37,11 @@ impl DeepClone for Node { } } +#[derive(Clone)] pub struct AdjTable { by_id: Vec>, } -impl DeepClone for AdjTable { - async fn deep_clone(&self) -> Self { - let mut by_id = Vec::with_capacity(self.by_id.len()); - for node in &self.by_id { - by_id.push(node.deep_clone().await); - } - Self { by_id } - } -} - impl AdjTable { pub fn new() -> Self { Self { by_id: vec![] } @@ -59,7 +51,7 @@ impl AdjTable { self.by_id.push(Node { parent_idx: 0, value, - children: HashMap::new(), + children: BTreeMap::new(), }); NodeWrapperMut { table: self, idx } } @@ -121,7 +113,7 @@ impl AdjTable { self.by_id.push(Node { parent_idx: idx, value: default_value(), - children: HashMap::new(), + children: BTreeMap::new(), }); self.by_id[idx].children.insert(name, new_idx); } @@ -150,7 +142,7 @@ impl AdjTable { self.by_id.push(Node { parent_idx: idx, value: default_value(), - children: HashMap::new(), + children: BTreeMap::new(), }); self.by_id[idx].children.insert(seg.to_os_string(), new_idx); idx = new_idx; @@ -184,10 +176,6 @@ impl<'a, V> NodeWrapper<'a, V> { }) } pub fn children(self) -> impl Iterator + 'a { - log::info!( - "children length: {}", - self.table.by_id[self.idx].children.len() - ); self.table.by_id[self.idx] .children .iter() @@ -231,7 +219,7 @@ impl<'a, V> NodeWrapperMut<'a, V> { self.table.by_id.push(Node { parent_idx: self.idx, value, - children: HashMap::new(), + children: BTreeMap::new(), }); self.table.by_id[self.idx].children.insert(name, idx); NodeWrapperMut { @@ -263,7 +251,7 @@ impl<'a, V> NodeWrapperMut<'a, V> { self.table.by_id[self.idx] .children .iter() - .map(|(_, &idx)| idx) + .map(|(_, &idx)| idx + ID_MIN) } } diff --git a/judger/test/helloworld.txt b/judger/test/helloworld.txt new file mode 100644 index 0000000..71d7efc --- /dev/null +++ b/judger/test/helloworld.txt @@ -0,0 +1 @@ +111hello world111 \ No newline at end of file