Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add write operation #42

Merged
merged 13 commits into from
Feb 8, 2024
54 changes: 54 additions & 0 deletions tokio-epoll-uring/src/ops/fsync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::os::fd::AsRawFd;

use uring_common::{
io_fd::IoFd,
io_uring::{self},
};

use crate::system::submission::op_fut::Op;

pub struct FsyncOp<F>
where
F: IoFd + Send,
{
pub(crate) file: F,
pub(crate) flags: io_uring::types::FsyncFlags,
}

impl<F> crate::sealed::Sealed for FsyncOp<F> where F: IoFd + Send {}

impl<F> Op for FsyncOp<F>
where
F: IoFd + Send,
{
type Resources = F;
type Success = ();
type Error = std::io::Error;

fn make_sqe(&mut self) -> io_uring::squeue::Entry {
io_uring::opcode::Fsync::new(io_uring::types::Fd(
// SAFETY: we hold `F` in self, and if `self` is dropped, we hand the fd to the
// `System` to keep it live until the operation completes.
#[allow(unused_unsafe)]
unsafe {
self.file.as_fd().as_raw_fd()
},
))
.flags(self.flags)
.build()
}

fn on_failed_submission(self) -> Self::Resources {
self.file
}

fn on_op_completion(self, res: i32) -> (Self::Resources, Result<Self::Success, Self::Error>) {
// https://man.archlinux.org/man/extra/liburing/io_uring_prep_fsync.3.en
let res = if res < 0 {
Err(std::io::Error::from_raw_os_error(-res))
} else {
Ok(())
};
(self.file, res)
}
}
3 changes: 3 additions & 0 deletions tokio-epoll-uring/src/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#[doc(inline)]
pub use crate::system::submission::op_fut::Op;

pub mod fsync;
pub mod nop;
pub mod open_at;
pub mod read;
pub mod statx;
pub mod write;
116 changes: 116 additions & 0 deletions tokio-epoll-uring/src/ops/statx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use crate::system::submission::op_fut::Op;
use crate::util::submitting_box::SubmittingBox;
use std::os::fd::AsRawFd;
use uring_common::libc;
pub use uring_common::libc::statx;
use uring_common::{
io_fd::IoFd,
io_uring::{self},
};

pub(crate) fn op<F>(
resources: Resources<F>,
) -> impl Op<Resources = Resources<F>, Success = (), Error = std::io::Error>
where
F: IoFd + Send,
{
match resources {
Resources::ByFileDescriptor { file, statxbuf } => StatxOp::ByFileDescriptor {
file,
statxbuf: SubmittingBox::new(statxbuf),
},
}
}

pub enum Resources<F>
where
F: IoFd + Send,
{
ByFileDescriptor {
file: F,
statxbuf: Box<uring_common::libc::statx>,
},
}

// See `https://man.archlinux.org/man/statx.2.en#Invoking_%3Cb%3Estatx%3C/b%3E():`
// to understand why there are different variants and why they're named the way they are.
enum StatxOp<F>
where
F: IoFd + Send,
{
ByFileDescriptor {
file: F,
statxbuf: SubmittingBox<uring_common::libc::statx>,
},
}

impl<F> crate::sealed::Sealed for StatxOp<F> where F: IoFd + Send {}

impl<F> Op for StatxOp<F>
where
F: IoFd + Send,
{
type Resources = Resources<F>;
type Success = ();
type Error = std::io::Error;

fn make_sqe(&mut self) -> io_uring::squeue::Entry {
// See https://man.archlinux.org/man/statx.2.en#Invoking_%3Cb%3Estatx%3C/b%3E():
match self {
StatxOp::ByFileDescriptor { file, statxbuf } => {
let fd = io_uring::types::Fd(
// SAFETY: we hold `F` in self, and if `self` is dropped, we hand the fd to the
// `System` to keep it live until the operation completes.
#[allow(unused_unsafe)]
unsafe {
file.as_fd().as_raw_fd()
},
);
// This is equivalent to what rust std 1.75 does if statx is supported
io_uring::opcode::Statx::new(
fd,
// SAFETY: static byte string is zero-terminated.
unsafe { std::ffi::CStr::from_bytes_with_nul_unchecked(b"\0").as_ptr() },
// Yes, this cast is what the io_uring / tokio-uring crates currently do as well.
// Don't understand why io_uring crate just doesn't take a `libc::statx` directly.
// https://github.com/tokio-rs/tokio-uring/blob/c4320fa2e7b146b28ad921ae25b552a0894c9697/src/io/statx.rs#L47-L61
statxbuf.start_submitting() as *mut uring_common::libc::statx
as *mut uring_common::io_uring::types::statx,
)
.flags(libc::AT_EMPTY_PATH | libc::AT_STATX_SYNC_AS_STAT)
.mask(uring_common::libc::STATX_ALL)
.build()
}
}
}

fn on_failed_submission(self) -> Self::Resources {
self.on_ownership_back_with_userspace()
}

fn on_op_completion(self, res: i32) -> (Self::Resources, Result<Self::Success, Self::Error>) {
// https://man.archlinux.org/man/io_uring_prep_statx.3.en
let res = if res < 0 {
Err(std::io::Error::from_raw_os_error(-res))
} else {
Ok(())
};
(self.on_ownership_back_with_userspace(), res)
}
}

