From abd0604755174844ac6e8e619e5aa06224b1b7fd Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 17 Jan 2024 17:29:34 +0000 Subject: [PATCH 01/11] add fsync / fdatasync operations --- tokio-epoll-uring/src/ops/fsync.rs | 54 +++++++++++++++++++ tokio-epoll-uring/src/ops/mod.rs | 1 + .../src/system/lifecycle/handle.rs | 32 ++++++++++- 3 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 tokio-epoll-uring/src/ops/fsync.rs diff --git a/tokio-epoll-uring/src/ops/fsync.rs b/tokio-epoll-uring/src/ops/fsync.rs new file mode 100644 index 0000000..42f96d5 --- /dev/null +++ b/tokio-epoll-uring/src/ops/fsync.rs @@ -0,0 +1,54 @@ +use std::os::fd::AsRawFd; + +use uring_common::{ + io_fd::IoFd, + io_uring::{self}, +}; + +use crate::system::submission::op_fut::Op; + +pub struct FsyncOp +where + F: IoFd + Send, +{ + pub(crate) file: F, + pub(crate) flags: io_uring::types::FsyncFlags, +} + +impl crate::sealed::Sealed for FsyncOp where F: IoFd + Send {} + +impl Op for FsyncOp +where + F: IoFd + Send, +{ + type Resources = F; + type Success = (); + type Error = std::io::Error; + + fn make_sqe(&mut self) -> io_uring::squeue::Entry { + io_uring::opcode::Fsync::new(io_uring::types::Fd( + // SAFETY: we hold `F` in self, and if `self` is dropped, we hand the fd to the + // `System` to keep it live until the operation completes. + #[allow(unused_unsafe)] + unsafe { + self.file.as_fd().as_raw_fd() + }, + )) + .flags(self.flags) + .build() + } + + fn on_failed_submission(self) -> Self::Resources { + self.file + } + + fn on_op_completion(self, res: i32) -> (Self::Resources, Result) { + // https://man.archlinux.org/man/extra/liburing/io_uring_prep_fsync.3.en + let res = if res < 0 { + Err(std::io::Error::from_raw_os_error(-res)) + } else { + Ok(()) + }; + (self.file, res) + } +} diff --git a/tokio-epoll-uring/src/ops/mod.rs b/tokio-epoll-uring/src/ops/mod.rs index 7e13eda..f874ef9 100644 --- a/tokio-epoll-uring/src/ops/mod.rs +++ b/tokio-epoll-uring/src/ops/mod.rs @@ -3,6 +3,7 @@ #[doc(inline)] pub use crate::system::submission::op_fut::Op; +pub mod fsync; pub mod nop; pub mod open_at; pub mod read; diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index 1b1c63c..97a2632 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -5,7 +5,7 @@ use std::{os::fd::OwnedFd, path::Path, task::ready}; use uring_common::{buf::BoundedBufMut, io_fd::IoFd}; use crate::{ - ops::{open_at::OpenAtOp, read::ReadOp}, + ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp}, system::submission::{op_fut::execute_op, SubmitSide}, }; @@ -141,4 +141,34 @@ impl crate::SystemHandle { res }) } + + pub async fn fsync( + &self, + file: F, + ) -> ( + F, + Result<(), crate::system::submission::op_fut::Error>, + ) { + let op = FsyncOp { + file, + flags: uring_common::io_uring::types::FsyncFlags::empty(), + }; + let inner = self.inner.as_ref().unwrap(); + execute_op(op, inner.submit_side.weak(), None).await + } + + pub async fn fdatasync( + &self, + file: F, + ) -> ( + F, + Result<(), crate::system::submission::op_fut::Error>, + ) { + let op = FsyncOp { + file, + flags: uring_common::io_uring::types::FsyncFlags::DATASYNC, + }; + let inner = self.inner.as_ref().unwrap(); + execute_op(op, inner.submit_side.weak(), None).await + } } From 12d698f5ede66111c5d659e0d9ccedeba111dacc Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 19 Jan 2024 16:11:28 +0000 Subject: [PATCH 02/11] WIP: statx --- tokio-epoll-uring/src/ops/mod.rs | 1 + tokio-epoll-uring/src/ops/statx.rs | 131 ++++++++++++++++++ .../src/system/lifecycle/handle.rs | 13 +- uring-common/src/lib.rs | 2 + 4 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 tokio-epoll-uring/src/ops/statx.rs diff --git a/tokio-epoll-uring/src/ops/mod.rs b/tokio-epoll-uring/src/ops/mod.rs index f874ef9..a030801 100644 --- a/tokio-epoll-uring/src/ops/mod.rs +++ b/tokio-epoll-uring/src/ops/mod.rs @@ -7,3 +7,4 @@ pub mod fsync; pub mod nop; pub mod open_at; pub mod read; +pub mod statx; diff --git a/tokio-epoll-uring/src/ops/statx.rs b/tokio-epoll-uring/src/ops/statx.rs new file mode 100644 index 0000000..4f7ef98 --- /dev/null +++ b/tokio-epoll-uring/src/ops/statx.rs @@ -0,0 +1,131 @@ +use std::os::fd::AsRawFd; +use uring_common::libc; + +use uring_common::{ + io_fd::IoFd, + io_uring::{self}, +}; + +use crate::system::submission::op_fut::Op; + +// See `https://man.archlinux.org/man/statx.2.en#Invoking_%3Cb%3Estatx%3C/b%3E():` +// to understand why there are different variants and why they're named the way they are. +pub enum StatxOp +where + F: IoFd + Send, +{ + ByFileDescriptor { + file: F, + statxbuf: Submitting< + Box, + *mut uring_common::io_uring::types::statx, + >, + }, +} + +impl StatxOp +where + F: IoFd + Send, +{ + // Do the equivalent of fstat. + pub fn new_fstat(file: F, statxbuf: Box) -> StatxOp { + StatxOp::ByFileDescriptor { + file, + statxbuf: Submitting::No(statxbuf), + } + } +} + +pub enum Resources +where + F: IoFd + Send, +{ + ByFileDescriptor { + file: F, + statxbuf: Box, + }, +} + +// TODO: refine the `Op` trait so we encode this state in the typesystem +enum Submitting { + No(A), + Yes(B), +} + +/// SAFETY: we only needs this because we store the pointer while Submitting::Yes +unsafe impl Send for StatxOp where F: IoFd + Send {} + +impl crate::sealed::Sealed for StatxOp where F: IoFd + Send {} + +impl Op for StatxOp +where + F: IoFd + Send, +{ + type Resources = Resources; + type Success = (); + type Error = std::io::Error; + + fn make_sqe(&mut self) -> io_uring::squeue::Entry { + // See https://man.archlinux.org/man/statx.2.en#Invoking_%3Cb%3Estatx%3C/b%3E(): + match self { + StatxOp::ByFileDescriptor { file, statxbuf } => { + let fd = io_uring::types::Fd( + // SAFETY: we hold `F` in self, and if `self` is dropped, we hand the fd to the + // `System` to keep it live until the operation completes. + #[allow(unused_unsafe)] + unsafe { + file.as_fd().as_raw_fd() + }, + ); + // SAFETY: by Box::into_raw'ing the statxbuf box, the memory won't be re-used + // until we Box::from_raw it in `on_failed_submission` or `on_op_completion` + let statxbuf: *mut uring_common::io_uring::types::statx = match statxbuf { + Submitting::No(statxbuf_box) => Box::into_raw(statxbuf_box), + Submitting::Yes(statxbuf_ptr) => { + unreachable!("make_sqe is only called once") + } + }; + // This is equivalent to what rust std 1.75 does if statx is supported + io_uring::opcode::Statx::new(fd, b"\0" as *const _, statxbuf) + .flags(libc::AT_EMPTY_PATH | libc::AT_STATX_SYNC_AS_STAT) + .mask(uring_common::libc::STATX_ALL) + .build() + } + } + } + + fn on_failed_submission(self) -> Self::Resources { + self.on_ownership_back_with_userspace() + } + + fn on_op_completion(self, res: i32) -> (Self::Resources, Result) { + // https://man.archlinux.org/man/io_uring_prep_statx.3.en + let res = if res < 0 { + Err(std::io::Error::from_raw_os_error(-res)) + } else { + Ok(()) + }; + (self.on_ownership_back_with_userspace(), res) + } +} + +impl StatxOp +where + F: IoFd + Send, +{ + fn on_ownership_back_with_userspace(self) -> Resources { + match self { + StatxOp::ByFileDescriptor { file, statxbuf } => { + let statxbuf = match statxbuf { + Submitting::No(_) => unreachable!("only called after make_sqe"), + Submitting::Yes(statxbuf_ptr) => { + // SAFETY: the `System` guarantees that when it calls us here, + // ownership of the resources is with us. + unsafe { Box::from_raw(statxbuf_ptr) } + } + }; + Resources::ByFileDescriptor { file, statxbuf } + } + } + } +} diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index 97a2632..d2528cc 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -5,7 +5,7 @@ use std::{os::fd::OwnedFd, path::Path, task::ready}; use uring_common::{buf::BoundedBufMut, io_fd::IoFd}; use crate::{ - ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp}, + ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx::StatxOp}, system::submission::{op_fut::execute_op, SubmitSide}, }; @@ -171,4 +171,15 @@ impl crate::SystemHandle { let inner = self.inner.as_ref().unwrap(); execute_op(op, inner.submit_side.weak(), None).await } + + pub async fn statx( + &self, + file: F, + ) -> ( + F, Result> + ) { + uring_common::io_uring::types::statx + let op = StatxOp::new_fstat(file, statxbuf) + + } } diff --git a/uring-common/src/lib.rs b/uring-common/src/lib.rs index 8153c52..a519f6e 100644 --- a/uring-common/src/lib.rs +++ b/uring-common/src/lib.rs @@ -7,3 +7,5 @@ pub mod io_fd; #[cfg(target_os = "linux")] pub use io_uring; +#[cfg(target_os = "linux")] +pub use libc; From dda54159aaed61bf27cb67c0f0ae459aa616899a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 5 Feb 2024 14:17:04 +0000 Subject: [PATCH 03/11] continue statx: not happy with some of the internal APIs but good enough --- tokio-epoll-uring/src/ops/statx.rs | 94 +++++++++++++------ .../src/system/lifecycle/handle.rs | 33 ++++++- 2 files changed, 91 insertions(+), 36 deletions(-) diff --git a/tokio-epoll-uring/src/ops/statx.rs b/tokio-epoll-uring/src/ops/statx.rs index 4f7ef98..336bcf2 100644 --- a/tokio-epoll-uring/src/ops/statx.rs +++ b/tokio-epoll-uring/src/ops/statx.rs @@ -1,3 +1,4 @@ +use core::panic; use std::os::fd::AsRawFd; use uring_common::libc; @@ -8,6 +9,8 @@ use uring_common::{ use crate::system::submission::op_fut::Op; +pub use uring_common::libc::statx; + // See `https://man.archlinux.org/man/statx.2.en#Invoking_%3Cb%3Estatx%3C/b%3E():` // to understand why there are different variants and why they're named the way they are. pub enum StatxOp @@ -16,10 +19,7 @@ where { ByFileDescriptor { file: F, - statxbuf: Submitting< - Box, - *mut uring_common::io_uring::types::statx, - >, + statxbuf: SubmittingBox, }, } @@ -28,28 +28,65 @@ where F: IoFd + Send, { // Do the equivalent of fstat. - pub fn new_fstat(file: F, statxbuf: Box) -> StatxOp { + pub fn new_fstat(file: F, statxbuf: Box) -> StatxOp { StatxOp::ByFileDescriptor { file, - statxbuf: Submitting::No(statxbuf), + statxbuf: SubmittingBox::NotSubmitting(statxbuf), } } } +#[non_exhaustive] pub enum Resources where F: IoFd + Send, { ByFileDescriptor { file: F, - statxbuf: Box, + statxbuf: Box, }, } -// TODO: refine the `Op` trait so we encode this state in the typesystem -enum Submitting { - No(A), - Yes(B), +// TODO: refine the `Op` trait so we encode this state in the typesystem as a typestate +pub enum SubmittingBox +where + A: 'static, +{ + NotSubmitting(Box), + Submitting(*mut A), + Undefined, +} + +impl SubmittingBox { + fn start_submitting(&mut self) -> &'static mut A { + match std::mem::replace(self, Self::Undefined) { + SubmittingBox::NotSubmitting(v) => { + let leaked = Box::leak(v); + *self = Self::Submitting(leaked as *mut _); + leaked + } + SubmittingBox::Submitting(_) => { + panic!("must not call this function more than once without ownership_back_in_userspace() inbetween") + } + Self::Undefined => { + panic!("implementation error; did we panic earlier in the ::Submitting case?") + } + } + } + + /// # Safety + /// + /// Callers must ensure that userspace, and in particular, _the caller_ has again exclusive ownership + /// over the memory. + unsafe fn ownership_back_in_userspace(mut self) -> Box { + match std::mem::replace(&mut self, SubmittingBox::Undefined) { + SubmittingBox::NotSubmitting(_) => { + panic!("must not call this function without prior call to start_submitting()") + } + SubmittingBox::Submitting(leaked) => Box::from_raw(leaked), + SubmittingBox::Undefined => todo!(), + } + } } /// SAFETY: we only needs this because we store the pointer while Submitting::Yes @@ -77,19 +114,19 @@ where file.as_fd().as_raw_fd() }, ); - // SAFETY: by Box::into_raw'ing the statxbuf box, the memory won't be re-used - // until we Box::from_raw it in `on_failed_submission` or `on_op_completion` - let statxbuf: *mut uring_common::io_uring::types::statx = match statxbuf { - Submitting::No(statxbuf_box) => Box::into_raw(statxbuf_box), - Submitting::Yes(statxbuf_ptr) => { - unreachable!("make_sqe is only called once") - } - }; // This is equivalent to what rust std 1.75 does if statx is supported - io_uring::opcode::Statx::new(fd, b"\0" as *const _, statxbuf) - .flags(libc::AT_EMPTY_PATH | libc::AT_STATX_SYNC_AS_STAT) - .mask(uring_common::libc::STATX_ALL) - .build() + io_uring::opcode::Statx::new( + fd, + b"\0" as *const _, + // Yes, this cast is what the io_uring / tokio-uring crates currently do as well. + // Don't understand why io_uring crate just doesn't take a `libc::statx` directly. + // https://github.com/tokio-rs/tokio-uring/blob/c4320fa2e7b146b28ad921ae25b552a0894c9697/src/io/statx.rs#L47-L61 + statxbuf.start_submitting() as *mut uring_common::libc::statx + as *mut uring_common::io_uring::types::statx, + ) + .flags(libc::AT_EMPTY_PATH | libc::AT_STATX_SYNC_AS_STAT) + .mask(uring_common::libc::STATX_ALL) + .build() } } } @@ -116,14 +153,9 @@ where fn on_ownership_back_with_userspace(self) -> Resources { match self { StatxOp::ByFileDescriptor { file, statxbuf } => { - let statxbuf = match statxbuf { - Submitting::No(_) => unreachable!("only called after make_sqe"), - Submitting::Yes(statxbuf_ptr) => { - // SAFETY: the `System` guarantees that when it calls us here, - // ownership of the resources is with us. - unsafe { Box::from_raw(statxbuf_ptr) } - } - }; + // SAFETY: the `System` guarantees that when it calls us here, + // ownership of the resources is with us. + let statxbuf = unsafe { statxbuf.ownership_back_in_userspace() }; Resources::ByFileDescriptor { file, statxbuf } } } diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index d2528cc..8913a32 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -5,7 +5,12 @@ use std::{os::fd::OwnedFd, path::Path, task::ready}; use uring_common::{buf::BoundedBufMut, io_fd::IoFd}; use crate::{ - ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx::StatxOp}, + ops::{ + fsync::FsyncOp, + open_at::OpenAtOp, + read::ReadOp, + statx::{StatxOp, SubmittingBox}, + }, system::submission::{op_fut::execute_op, SubmitSide}, }; @@ -176,10 +181,28 @@ impl crate::SystemHandle { &self, file: F, ) -> ( - F, Result> + F, + Result< + Box, + crate::system::submission::op_fut::Error, + >, ) { - uring_common::io_uring::types::statx - let op = StatxOp::new_fstat(file, statxbuf) - + // TODO: avoid the allocation? allow callers to provide their own buffer? + let buf: Box = Box::new( + // TODO replace with Box, https://github.com/rust-lang/rust/issues/63291 + // SAFETY: we only use the memory if the fstat succeeds, should be using MaybeUninit here. + unsafe { std::mem::zeroed() }, + ); + let op = StatxOp::ByFileDescriptor { + file, + statxbuf: SubmittingBox::NotSubmitting(buf), + }; + let inner = self.inner.as_ref().unwrap(); + let (resources, result) = execute_op(op, inner.submit_side.weak(), None).await; + let crate::ops::statx::Resources::ByFileDescriptor { file, statxbuf } = resources; + match result { + Ok(()) => (file, Ok(statxbuf)), + Err(e) => (file, Err(e)), + } } } From 0626d98bbe57a1431990bde92ab63459040b62c4 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 5 Feb 2024 16:37:27 +0000 Subject: [PATCH 04/11] clippy --- tokio-epoll-uring/src/ops/statx.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-epoll-uring/src/ops/statx.rs b/tokio-epoll-uring/src/ops/statx.rs index 336bcf2..ae5e986 100644 --- a/tokio-epoll-uring/src/ops/statx.rs +++ b/tokio-epoll-uring/src/ops/statx.rs @@ -117,7 +117,7 @@ where // This is equivalent to what rust std 1.75 does if statx is supported io_uring::opcode::Statx::new( fd, - b"\0" as *const _, + b"\0" as *const libc::c_char, // Yes, this cast is what the io_uring / tokio-uring crates currently do as well. // Don't understand why io_uring crate just doesn't take a `libc::statx` directly. // https://github.com/tokio-rs/tokio-uring/blob/c4320fa2e7b146b28ad921ae25b552a0894c9697/src/io/statx.rs#L47-L61 From 024f4be0cac13bc96c5d4207988923d2cb3b431f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 5 Feb 2024 19:30:13 +0000 Subject: [PATCH 05/11] for real this time --- tokio-epoll-uring/src/ops/statx.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tokio-epoll-uring/src/ops/statx.rs b/tokio-epoll-uring/src/ops/statx.rs index ae5e986..35d3f9f 100644 --- a/tokio-epoll-uring/src/ops/statx.rs +++ b/tokio-epoll-uring/src/ops/statx.rs @@ -117,7 +117,8 @@ where // This is equivalent to what rust std 1.75 does if statx is supported io_uring::opcode::Statx::new( fd, - b"\0" as *const libc::c_char, + // SAFETY: static byte string is zero-terminated. + unsafe { std::ffi::CStr::from_bytes_with_nul_unchecked(b"\0").as_ptr() }, // Yes, this cast is what the io_uring / tokio-uring crates currently do as well. // Don't understand why io_uring crate just doesn't take a `libc::statx` directly. // https://github.com/tokio-rs/tokio-uring/blob/c4320fa2e7b146b28ad921ae25b552a0894c9697/src/io/statx.rs#L47-L61 From 2d4b1c4dcf758d1319d8f65cf8bbf170416a2f08 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 7 Feb 2024 09:58:14 +0000 Subject: [PATCH 06/11] move SubmittingBox into its own `util` module --- tokio-epoll-uring/src/ops/statx.rs | 44 +-------------- .../src/system/lifecycle/handle.rs | 9 +-- tokio-epoll-uring/src/util/mod.rs | 1 + tokio-epoll-uring/src/util/submitting_box.rs | 56 +++++++++++++++++++ 4 files changed, 60 insertions(+), 50 deletions(-) create mode 100644 tokio-epoll-uring/src/util/submitting_box.rs diff --git a/tokio-epoll-uring/src/ops/statx.rs b/tokio-epoll-uring/src/ops/statx.rs index 35d3f9f..80517b3 100644 --- a/tokio-epoll-uring/src/ops/statx.rs +++ b/tokio-epoll-uring/src/ops/statx.rs @@ -1,4 +1,3 @@ -use core::panic; use std::os::fd::AsRawFd; use uring_common::libc; @@ -8,6 +7,7 @@ use uring_common::{ }; use crate::system::submission::op_fut::Op; +use crate::util::submitting_box::SubmittingBox; pub use uring_common::libc::statx; @@ -47,48 +47,6 @@ where }, } -// TODO: refine the `Op` trait so we encode this state in the typesystem as a typestate -pub enum SubmittingBox -where - A: 'static, -{ - NotSubmitting(Box), - Submitting(*mut A), - Undefined, -} - -impl SubmittingBox { - fn start_submitting(&mut self) -> &'static mut A { - match std::mem::replace(self, Self::Undefined) { - SubmittingBox::NotSubmitting(v) => { - let leaked = Box::leak(v); - *self = Self::Submitting(leaked as *mut _); - leaked - } - SubmittingBox::Submitting(_) => { - panic!("must not call this function more than once without ownership_back_in_userspace() inbetween") - } - Self::Undefined => { - panic!("implementation error; did we panic earlier in the ::Submitting case?") - } - } - } - - /// # Safety - /// - /// Callers must ensure that userspace, and in particular, _the caller_ has again exclusive ownership - /// over the memory. - unsafe fn ownership_back_in_userspace(mut self) -> Box { - match std::mem::replace(&mut self, SubmittingBox::Undefined) { - SubmittingBox::NotSubmitting(_) => { - panic!("must not call this function without prior call to start_submitting()") - } - SubmittingBox::Submitting(leaked) => Box::from_raw(leaked), - SubmittingBox::Undefined => todo!(), - } - } -} - /// SAFETY: we only needs this because we store the pointer while Submitting::Yes unsafe impl Send for StatxOp where F: IoFd + Send {} diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index 8913a32..42ad0a7 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -5,12 +5,7 @@ use std::{os::fd::OwnedFd, path::Path, task::ready}; use uring_common::{buf::BoundedBufMut, io_fd::IoFd}; use crate::{ - ops::{ - fsync::FsyncOp, - open_at::OpenAtOp, - read::ReadOp, - statx::{StatxOp, SubmittingBox}, - }, + ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx::StatxOp}, system::submission::{op_fut::execute_op, SubmitSide}, }; @@ -195,7 +190,7 @@ impl crate::SystemHandle { ); let op = StatxOp::ByFileDescriptor { file, - statxbuf: SubmittingBox::NotSubmitting(buf), + statxbuf: crate::util::submitting_box::SubmittingBox::NotSubmitting(buf), }; let inner = self.inner.as_ref().unwrap(); let (resources, result) = execute_op(op, inner.submit_side.weak(), None).await; diff --git a/tokio-epoll-uring/src/util/mod.rs b/tokio-epoll-uring/src/util/mod.rs index bb1bb94..0fa2427 100644 --- a/tokio-epoll-uring/src/util/mod.rs +++ b/tokio-epoll-uring/src/util/mod.rs @@ -1 +1,2 @@ pub(crate) mod oneshot_nonconsuming; +pub(crate) mod submitting_box; diff --git a/tokio-epoll-uring/src/util/submitting_box.rs b/tokio-epoll-uring/src/util/submitting_box.rs new file mode 100644 index 0000000..06cc280 --- /dev/null +++ b/tokio-epoll-uring/src/util/submitting_box.rs @@ -0,0 +1,56 @@ +//! See [`SubmittingBox`]. + +/// A wrapper around [`Box`] with an API that forces users to spell out +/// ownerhsip transitions of the memory between kernel and userspace. +pub enum SubmittingBox +where + A: 'static, +{ + NotSubmitting(Box), + Submitting(*mut A), + Undefined, +} + +impl SubmittingBox { + /// [`Box::leak`] the inner box. + /// + /// # Panics + /// + /// Panics if this function has already been called on `self` + /// before without a call to [`Self::ownership_back_in_userspace`] inbetween. + pub(crate) fn start_submitting(&mut self) -> &'static mut A { + match std::mem::replace(self, Self::Undefined) { + SubmittingBox::NotSubmitting(v) => { + let leaked = Box::leak(v); + *self = Self::Submitting(leaked as *mut _); + leaked + } + SubmittingBox::Submitting(_) => { + panic!("must not call this function more than once without ownership_back_in_userspace() inbetween") + } + Self::Undefined => { + panic!("implementation error; did we panic earlier in the ::Submitting case?") + } + } + } + + /// [`Box::from_raw`] the inner box. + /// + /// # Panics + /// + /// Panics if there was no preceding call to [`Self::start_submitting`]. + /// + /// # Safety + /// + /// Callers must ensure that userspace, and in particular, _the caller_ has again exclusive ownership + /// over the memory. + pub(crate) unsafe fn ownership_back_in_userspace(mut self) -> Box { + match std::mem::replace(&mut self, SubmittingBox::Undefined) { + SubmittingBox::NotSubmitting(_) => { + panic!("must not call this function without prior call to start_submitting()") + } + SubmittingBox::Submitting(leaked) => Box::from_raw(leaked), + SubmittingBox::Undefined => todo!(), + } + } +} From 2f26dbe0b874fd57c897bb48a75ef21a87a96741 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 7 Feb 2024 10:05:58 +0000 Subject: [PATCH 07/11] cleanup SubmittingBox construction --- tokio-epoll-uring/src/ops/statx.rs | 18 +------------ .../src/system/lifecycle/handle.rs | 4 +-- tokio-epoll-uring/src/util/submitting_box.rs | 26 ++++++++++++------- 3 files changed, 19 insertions(+), 29 deletions(-) diff --git a/tokio-epoll-uring/src/ops/statx.rs b/tokio-epoll-uring/src/ops/statx.rs index 80517b3..16003a5 100644 --- a/tokio-epoll-uring/src/ops/statx.rs +++ b/tokio-epoll-uring/src/ops/statx.rs @@ -13,7 +13,7 @@ pub use uring_common::libc::statx; // See `https://man.archlinux.org/man/statx.2.en#Invoking_%3Cb%3Estatx%3C/b%3E():` // to understand why there are different variants and why they're named the way they are. -pub enum StatxOp +pub(crate) enum StatxOp where F: IoFd + Send, { @@ -23,19 +23,6 @@ where }, } -impl StatxOp -where - F: IoFd + Send, -{ - // Do the equivalent of fstat. - pub fn new_fstat(file: F, statxbuf: Box) -> StatxOp { - StatxOp::ByFileDescriptor { - file, - statxbuf: SubmittingBox::NotSubmitting(statxbuf), - } - } -} - #[non_exhaustive] pub enum Resources where @@ -47,9 +34,6 @@ where }, } -/// SAFETY: we only needs this because we store the pointer while Submitting::Yes -unsafe impl Send for StatxOp where F: IoFd + Send {} - impl crate::sealed::Sealed for StatxOp where F: IoFd + Send {} impl Op for StatxOp diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index 42ad0a7..a81ef52 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -182,7 +182,7 @@ impl crate::SystemHandle { crate::system::submission::op_fut::Error, >, ) { - // TODO: avoid the allocation? allow callers to provide their own buffer? + // TODO: avoid the allocation, or optimize using a slab cacke? let buf: Box = Box::new( // TODO replace with Box, https://github.com/rust-lang/rust/issues/63291 // SAFETY: we only use the memory if the fstat succeeds, should be using MaybeUninit here. @@ -190,7 +190,7 @@ impl crate::SystemHandle { ); let op = StatxOp::ByFileDescriptor { file, - statxbuf: crate::util::submitting_box::SubmittingBox::NotSubmitting(buf), + statxbuf: crate::util::submitting_box::SubmittingBox::new(buf), }; let inner = self.inner.as_ref().unwrap(); let (resources, result) = execute_op(op, inner.submit_side.weak(), None).await; diff --git a/tokio-epoll-uring/src/util/submitting_box.rs b/tokio-epoll-uring/src/util/submitting_box.rs index 06cc280..1ce5d53 100644 --- a/tokio-epoll-uring/src/util/submitting_box.rs +++ b/tokio-epoll-uring/src/util/submitting_box.rs @@ -2,26 +2,32 @@ /// A wrapper around [`Box`] with an API that forces users to spell out /// ownerhsip transitions of the memory between kernel and userspace. -pub enum SubmittingBox +pub enum SubmittingBox where - A: 'static, + T: 'static, { - NotSubmitting(Box), - Submitting(*mut A), + NotSubmitting { inner: Box }, + Submitting(*mut T), Undefined, } -impl SubmittingBox { +unsafe impl Send for SubmittingBox where T: Send {} + +impl SubmittingBox { + pub(crate) fn new(inner: Box) -> Self { + Self::NotSubmitting { inner } + } + /// [`Box::leak`] the inner box. /// /// # Panics /// /// Panics if this function has already been called on `self` /// before without a call to [`Self::ownership_back_in_userspace`] inbetween. - pub(crate) fn start_submitting(&mut self) -> &'static mut A { + pub(crate) fn start_submitting(&mut self) -> &'static mut T { match std::mem::replace(self, Self::Undefined) { - SubmittingBox::NotSubmitting(v) => { - let leaked = Box::leak(v); + SubmittingBox::NotSubmitting { inner } => { + let leaked = Box::leak(inner); *self = Self::Submitting(leaked as *mut _); leaked } @@ -44,9 +50,9 @@ impl SubmittingBox { /// /// Callers must ensure that userspace, and in particular, _the caller_ has again exclusive ownership /// over the memory. - pub(crate) unsafe fn ownership_back_in_userspace(mut self) -> Box { + pub(crate) unsafe fn ownership_back_in_userspace(mut self) -> Box { match std::mem::replace(&mut self, SubmittingBox::Undefined) { - SubmittingBox::NotSubmitting(_) => { + SubmittingBox::NotSubmitting { .. } => { panic!("must not call this function without prior call to start_submitting()") } SubmittingBox::Submitting(leaked) => Box::from_raw(leaked), From 6b4242e3adba4297d221e9c7b1de491444fbaf52 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 7 Feb 2024 10:13:53 +0000 Subject: [PATCH 08/11] polish api visibility --- tokio-epoll-uring/src/ops/statx.rs | 34 ++++++++++++------- .../src/system/lifecycle/handle.rs | 8 ++--- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/tokio-epoll-uring/src/ops/statx.rs b/tokio-epoll-uring/src/ops/statx.rs index 16003a5..46e89b9 100644 --- a/tokio-epoll-uring/src/ops/statx.rs +++ b/tokio-epoll-uring/src/ops/statx.rs @@ -1,36 +1,46 @@ +use crate::system::submission::op_fut::Op; +use crate::util::submitting_box::SubmittingBox; use std::os::fd::AsRawFd; use uring_common::libc; - +pub use uring_common::libc::statx; use uring_common::{ io_fd::IoFd, io_uring::{self}, }; -use crate::system::submission::op_fut::Op; -use crate::util::submitting_box::SubmittingBox; - -pub use uring_common::libc::statx; +pub(crate) fn op( + resources: Resources, +) -> impl Op, Success = (), Error = std::io::Error> +where + F: IoFd + Send, +{ + match resources { + Resources::ByFileDescriptor { file, statxbuf } => StatxOp::ByFileDescriptor { + file, + statxbuf: SubmittingBox::new(statxbuf), + }, + } +} -// See `https://man.archlinux.org/man/statx.2.en#Invoking_%3Cb%3Estatx%3C/b%3E():` -// to understand why there are different variants and why they're named the way they are. -pub(crate) enum StatxOp +pub enum Resources where F: IoFd + Send, { ByFileDescriptor { file: F, - statxbuf: SubmittingBox, + statxbuf: Box, }, } -#[non_exhaustive] -pub enum Resources +// See `https://man.archlinux.org/man/statx.2.en#Invoking_%3Cb%3Estatx%3C/b%3E():` +// to understand why there are different variants and why they're named the way they are. +enum StatxOp where F: IoFd + Send, { ByFileDescriptor { file: F, - statxbuf: Box, + statxbuf: SubmittingBox, }, } diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index a81ef52..0957a2f 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -5,7 +5,7 @@ use std::{os::fd::OwnedFd, path::Path, task::ready}; use uring_common::{buf::BoundedBufMut, io_fd::IoFd}; use crate::{ - ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx::StatxOp}, + ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx}, system::submission::{op_fut::execute_op, SubmitSide}, }; @@ -188,10 +188,10 @@ impl crate::SystemHandle { // SAFETY: we only use the memory if the fstat succeeds, should be using MaybeUninit here. unsafe { std::mem::zeroed() }, ); - let op = StatxOp::ByFileDescriptor { + let op = statx::op(statx::Resources::ByFileDescriptor { file, - statxbuf: crate::util::submitting_box::SubmittingBox::new(buf), - }; + statxbuf: buf, + }); let inner = self.inner.as_ref().unwrap(); let (resources, result) = execute_op(op, inner.submit_side.weak(), None).await; let crate::ops::statx::Resources::ByFileDescriptor { file, statxbuf } = resources; From ba5864d3c02e9b87f2708f780930d58a2a25a835 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 7 Feb 2024 10:20:06 +0000 Subject: [PATCH 09/11] implement write operation --- tokio-epoll-uring/src/ops/mod.rs | 1 + tokio-epoll-uring/src/ops/write.rs | 63 +++++++++++++++++++ .../src/system/lifecycle/handle.rs | 23 ++++++- 3 files changed, 85 insertions(+), 2 deletions(-) create mode 100644 tokio-epoll-uring/src/ops/write.rs diff --git a/tokio-epoll-uring/src/ops/mod.rs b/tokio-epoll-uring/src/ops/mod.rs index a030801..b5e23ab 100644 --- a/tokio-epoll-uring/src/ops/mod.rs +++ b/tokio-epoll-uring/src/ops/mod.rs @@ -8,3 +8,4 @@ pub mod nop; pub mod open_at; pub mod read; pub mod statx; +pub mod write; diff --git a/tokio-epoll-uring/src/ops/write.rs b/tokio-epoll-uring/src/ops/write.rs new file mode 100644 index 0000000..0b43001 --- /dev/null +++ b/tokio-epoll-uring/src/ops/write.rs @@ -0,0 +1,63 @@ +use std::os::fd::AsRawFd; + +use uring_common::{buf::BoundedBuf, io_fd::IoFd, io_uring}; + +use crate::system::submission::op_fut::Op; + +pub struct WriteOp +where + F: IoFd + Send, + B: BoundedBuf + Send, +{ + pub(crate) file: F, + pub(crate) offset: u64, + pub(crate) buf: B, +} + +impl crate::sealed::Sealed for WriteOp +where + F: IoFd + Send, + B: BoundedBuf + Send, +{ +} + +impl Op for WriteOp +where + F: IoFd + Send, + B: BoundedBuf + Send, +{ + type Resources = (F, B); + type Success = usize; + type Error = std::io::Error; + + fn make_sqe(&mut self) -> io_uring::squeue::Entry { + io_uring::opcode::Write::new( + io_uring::types::Fd( + // SAFETY: we hold `F` in self, and if `self` is dropped, we hand the fd to the + // `System` to keep it live until the operation completes. + #[allow(unused_unsafe)] + unsafe { + self.file.as_fd().as_raw_fd() + }, + ), + self.buf.stable_ptr(), + self.buf.bytes_init() as _, + ) + .offset(self.offset) + .build() + } + + fn on_failed_submission(self) -> Self::Resources { + (self.file, self.buf) + } + + fn on_op_completion(self, res: i32) -> (Self::Resources, Result) { + // https://man.archlinux.org/man/extra/liburing/io_uring_prep_write.3.en + let res = if res < 0 { + Err(std::io::Error::from_raw_os_error(-res)) + } else { + Ok(res as usize) + }; + ((self.file, self.buf), res) + } +} diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index 0957a2f..c21e04f 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -2,10 +2,13 @@ use futures::FutureExt; use std::{os::fd::OwnedFd, path::Path, task::ready}; -use uring_common::{buf::BoundedBufMut, io_fd::IoFd}; +use uring_common::{ + buf::{BoundedBuf, BoundedBufMut}, + io_fd::IoFd, +}; use crate::{ - ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx}, + ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx, write::WriteOp}, system::submission::{op_fut::execute_op, SubmitSide}, }; @@ -200,4 +203,20 @@ impl crate::SystemHandle { Err(e) => (file, Err(e)), } } + + pub fn write( + &self, + file: F, + offset: u64, + buf: B, + ) -> impl std::future::Future< + Output = ( + (F, B), + Result>, + ), + > { + let op = WriteOp { file, offset, buf }; + let inner = self.inner.as_ref().unwrap(); + execute_op(op, inner.submit_side.weak(), None) + } } From 752eba60de10dbaec054ef453f0997417f92bef2 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 8 Feb 2024 10:50:10 +0000 Subject: [PATCH 10/11] add test --- tokio-epoll-uring/src/system/tests.rs | 33 +++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tokio-epoll-uring/src/system/tests.rs b/tokio-epoll-uring/src/system/tests.rs index 4564898..99d6511 100644 --- a/tokio-epoll-uring/src/system/tests.rs +++ b/tokio-epoll-uring/src/system/tests.rs @@ -4,6 +4,7 @@ use std::{ time::Duration, }; +use tempfile::{tempdir, tempfile}; use tokio_util::sync::CancellationToken; use crate::{ @@ -264,3 +265,35 @@ async fn test_statx() { // TODO: once we add statx with pathname instead of file descriptor, // ensure we get NotFound back when the file doesn't exist. } + +#[tokio::test] +async fn test_write() { + let system = System::launch().await.unwrap(); + + let tempdir = tempfile::tempdir().unwrap(); + + let file_path = tempdir.path().join("some_file"); + let std_file = std::fs::File::create(&file_path).unwrap(); + let fd = OwnedFd::from(std_file); + + let write1 = b"some"; + let write2 = b"content"; + let ((fd, _), res) = system.write(fd, 0, write1.to_vec()).await; + res.unwrap(); + + assert_eq!(&write1[..], &std::fs::read(&file_path).unwrap()); + + // make sure there's no hidden file cursor underneath, i.e., that it's really write_at + let ((fd, _), res) = system.write(fd, 2, write2.to_vec()).await; + res.unwrap(); + + assert_eq!( + { + let mut expect = vec![]; + expect.extend_from_slice(&write1[0..2]); + expect.extend(write2); + expect + }, + std::fs::read(&file_path).unwrap() + ); +} From a3d8f0380c4a6bb2762e1a66e1b1d783151f8f2c Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 8 Feb 2024 10:53:05 +0000 Subject: [PATCH 11/11] fix warnings --- tokio-epoll-uring/src/system/tests.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tokio-epoll-uring/src/system/tests.rs b/tokio-epoll-uring/src/system/tests.rs index 99d6511..0168ef1 100644 --- a/tokio-epoll-uring/src/system/tests.rs +++ b/tokio-epoll-uring/src/system/tests.rs @@ -4,7 +4,6 @@ use std::{ time::Duration, }; -use tempfile::{tempdir, tempfile}; use tokio_util::sync::CancellationToken; use crate::{ @@ -296,4 +295,6 @@ async fn test_write() { }, std::fs::read(&file_path).unwrap() ); + + drop(fd); }