diff --git a/tokio-epoll-uring/src/ops/mod.rs b/tokio-epoll-uring/src/ops/mod.rs index a030801..b5e23ab 100644 --- a/tokio-epoll-uring/src/ops/mod.rs +++ b/tokio-epoll-uring/src/ops/mod.rs @@ -8,3 +8,4 @@ pub mod nop; pub mod open_at; pub mod read; pub mod statx; +pub mod write; diff --git a/tokio-epoll-uring/src/ops/write.rs b/tokio-epoll-uring/src/ops/write.rs new file mode 100644 index 0000000..0b43001 --- /dev/null +++ b/tokio-epoll-uring/src/ops/write.rs @@ -0,0 +1,63 @@ +use std::os::fd::AsRawFd; + +use uring_common::{buf::BoundedBuf, io_fd::IoFd, io_uring}; + +use crate::system::submission::op_fut::Op; + +pub struct WriteOp +where + F: IoFd + Send, + B: BoundedBuf + Send, +{ + pub(crate) file: F, + pub(crate) offset: u64, + pub(crate) buf: B, +} + +impl crate::sealed::Sealed for WriteOp +where + F: IoFd + Send, + B: BoundedBuf + Send, +{ +} + +impl Op for WriteOp +where + F: IoFd + Send, + B: BoundedBuf + Send, +{ + type Resources = (F, B); + type Success = usize; + type Error = std::io::Error; + + fn make_sqe(&mut self) -> io_uring::squeue::Entry { + io_uring::opcode::Write::new( + io_uring::types::Fd( + // SAFETY: we hold `F` in self, and if `self` is dropped, we hand the fd to the + // `System` to keep it live until the operation completes. + #[allow(unused_unsafe)] + unsafe { + self.file.as_fd().as_raw_fd() + }, + ), + self.buf.stable_ptr(), + self.buf.bytes_init() as _, + ) + .offset(self.offset) + .build() + } + + fn on_failed_submission(self) -> Self::Resources { + (self.file, self.buf) + } + + fn on_op_completion(self, res: i32) -> (Self::Resources, Result) { + // https://man.archlinux.org/man/extra/liburing/io_uring_prep_write.3.en + let res = if res < 0 { + Err(std::io::Error::from_raw_os_error(-res)) + } else { + Ok(res as usize) + }; + ((self.file, self.buf), res) + } +} diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index ce8beae..ed517ad 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -2,10 +2,13 @@ use futures::FutureExt; use std::{mem::MaybeUninit, os::fd::OwnedFd, path::Path, task::ready}; -use uring_common::{buf::BoundedBufMut, io_fd::IoFd}; +use uring_common::{ + buf::{BoundedBuf, BoundedBufMut}, + io_fd::IoFd, +}; use crate::{ - ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx}, + ops::{fsync::FsyncOp, open_at::OpenAtOp, read::ReadOp, statx, write::WriteOp}, system::submission::{op_fut::execute_op, SubmitSide}, }; @@ -209,4 +212,20 @@ impl crate::SystemHandle { Err(e) => (file, Err(e)), } } + + pub fn write( + &self, + file: F, + offset: u64, + buf: B, + ) -> impl std::future::Future< + Output = ( + (F, B), + Result>, + ), + > { + let op = WriteOp { file, offset, buf }; + let inner = self.inner.as_ref().unwrap(); + execute_op(op, inner.submit_side.weak(), None) + } } diff --git a/tokio-epoll-uring/src/system/tests.rs b/tokio-epoll-uring/src/system/tests.rs index 4564898..0168ef1 100644 --- a/tokio-epoll-uring/src/system/tests.rs +++ b/tokio-epoll-uring/src/system/tests.rs @@ -264,3 +264,37 @@ async fn test_statx() { // TODO: once we add statx with pathname instead of file descriptor, // ensure we get NotFound back when the file doesn't exist. } + +#[tokio::test] +async fn test_write() { + let system = System::launch().await.unwrap(); + + let tempdir = tempfile::tempdir().unwrap(); + + let file_path = tempdir.path().join("some_file"); + let std_file = std::fs::File::create(&file_path).unwrap(); + let fd = OwnedFd::from(std_file); + + let write1 = b"some"; + let write2 = b"content"; + let ((fd, _), res) = system.write(fd, 0, write1.to_vec()).await; + res.unwrap(); + + assert_eq!(&write1[..], &std::fs::read(&file_path).unwrap()); + + // make sure there's no hidden file cursor underneath, i.e., that it's really write_at + let ((fd, _), res) = system.write(fd, 2, write2.to_vec()).await; + res.unwrap(); + + assert_eq!( + { + let mut expect = vec![]; + expect.extend_from_slice(&write1[0..2]); + expect.extend(write2); + expect + }, + std::fs::read(&file_path).unwrap() + ); + + drop(fd); +}