impl<F> StatxOp<F>
where
F: IoFd + Send,
{
fn on_ownership_back_with_userspace(self) -> Resources<F> {
match self {
StatxOp::ByFileDescriptor { file, statxbuf } => {
// SAFETY: the `System` guarantees that when it calls us here,
// ownership of the resources is with us.
let statxbuf = unsafe { statxbuf.ownership_back_in_userspace() };
Resources::ByFileDescriptor { file, statxbuf }
}
}
}
}
63 changes: 63 additions & 0 deletions tokio-epoll-uring/src/ops/write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::os::fd::AsRawFd;

use uring_common::{buf::BoundedBuf, io_fd::IoFd, io_uring};

use crate::system::submission::op_fut::Op;

pub struct WriteOp<F, B>
where
F: IoFd + Send,
B: BoundedBuf + Send,
{
pub(crate) file: F,
pub(crate) offset: u64,
pub(crate) buf: B,
}

impl<F, B> crate::sealed::Sealed for WriteOp<F, B>
where
F: IoFd + Send,
B: BoundedBuf + Send,
{
}

impl<F, B> Op for WriteOp<F, B>
where
F: IoFd + Send,
B: BoundedBuf + Send,
{
type Resources = (F, B);
type Success = usize;
type Error = std::io::Error;

fn make_sqe(&mut self) -> io_uring::squeue::Entry {
io_uring::opcode::Write::new(
io_uring::types::Fd(
// SAFETY: we hold `F` in self, and if `self` is dropped, we hand the fd to the
// `System` to keep it live until the operation completes.
#[allow(unused_unsafe)]
unsafe {
self.file.as_fd().as_raw_fd()
},
),
self.buf.stable_ptr(),
self.buf.bytes_init() as _,
)
.offset(self.offset)
.build()
}

fn on_failed_submission(self) -> Self::Resources {
(self.file, self.buf)
}

fn on_op_completion(self, res: i32) -> (Self::Resources, Result<Self::Success, Self::Error>) {
// https://man.archlinux.org/man/extra/liburing/io_uring_prep_write.3.en
let res = if res < 0 {
Err(std::io::Error::from_raw_os_error(-res))
} else {
Ok(res as usize)
};
((self.file, self.buf), res)
}
}
82 changes: 80 additions & 2 deletions tokio-epoll-uring/src/system/lifecycle/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

use futures::FutureExt;
use std::{os::fd::OwnedFd, path::Path, task::ready};
use uring_common::{buf::BoundedBufMut, io_fd::IoFd};
use uring_common::{
buf::{BoundedBuf, BoundedBufMut},
io_fd::IoFd,
};

use crate::{
ops::{open_at::OpenAtOp, read::ReadOp},
ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx, write::WriteOp},
system::submission::{op_fut::execute_op, SubmitSide},
};

Expand Down Expand Up @@ -141,4 +144,79 @@ impl crate::SystemHandle {
res
})
}

pub async fn fsync<F: IoFd + Send>(
&self,
file: F,
) -> (
F,
Result<(), crate::system::submission::op_fut::Error<std::io::Error>>,
) {
let op = FsyncOp {
file,
flags: uring_common::io_uring::types::FsyncFlags::empty(),
};
let inner = self.inner.as_ref().unwrap();
execute_op(op, inner.submit_side.weak(), None).await
}

pub async fn fdatasync<F: IoFd + Send>(
&self,
file: F,
) -> (
F,
Result<(), crate::system::submission::op_fut::Error<std::io::Error>>,
) {
let op = FsyncOp {
file,
flags: uring_common::io_uring::types::FsyncFlags::DATASYNC,
};
let inner = self.inner.as_ref().unwrap();
execute_op(op, inner.submit_side.weak(), None).await
}

pub async fn statx<F: IoFd + Send>(
&self,
file: F,
) -> (
F,
Result<
Box<uring_common::libc::statx>,
crate::system::submission::op_fut::Error<std::io::Error>,
>,
) {
// TODO: avoid the allocation, or optimize using a slab cacke?
let buf: Box<uring_common::libc::statx> = Box::new(
// TODO replace with Box<MaybeUninit>, https://github.com/rust-lang/rust/issues/63291
// SAFETY: we only use the memory if the fstat succeeds, should be using MaybeUninit here.
unsafe { std::mem::zeroed() },
);
let op = statx::op(statx::Resources::ByFileDescriptor {
file,
statxbuf: buf,
});
let inner = self.inner.as_ref().unwrap();
let (resources, result) = execute_op(op, inner.submit_side.weak(), None).await;
let crate::ops::statx::Resources::ByFileDescriptor { file, statxbuf } = resources;
match result {
Ok(()) => (file, Ok(statxbuf)),
Err(e) => (file, Err(e)),
}
}

pub fn write<F: IoFd + Send, B: BoundedBuf + Send>(
&self,
file: F,
offset: u64,
buf: B,
) -> impl std::future::Future<
Output = (
(F, B),
Result<usize, crate::system::submission::op_fut::Error<std::io::Error>>,
),
> {
let op = WriteOp { file, offset, buf };
let inner = self.inner.as_ref().unwrap();
execute_op(op, inner.submit_side.weak(), None)
}
}
1 change: 1 addition & 0 deletions tokio-epoll-uring/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub(crate) mod oneshot_nonconsuming;
pub(crate) mod submitting_box;
Loading
Loading