Skip to content

Commit

Permalink
refactor(Judger): 🔥 use direct sync function to produce future
Browse files Browse the repository at this point in the history
  • Loading branch information
Eason0729 committed May 10, 2024
1 parent ec73c1a commit a84ba27
Show file tree
Hide file tree
Showing 16 changed files with 559 additions and 680 deletions.
2 changes: 2 additions & 0 deletions judger/src/filesystem/adapter/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub enum FuseError {

impl From<FuseError> for fuse3::Errno {
fn from(value: FuseError) -> Self {
#[cfg(test)]
log::warn!("FUSE driver return result: {}", value);
match value {
FuseError::IsDir => libc::EISDIR,
FuseError::NotDir => libc::ENOTDIR,
Expand Down
123 changes: 85 additions & 38 deletions judger/src/filesystem/adapter/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::{ffi::OsStr, num::NonZeroU32, path::Path, sync::Arc};
use futures_core::Future;
use spin::Mutex;
use tokio::io::{AsyncRead, AsyncSeek};
use tokio::sync::Mutex as AsyncMutex;

use crate::{
filesystem::{resource::Resource, TarTree, BLOCKSIZE},
filesystem::{resource::Resource, Entry, TarTree, BLOCKSIZE},
semaphore::Permit,
};

Expand All @@ -20,7 +21,7 @@ pub struct Filesystem<F>
where
F: AsyncRead + AsyncSeek + Unpin + Send + 'static,
{
handle_table: HandleTable<usize>,
handle_table: HandleTable<AsyncMutex<Entry<F>>>,
tree: Mutex<TarTree<F>>,
resource: Arc<Resource>,
_permit: Permit,
Expand Down Expand Up @@ -80,17 +81,9 @@ where
async move {
let tree = self.tree.lock();
let node = tree.get(parent as usize).ok_or(FuseError::InvaildIno)?;
log::info!(
"parent name: {}",
String::from_utf8_lossy(node.get_name().as_encoded_bytes())
);
log::info!(
"lookup name: {}",
String::from_utf8_lossy(name.as_encoded_bytes())
);
let entry = node.get_by_component(name).ok_or(FuseError::InvalidPath)?;
// FIXME: unsure about the inode
Ok(reply_entry(req, entry.get_value(), parent))
Ok(reply_entry(&req, entry.get_value(), parent))
}
}
fn forget(&self, _: Request, inode: u64, _: u64) -> impl Future<Output = ()> + Send {
Expand Down Expand Up @@ -127,7 +120,9 @@ where
if node.get_value().kind() != FileType::Directory {
return Err(FuseError::NotDir.into());
}
let fh = self.handle_table.add(node.get_id());
let fh = self
.handle_table
.add(AsyncMutex::new(node.get_value().clone()));
Ok(ReplyOpen { fh, flags: 0 })
}
}
Expand All @@ -140,7 +135,12 @@ where
async move {
let tree = self.tree.lock();
let entry = tree.get(inode as usize).ok_or(FuseError::InvaildIno)?;
let fh = self.handle_table.add(entry.get_id());
if entry.get_value().kind() == FileType::Directory {
return Err(FuseError::IsDir.into());
}
let fh = self
.handle_table
.add(AsyncMutex::new(entry.get_value().clone()));
Ok(ReplyOpen { fh, flags: 0 })
}
}
Expand Down Expand Up @@ -215,12 +215,14 @@ where

let entries = vec![
Ok(dir_entry_plus(
&req,
OsStr::new(".").to_os_string(),
node.get_value(),
node.get_id() as u64,
1,
)),
Ok(dir_entry_plus(
&req,
OsStr::new("..").to_os_string(),
parent_node.get_value(),
parent_node.get_id() as u64,
Expand All @@ -234,6 +236,7 @@ where
.map(|(offset, inode)| {
let node = tree.get(inode).unwrap();
dir_entry_plus(
&req,
node.get_name().to_os_string(),
node.get_value(),
inode as u64,
Expand All @@ -259,20 +262,13 @@ where
size: u32,
) -> impl Future<Output = FuseResult<ReplyData>> + Send {
async move {
let handle = {
let tree = self.tree.lock();
let entry = self.handle_table.get(fh).ok_or(FuseError::HandleNotFound)?;
let node = tree.get(entry).ok_or(FuseError::InvaildIno)?;
let entry = node.get_value();
entry.get_read_handle()
}
.ok_or(FuseError::IsDir)?;

handle
let entry = self.handle_table.get(fh).ok_or(FuseError::HandleNotFound)?;
let mut lock = entry.lock().await;
Ok(lock
.read(offset, size)
.await
.map(|data| ReplyData { data })
.map_err(Into::into)
.ok_or(Into::<Errno>::into(FuseError::IsDir))?
.map(|data| ReplyData { data })?)
}
}
fn write(
Expand All @@ -286,17 +282,16 @@ where
flags: u32,
) -> impl Future<Output = FuseResult<ReplyWrite>> + Send {
async move {
let handle = {
let mut tree = self.tree.lock();
let entry = self.handle_table.get(fh).ok_or(FuseError::HandleNotFound)?;
let mut node = tree.get_mut(entry).ok_or(FuseError::InvaildIno)?;
let entry = node.get_value();
entry.get_write_handle()
}
.ok_or(FuseError::IsDir)?;
let resource = self.resource.clone();
let written = handle.write(offset, data, &resource).await?;
Ok(ReplyWrite { written })
let entry = self
.handle_table
.get(fh)
.ok_or(FuseError::HandleNotFound)
.unwrap();

Ok(Entry::write(entry, offset, data, &self.resource)
.await
.ok_or_else(|| Into::<Errno>::into(FuseError::IsDir))?
.map(|written| ReplyWrite { written })?)
}
}
fn access(
Expand All @@ -305,6 +300,7 @@ where
inode: u64,
mask: u32,
) -> impl Future<Output = FuseResult<()>> + Send {
// FIXME: only allow current user to access
async { Ok(()) }
}
fn fsync(
Expand Down Expand Up @@ -346,7 +342,6 @@ where
}
}
}

fn interrupt(&self, req: Request, unique: u64) -> impl Future<Output = FuseResult<()>> + Send {
async { Ok(()) }
}
Expand All @@ -361,7 +356,59 @@ where
let tree = self.tree.lock();
let entry = tree.get(inode as usize).ok_or(FuseError::InvaildIno)?;
// FIXME: unsure about the inode
Ok(reply_attr(entry.get_value(), inode))
Ok(reply_attr(&req, entry.get_value(), inode))
}
}
fn setattr(
&self,
req: Request,
inode: Inode,
fh: Option<u64>,
set_attr: SetAttr,
) -> impl Future<Output = FuseResult<ReplyAttr>> + Send {
async move {
let tree = self.tree.lock();
let node = tree.get(inode as usize).ok_or(FuseError::InvaildIno)?;
Ok(reply_attr(&req, node.get_value(), inode))
}
}
fn create(
&self,
req: Request,
parent: u64,
name: &OsStr,
mode: u32,
flags: u32,
) -> impl Future<Output = FuseResult<ReplyCreated>> + Send {
async move {
let mut tree = self.tree.lock();
let mut parent_node = tree.get_mut(parent as usize).ok_or(FuseError::InvaildIno)?;
if parent_node.get_value().kind() != FileType::Directory {
return Err(FuseError::NotDir.into());
}
let mut node = parent_node.insert(name.to_os_string(), Entry::new_file());

// FIXME: append mode
Ok(reply_created(&req, node.get_value()))
}
}
fn mkdir(
&self,
req: Request,
parent: u64,
name: &OsStr,
mode: u32,
umask: u32,
) -> impl Future<Output = FuseResult<ReplyEntry>> + Send {
async move {
let mut tree = self.tree.lock();
let mut parent_node = tree.get_mut(parent as usize).ok_or(FuseError::InvaildIno)?;
if parent_node.get_value().kind() != FileType::Directory {
return Err(FuseError::NotDir.into());
}
let mut node = parent_node.insert(name.to_os_string(), Entry::Directory);
let ino = node.get_id() as u64;
Ok(reply_entry(&req, node.get_value(), ino))
}
}
}
25 changes: 14 additions & 11 deletions judger/src/filesystem/adapter/handle.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
use std::{collections::BTreeMap, sync::atomic::AtomicU64};
use std::{
collections::BTreeMap,
sync::{atomic::AtomicU64, Arc},
};

use spin::RwLock;
use spin::Mutex;

pub struct HandleTable<E: Clone> {
pub struct HandleTable<E> {
handle_generator: AtomicU64,
table: RwLock<BTreeMap<u64, E>>,
table: Mutex<BTreeMap<u64, Arc<E>>>,
}

impl<E: Clone> HandleTable<E> {
impl<E> HandleTable<E> {
pub fn new() -> Self {
Self {
handle_generator: AtomicU64::new(1),
table: RwLock::new(BTreeMap::new()),
table: Mutex::new(BTreeMap::new()),
}
}
pub fn add(&self, entry: E) -> u64 {
let handle = self
.handle_generator
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
log::trace!("allocate handle: {}", handle);
self.table.write().insert(handle, entry);
self.table.lock().insert(handle, Arc::new(entry));
handle
}
pub fn get(&self, handle: u64) -> Option<E> {
pub fn get(&self, handle: u64) -> Option<Arc<E>> {
log::debug!("get handle: {}", handle);
self.table.read().get(&handle).cloned()
self.table.lock().get(&handle).cloned()
}
pub fn remove(&self, handle: u64) -> Option<E> {
pub fn remove(&self, handle: u64) -> Option<Arc<E>> {
log::trace!("deallocate handle: {}", handle);
self.table.write().remove(&handle)
self.table.lock().remove(&handle)
}
}
2 changes: 1 addition & 1 deletion judger/src/filesystem/adapter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod test {
.unwrap(),
)
.await;
let mut mount_handle = filesystem.mount("./.temp/1").await.unwrap();
let mut mount_handle = filesystem.mount("./.temp/11").await.unwrap();
let handle = &mut mount_handle;

tokio::select! {
Expand Down
40 changes: 28 additions & 12 deletions judger/src/filesystem/adapter/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use tokio::io::{AsyncRead, AsyncSeek};

use crate::filesystem::{Entry, BLOCKSIZE};

const TTL: Duration = Duration::from_secs(1);

pub fn dir_entry_plus<F>(
req: &Request,
name: OsString,
entry: &Entry<F>,
inode: u64,
Expand All @@ -23,9 +26,9 @@ where
kind: entry.kind(),
name,
offset,
attr: file_attr(entry, inode),
entry_ttl: Duration::from_secs(30),
attr_ttl: Duration::from_secs(30),
attr: file_attr(req, entry, inode),
entry_ttl: TTL,
attr_ttl: TTL,
}
}

Expand All @@ -41,28 +44,28 @@ where
}
}

pub fn reply_attr<F>(entry: &Entry<F>, inode: u64) -> ReplyAttr
pub fn reply_attr<F>(req: &Request, entry: &Entry<F>, inode: u64) -> ReplyAttr
where
F: AsyncRead + AsyncSeek + Send + Unpin + 'static,
{
ReplyAttr {
ttl: Duration::from_secs(30),
attr: file_attr(&entry, inode),
ttl: TTL,
attr: file_attr(req, &entry, inode),
}
}

pub fn reply_entry<F>(request: Request, entry: &Entry<F>, inode: u64) -> ReplyEntry
pub fn reply_entry<F>(req: &Request, entry: &Entry<F>, inode: u64) -> ReplyEntry
where
F: AsyncRead + AsyncSeek + Send + Unpin + 'static,
{
ReplyEntry {
ttl: Duration::from_secs(30),
attr: file_attr(&entry, inode),
ttl: TTL,
attr: file_attr(req, &entry, inode),
generation: 0,
}
}

pub fn file_attr<F>(entry: &Entry<F>, inode: u64) -> FileAttr
pub fn file_attr<F>(req: &Request, entry: &Entry<F>, inode: u64) -> FileAttr
where
F: AsyncRead + AsyncSeek + Send + Unpin + 'static,
{
Expand All @@ -81,9 +84,22 @@ where
| libc::S_IRWXO
| libc::S_ISVTX) as u16,
nlink: 1,
uid: 0,
gid: 0,
uid: req.uid,
gid: req.gid,
rdev: 179 << 16 + 02,
blksize: BLOCKSIZE as u32,
}
}

pub fn reply_created<F>(req: &Request, entry: &Entry<F>) -> ReplyCreated
where
F: AsyncRead + AsyncSeek + Send + Unpin + 'static,
{
ReplyCreated {
ttl: TTL,
attr: file_attr(req, entry, 0),
generation: 0,
fh: 0,
flags: 0,
}
}
4 changes: 2 additions & 2 deletions judger/src/filesystem/adapter/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::{
};

use crate::{
filesystem::{adj::DeepClone, TarTree},
filesystem::{table::DeepClone, TarTree},
semaphore::Permit,
};

Expand All @@ -27,7 +27,7 @@ where
Self { tree }
}
pub async fn as_filesystem(&self, permit: Permit) -> Filesystem<F> {
Filesystem::new(self.tree.deep_clone().await, permit)
Filesystem::new(self.tree.clone(), permit)
}
}

Expand Down
Loading

0 comments on commit a84ba27

Please sign in to comment.