From 91d67e20fe5224eb8b7b99c4e84543b033a43a65 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 22 Nov 2023 08:24:20 +0100 Subject: [PATCH] vendor `IoBuf`/`IoBufMut` from `tokio-uring` (#24) In the draft PR for [`open_at` support](https://github.com/neondatabase/tokio-epoll-uring/pull/21) we need an `OpenOptions` struct. Sadly we can't re-use the one from `tokio-uring` because it doesn't have a public API for the conversion of OpenOptions into the relevant libc flags. We created [a PR asking for such an API](https://github.com/neondatabase/tokio-uring/pull/1), but, in the meantime, let's unblock ourselves by vendoring the pieces of `tokio-uring` that we need, and cusotmize them as needed. This PR starts that effort by vendoring the `IoBuf`/`IoBufMut` traits as well as `tokio-uring`'s approach to support slice-like operations. Support for `OpenOptions` will follow as part of [the PR that adds `open_at` support on top of this PR](https://github.com/neondatabase/tokio-epoll-uring/pull/25). The files that reproduce the `tokio-uring` LICENSE text at the top are copied from `tokio-uring.git:d5e90539bd6d1c518e848298564a098c300866bc`. Files without it were written by myself. To make `cargo test` pass, I had to remove the examples in the doc comments. --- Cargo.lock | 11 +- Cargo.toml | 1 + tokio-epoll-uring/Cargo.toml | 3 +- tokio-epoll-uring/src/lib.rs | 2 +- tokio-epoll-uring/src/ops/nop.rs | 2 + tokio-epoll-uring/src/ops/read.rs | 10 +- tokio-epoll-uring/src/system/completion.rs | 1 + tokio-epoll-uring/src/system/lifecycle.rs | 1 + .../src/system/lifecycle/handle.rs | 2 +- tokio-epoll-uring/src/system/slots.rs | 1 + tokio-epoll-uring/src/system/submission.rs | 1 + .../src/system/submission/op_fut.rs | 2 + .../system/test_util/shared_system_handle.rs | 3 +- uring-common/Cargo.toml | 10 + uring-common/src/buf.rs | 60 +++++ uring-common/src/buf/bounded.rs | 208 ++++++++++++++++++ uring-common/src/buf/io_buf.rs | 141 ++++++++++++ uring-common/src/buf/io_buf_mut.rs | 89 ++++++++ uring-common/src/buf/slice.rs | 178 +++++++++++++++ uring-common/src/lib.rs | 3 + 20 files changed, 718 insertions(+), 11 deletions(-) create mode 100644 uring-common/Cargo.toml create mode 100644 uring-common/src/buf.rs create mode 100644 uring-common/src/buf/bounded.rs create mode 100644 uring-common/src/buf/io_buf.rs create mode 100644 uring-common/src/buf/io_buf_mut.rs create mode 100644 uring-common/src/buf/slice.rs create mode 100644 uring-common/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 5f78f5a..8e80523 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1611,7 +1611,6 @@ version = "0.1.0" dependencies = [ "assert-panic", "futures", - "io-uring 0.6.0", "nix 0.26.2", "once_cell", "os_pipe", @@ -1619,10 +1618,10 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tokio-uring", "tokio-util", "tracing", "tracing-subscriber", + "uring-common", ] [[package]] @@ -1731,6 +1730,14 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +[[package]] +name = "uring-common" +version = "0.1.0" +dependencies = [ + "io-uring 0.6.0", + "libc", +] + [[package]] name = "utf8parse" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 717df9d..e32cc4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,4 +3,5 @@ members = [ "tokio-epoll-uring", "benchmark", + "uring-common", ] diff --git a/tokio-epoll-uring/Cargo.toml b/tokio-epoll-uring/Cargo.toml index 8e96817..e2f3418 100644 --- a/tokio-epoll-uring/Cargo.toml +++ b/tokio-epoll-uring/Cargo.toml @@ -9,14 +9,13 @@ license = "MIT OR Apache-2.0" [dependencies] futures = "0.3.28" -io-uring = "0.6.0" once_cell = "1.18.0" scopeguard = "1.1.0" thiserror = "1.0.44" tokio = { version = "1.29.1", features = ["io-std", "full"] } -tokio-uring = "0.4.0" tokio-util = "0.7.8" tracing = "0.1.37" +uring-common = { path = "../uring-common" } [dev-dependencies] tokio = { version = "1.29.1", features = ["full"] } diff --git a/tokio-epoll-uring/src/lib.rs b/tokio-epoll-uring/src/lib.rs index fcf5136..3154c0a 100644 --- a/tokio-epoll-uring/src/lib.rs +++ b/tokio-epoll-uring/src/lib.rs @@ -80,7 +80,7 @@ pub use system::lifecycle::thread_local::{thread_local_system, Handle}; pub use system::lifecycle::System; pub use system::submission::op_fut::Error as SystemError; -pub use tokio_uring::buf::{IoBuf, IoBufMut}; +pub use uring_common::buf::{IoBuf, IoBufMut}; pub(crate) mod util; diff --git a/tokio-epoll-uring/src/ops/nop.rs b/tokio-epoll-uring/src/ops/nop.rs index a3bae42..7794650 100644 --- a/tokio-epoll-uring/src/ops/nop.rs +++ b/tokio-epoll-uring/src/ops/nop.rs @@ -1,3 +1,5 @@ +use uring_common::io_uring; + use crate::system::submission::op_fut::Op; pub struct Nop {} diff --git a/tokio-epoll-uring/src/ops/read.rs b/tokio-epoll-uring/src/ops/read.rs index 3e86172..29e7515 100644 --- a/tokio-epoll-uring/src/ops/read.rs +++ b/tokio-epoll-uring/src/ops/read.rs @@ -1,21 +1,23 @@ use std::os::fd::{AsRawFd, OwnedFd}; +use uring_common::{buf::IoBufMut, io_uring}; + use crate::system::submission::op_fut::Op; pub struct ReadOp where - B: tokio_uring::buf::IoBufMut + Send, + B: IoBufMut + Send, { pub(crate) file: OwnedFd, pub(crate) offset: u64, pub(crate) buf: B, } -impl crate::sealed::Sealed for ReadOp where B: tokio_uring::buf::IoBufMut + Send {} +impl crate::sealed::Sealed for ReadOp where B: IoBufMut + Send {} impl Op for ReadOp where - B: tokio_uring::buf::IoBufMut + Send, + B: IoBufMut + Send, { type Resources = (OwnedFd, B); type Success = usize; @@ -43,7 +45,7 @@ where let res = if res < 0 { Err(std::io::Error::from_raw_os_error(-res)) } else { - unsafe { tokio_uring::buf::IoBufMut::set_init(&mut self.buf, res as usize) }; + unsafe { IoBufMut::set_init(&mut self.buf, res as usize) }; Ok(res as usize) }; ((self.file, self.buf), res) diff --git a/tokio-epoll-uring/src/system/completion.rs b/tokio-epoll-uring/src/system/completion.rs index a12c05b..a87c146 100644 --- a/tokio-epoll-uring/src/system/completion.rs +++ b/tokio-epoll-uring/src/system/completion.rs @@ -6,6 +6,7 @@ use std::{ use io_uring::CompletionQueue; use tokio::sync::{self, broadcast, mpsc, oneshot}; use tracing::{debug, info, info_span, trace, Instrument}; +use uring_common::io_uring; use crate::{system::submission::SubmitSideInner, util::oneshot_nonconsuming}; diff --git a/tokio-epoll-uring/src/system/lifecycle.rs b/tokio-epoll-uring/src/system/lifecycle.rs index a726ff4..82d6d22 100644 --- a/tokio-epoll-uring/src/system/lifecycle.rs +++ b/tokio-epoll-uring/src/system/lifecycle.rs @@ -7,6 +7,7 @@ pub mod handle; pub mod thread_local; use io_uring::{CompletionQueue, SubmissionQueue, Submitter}; +use uring_common::io_uring; use crate::{ system::{completion::ShutdownRequestImpl, RING_SIZE}, diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index 7de6d20..97b1f8b 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -2,7 +2,7 @@ use futures::FutureExt; use std::{os::fd::OwnedFd, task::ready}; -use tokio_uring::buf::IoBufMut; +use uring_common::buf::IoBufMut; use crate::{ ops::read::ReadOp, diff --git a/tokio-epoll-uring/src/system/slots.rs b/tokio-epoll-uring/src/system/slots.rs index 35df06c..e2ae935 100644 --- a/tokio-epoll-uring/src/system/slots.rs +++ b/tokio-epoll-uring/src/system/slots.rs @@ -32,6 +32,7 @@ use std::{ use tokio::sync::oneshot; use tracing::{debug, trace}; +use uring_common::io_uring; use crate::system::submission::op_fut::Error; diff --git a/tokio-epoll-uring/src/system/submission.rs b/tokio-epoll-uring/src/system/submission.rs index 85e93f4..f1f184e 100644 --- a/tokio-epoll-uring/src/system/submission.rs +++ b/tokio-epoll-uring/src/system/submission.rs @@ -6,6 +6,7 @@ use std::{ }; use io_uring::{SubmissionQueue, Submitter}; +use uring_common::io_uring; use super::{ completion::CompletionSide, diff --git a/tokio-epoll-uring/src/system/submission/op_fut.rs b/tokio-epoll-uring/src/system/submission/op_fut.rs index 9edaa25..1e8ba9d 100644 --- a/tokio-epoll-uring/src/system/submission/op_fut.rs +++ b/tokio-epoll-uring/src/system/submission/op_fut.rs @@ -12,6 +12,8 @@ pub trait Op: crate::sealed::Sealed + Sized + Send + 'static { fn make_sqe(&mut self) -> io_uring::squeue::Entry; } +use uring_common::io_uring; + use crate::system::{ completion::ProcessCompletionsCause, slots::{self, SlotHandle}, diff --git a/tokio-epoll-uring/src/system/test_util/shared_system_handle.rs b/tokio-epoll-uring/src/system/test_util/shared_system_handle.rs index 367177b..985adaa 100644 --- a/tokio-epoll-uring/src/system/test_util/shared_system_handle.rs +++ b/tokio-epoll-uring/src/system/test_util/shared_system_handle.rs @@ -1,6 +1,7 @@ use std::sync::{Arc, RwLock}; use futures::Future; +use uring_common::buf::IoBufMut; use crate::SystemError; use crate::{System, SystemHandle}; @@ -42,7 +43,7 @@ impl SharedSystemHandle { .initiate_shutdown() } - pub fn read( + pub fn read( &self, file: std::os::fd::OwnedFd, offset: u64, diff --git a/uring-common/Cargo.toml b/uring-common/Cargo.toml new file mode 100644 index 0000000..c50ebd6 --- /dev/null +++ b/uring-common/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "uring-common" +version = "0.1.0" +edition = "2021" +description = "a subset of types from the `tokio-uring` crate" +license = "MIT" # the same as tokio-uring at the time we forked it + +[dependencies] +libc = "0.2.80" +io-uring = "0.6.0" diff --git a/uring-common/src/buf.rs b/uring-common/src/buf.rs new file mode 100644 index 0000000..847c6cf --- /dev/null +++ b/uring-common/src/buf.rs @@ -0,0 +1,60 @@ +// Copyright (c) 2021 Carl Lerche +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// +// +// Based on tokio-uring.git:d5e90539bd6d1c518e848298564a098c300866bc + +//! Utilities for working with buffers. +//! +//! `io-uring` APIs require passing ownership of buffers to the runtime. The +//! crate defines [`IoBuf`] and [`IoBufMut`] traits which are implemented by buffer +//! types that respect the `io-uring` contract. + +// pub mod fixed; + +mod io_buf; +pub use io_buf::IoBuf; + +mod io_buf_mut; +pub use io_buf_mut::IoBufMut; + +mod slice; +pub use slice::Slice; + +mod bounded; +pub use bounded::{BoundedBuf, BoundedBufMut}; + +pub(crate) fn deref(buf: &impl IoBuf) -> &[u8] { + // Safety: the `IoBuf` trait is marked as unsafe and is expected to be + // implemented correctly. + unsafe { std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init()) } +} + +pub(crate) fn deref_mut(buf: &mut impl IoBufMut) -> &mut [u8] { + // Safety: the `IoBufMut` trait is marked as unsafe and is expected to be + // implemented correct. + unsafe { std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_init()) } +} diff --git a/uring-common/src/buf/bounded.rs b/uring-common/src/buf/bounded.rs new file mode 100644 index 0000000..21f1e77 --- /dev/null +++ b/uring-common/src/buf/bounded.rs @@ -0,0 +1,208 @@ +// Copyright (c) 2021 Carl Lerche +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// +// +// Based on tokio-uring.git:d5e90539bd6d1c518e848298564a098c300866bc + +use super::{IoBuf, IoBufMut, Slice}; + +use std::ops; +use std::ptr; + +/// A possibly bounded view into an owned [`IoBuf`] buffer. +/// +/// Because buffers are passed by ownership to the runtime, Rust's slice API +/// (`&buf[..]`) cannot be used. Instead, `tokio-uring` provides an owned slice +/// API: [`.slice()`]. The method takes ownership of the buffer and returns a +/// [`Slice`] value that tracks the requested range. +/// +/// This trait provides a generic way to use buffers and `Slice` views +/// into such buffers with `io-uring` operations. +/// +/// [`.slice()`]: BoundedBuf::slice +pub trait BoundedBuf: Unpin + 'static { + /// The type of the underlying buffer. + type Buf: IoBuf; + + /// The type representing the range bounds of the view. + type Bounds: ops::RangeBounds; + + /// Returns a view of the buffer with the specified range. + /// + /// This method is similar to Rust's slicing (`&buf[..]`), but takes + /// ownership of the buffer. The range bounds are specified against + /// the possibly offset beginning of the `self` view into the buffer + /// and the end bound, if specified, must not exceed the view's total size. + /// Note that the range may extend into the uninitialized part of the + /// buffer, but it must start (if so bounded) in the initialized part + /// or immediately adjacent to it. + /// + /// # Panics + /// + /// If the range is invalid with regard to the recipient's total size or + /// the length of its initialized part, the implementation of this method + /// should panic. + fn slice(self, range: impl ops::RangeBounds) -> Slice; + + /// Returns a `Slice` with the view's full range. + /// + /// This method is to be used by the `tokio-uring` runtime and it is not + /// expected for users to call it directly. + fn slice_full(self) -> Slice; + + /// Gets a reference to the underlying buffer. + fn get_buf(&self) -> &Self::Buf; + + /// Returns the range bounds for this view. + fn bounds(&self) -> Self::Bounds; + + /// Constructs a view from an underlying buffer and range bounds. + fn from_buf_bounds(buf: Self::Buf, bounds: Self::Bounds) -> Self; + + /// Like [`IoBuf::stable_ptr`], + /// but possibly offset to the view's starting position. + fn stable_ptr(&self) -> *const u8; + + /// Number of initialized bytes available via this view. + fn bytes_init(&self) -> usize; + + /// Total size of the view, including uninitialized memory, if any. + fn bytes_total(&self) -> usize; +} + +impl BoundedBuf for T { + type Buf = Self; + type Bounds = ops::RangeFull; + + fn slice(self, range: impl ops::RangeBounds) -> Slice { + use ops::Bound; + + let begin = match range.start_bound() { + Bound::Included(&n) => n, + Bound::Excluded(&n) => n.checked_add(1).expect("out of range"), + Bound::Unbounded => 0, + }; + + assert!(begin < self.bytes_total()); + + let end = match range.end_bound() { + Bound::Included(&n) => n.checked_add(1).expect("out of range"), + Bound::Excluded(&n) => n, + Bound::Unbounded => self.bytes_total(), + }; + + assert!(end <= self.bytes_total()); + assert!(begin <= self.bytes_init()); + + Slice::new(self, begin, end) + } + + fn slice_full(self) -> Slice { + let end = self.bytes_total(); + Slice::new(self, 0, end) + } + + fn get_buf(&self) -> &Self { + self + } + + fn bounds(&self) -> Self::Bounds { + .. + } + + fn from_buf_bounds(buf: Self, _: ops::RangeFull) -> Self { + buf + } + + fn stable_ptr(&self) -> *const u8 { + IoBuf::stable_ptr(self) + } + + fn bytes_init(&self) -> usize { + IoBuf::bytes_init(self) + } + + fn bytes_total(&self) -> usize { + IoBuf::bytes_total(self) + } +} + +/// A possibly bounded view into an owned [`IoBufMut`] buffer. +/// +/// This trait provides a generic way to use mutable buffers and `Slice` views +/// into such buffers with `io-uring` operations. +pub trait BoundedBufMut: BoundedBuf { + /// The type of the underlying buffer. + type BufMut: IoBufMut; + + /// Like [`IoBufMut::stable_mut_ptr`], + /// but possibly offset to the view's starting position. + fn stable_mut_ptr(&mut self) -> *mut u8; + + /// Like [`IoBufMut::set_init`], + /// but the position is possibly offset to the view's starting position. + /// + /// # Safety + /// + /// The caller must ensure that all bytes starting at `stable_mut_ptr()` up + /// to `pos` are initialized and owned by the buffer. + unsafe fn set_init(&mut self, pos: usize); + + /// Copies the given byte slice into the buffer, starting at + /// this view's offset. + /// + /// # Panics + /// + /// If the slice's length exceeds the destination's total capacity, + /// this method panics. + fn put_slice(&mut self, src: &[u8]) { + assert!(self.bytes_total() >= src.len()); + let dst = self.stable_mut_ptr(); + + // Safety: + // dst pointer validity is ensured by stable_mut_ptr; + // the length is checked to not exceed the view's total capacity; + // src (immutable) and dst (mutable) cannot point to overlapping memory; + // after copying the amount of bytes given by the slice, it's safe + // to mark them as initialized in the buffer. + unsafe { + ptr::copy_nonoverlapping(src.as_ptr(), dst, src.len()); + self.set_init(src.len()); + } + } +} + +impl BoundedBufMut for T { + type BufMut = T; + + fn stable_mut_ptr(&mut self) -> *mut u8 { + IoBufMut::stable_mut_ptr(self) + } + + unsafe fn set_init(&mut self, pos: usize) { + IoBufMut::set_init(self, pos) + } +} diff --git a/uring-common/src/buf/io_buf.rs b/uring-common/src/buf/io_buf.rs new file mode 100644 index 0000000..1ca2c75 --- /dev/null +++ b/uring-common/src/buf/io_buf.rs @@ -0,0 +1,141 @@ +// Copyright (c) 2021 Carl Lerche +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// +// +// Based on tokio-uring.git:d5e90539bd6d1c518e848298564a098c300866bc + +/// An `io-uring` compatible buffer. +/// +/// The `IoBuf` trait is implemented by buffer types that can be used with +/// io-uring operations. Users will not need to use this trait directly. +/// The [`BoundedBuf`] trait provides some useful methods including `slice`. +/// +/// # Safety +/// +/// Buffers passed to `io-uring` operations must reference a stable memory +/// region. While the runtime holds ownership to a buffer, the pointer returned +/// by `stable_ptr` must remain valid even if the `IoBuf` value is moved. +/// +/// [`BoundedBuf`]: crate::buf::BoundedBuf +pub unsafe trait IoBuf: Unpin + 'static { + /// Returns a raw pointer to the vector’s buffer. + /// + /// This method is to be used by the `tokio-uring` runtime and it is not + /// expected for users to call it directly. + /// + /// The implementation must ensure that, while the `tokio-uring` runtime + /// owns the value, the pointer returned by `stable_ptr` **does not** + /// change. + fn stable_ptr(&self) -> *const u8; + + /// Number of initialized bytes. + /// + /// This method is to be used by the `tokio-uring` runtime and it is not + /// expected for users to call it directly. + /// + /// For `Vec`, this is identical to `len()`. + fn bytes_init(&self) -> usize; + + /// Total size of the buffer, including uninitialized memory, if any. + /// + /// This method is to be used by the `tokio-uring` runtime and it is not + /// expected for users to call it directly. + /// + /// For `Vec`, this is identical to `capacity()`. + fn bytes_total(&self) -> usize; +} + +unsafe impl IoBuf for Vec { + fn stable_ptr(&self) -> *const u8 { + self.as_ptr() + } + + fn bytes_init(&self) -> usize { + self.len() + } + + fn bytes_total(&self) -> usize { + self.capacity() + } +} + +unsafe impl IoBuf for &'static [u8] { + fn stable_ptr(&self) -> *const u8 { + self.as_ptr() + } + + fn bytes_init(&self) -> usize { + <[u8]>::len(self) + } + + fn bytes_total(&self) -> usize { + self.bytes_init() + } +} + +unsafe impl IoBuf for &'static str { + fn stable_ptr(&self) -> *const u8 { + self.as_ptr() + } + + fn bytes_init(&self) -> usize { + ::len(self) + } + + fn bytes_total(&self) -> usize { + self.bytes_init() + } +} + +#[cfg(feature = "bytes")] +unsafe impl IoBuf for bytes::Bytes { + fn stable_ptr(&self) -> *const u8 { + self.as_ptr() + } + + fn bytes_init(&self) -> usize { + self.len() + } + + fn bytes_total(&self) -> usize { + self.len() + } +} + +#[cfg(feature = "bytes")] +unsafe impl IoBuf for bytes::BytesMut { + fn stable_ptr(&self) -> *const u8 { + self.as_ptr() + } + + fn bytes_init(&self) -> usize { + self.len() + } + + fn bytes_total(&self) -> usize { + self.capacity() + } +} diff --git a/uring-common/src/buf/io_buf_mut.rs b/uring-common/src/buf/io_buf_mut.rs new file mode 100644 index 0000000..5e661f9 --- /dev/null +++ b/uring-common/src/buf/io_buf_mut.rs @@ -0,0 +1,89 @@ +// Copyright (c) 2021 Carl Lerche +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// +// +// Based on tokio-uring.git:d5e90539bd6d1c518e848298564a098c300866bc + +use crate::buf::IoBuf; + +/// A mutable`io-uring` compatible buffer. +/// +/// The `IoBufMut` trait is implemented by buffer types that can be used with +/// io-uring operations. Users will not need to use this trait directly. +/// +/// # Safety +/// +/// Buffers passed to `io-uring` operations must reference a stable memory +/// region. While the runtime holds ownership to a buffer, the pointer returned +/// by `stable_mut_ptr` must remain valid even if the `IoBufMut` value is moved. +pub unsafe trait IoBufMut: IoBuf { + /// Returns a raw mutable pointer to the vector’s buffer. + /// + /// This method is to be used by the `tokio-uring` runtime and it is not + /// expected for users to call it directly. + /// + /// The implementation must ensure that, while the `tokio-uring` runtime + /// owns the value, the pointer returned by `stable_mut_ptr` **does not** + /// change. + fn stable_mut_ptr(&mut self) -> *mut u8; + + /// Updates the number of initialized bytes. + /// + /// If the specified `pos` is greater than the value returned by + /// [`IoBuf::bytes_init`], it becomes the new water mark as returned by + /// `IoBuf::bytes_init`. + /// + /// # Safety + /// + /// The caller must ensure that all bytes starting at `stable_mut_ptr()` up + /// to `pos` are initialized and owned by the buffer. + unsafe fn set_init(&mut self, pos: usize); +} + +unsafe impl IoBufMut for Vec { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.as_mut_ptr() + } + + unsafe fn set_init(&mut self, init_len: usize) { + if self.len() < init_len { + self.set_len(init_len); + } + } +} + +#[cfg(feature = "bytes")] +unsafe impl IoBufMut for bytes::BytesMut { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.as_mut_ptr() + } + + unsafe fn set_init(&mut self, init_len: usize) { + if self.len() < init_len { + self.set_len(init_len); + } + } +} diff --git a/uring-common/src/buf/slice.rs b/uring-common/src/buf/slice.rs new file mode 100644 index 0000000..095c765 --- /dev/null +++ b/uring-common/src/buf/slice.rs @@ -0,0 +1,178 @@ +// Copyright (c) 2021 Carl Lerche +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// +// +// Based on tokio-uring.git:d5e90539bd6d1c518e848298564a098c300866bc + +use super::{BoundedBuf, BoundedBufMut, IoBuf, IoBufMut}; + +use std::cmp; +use std::ops; + +/// An owned view into a contiguous sequence of bytes. +/// +/// This is similar to Rust slices (`&buf[..]`) but owns the underlying buffer. +/// This type is useful for performing io-uring read and write operations using +/// a subset of a buffer. +/// +/// Slices are created using [`BoundedBuf::slice`]. +pub struct Slice { + buf: T, + begin: usize, + end: usize, +} + +impl Slice { + pub(crate) fn new(buf: T, begin: usize, end: usize) -> Slice { + Slice { buf, begin, end } + } + + /// Offset in the underlying buffer at which this slice starts. + + pub fn begin(&self) -> usize { + self.begin + } + + /// Ofset in the underlying buffer at which this slice ends. + pub fn end(&self) -> usize { + self.end + } + + /// Gets a reference to the underlying buffer. + /// + /// This method escapes the slice's view. + pub fn get_ref(&self) -> &T { + &self.buf + } + + /// Gets a mutable reference to the underlying buffer. + /// + /// This method escapes the slice's view. + pub fn get_mut(&mut self) -> &mut T { + &mut self.buf + } + + /// Unwraps this `Slice`, returning the underlying buffer. + pub fn into_inner(self) -> T { + self.buf + } +} + +impl ops::Deref for Slice { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + let buf_bytes = super::deref(&self.buf); + let end = cmp::min(self.end, buf_bytes.len()); + &buf_bytes[self.begin..end] + } +} + +impl ops::DerefMut for Slice { + fn deref_mut(&mut self) -> &mut [u8] { + let buf_bytes = super::deref_mut(&mut self.buf); + let end = cmp::min(self.end, buf_bytes.len()); + &mut buf_bytes[self.begin..end] + } +} + +impl BoundedBuf for Slice { + type Buf = T; + type Bounds = ops::Range; + + fn slice(self, range: impl ops::RangeBounds) -> Slice { + use ops::Bound; + + let begin = match range.start_bound() { + Bound::Included(&n) => self.begin.checked_add(n).expect("out of range"), + Bound::Excluded(&n) => self + .begin + .checked_add(n) + .and_then(|x| x.checked_add(1)) + .expect("out of range"), + Bound::Unbounded => self.begin, + }; + + assert!(begin <= self.end); + + let end = match range.end_bound() { + Bound::Included(&n) => self + .begin + .checked_add(n) + .and_then(|x| x.checked_add(1)) + .expect("out of range"), + Bound::Excluded(&n) => self.begin.checked_add(n).expect("out of range"), + Bound::Unbounded => self.end, + }; + + assert!(end <= self.end); + assert!(begin <= self.buf.bytes_init()); + + Slice::new(self.buf, begin, end) + } + + fn slice_full(self) -> Slice { + self + } + + fn get_buf(&self) -> &T { + &self.buf + } + + fn bounds(&self) -> Self::Bounds { + self.begin..self.end + } + + fn from_buf_bounds(buf: T, bounds: Self::Bounds) -> Self { + assert!(bounds.start <= buf.bytes_init()); + assert!(bounds.end <= buf.bytes_total()); + Slice::new(buf, bounds.start, bounds.end) + } + + fn stable_ptr(&self) -> *const u8 { + super::deref(&self.buf)[self.begin..].as_ptr() + } + + fn bytes_init(&self) -> usize { + ops::Deref::deref(self).len() + } + + fn bytes_total(&self) -> usize { + self.end - self.begin + } +} + +impl BoundedBufMut for Slice { + type BufMut = T; + + fn stable_mut_ptr(&mut self) -> *mut u8 { + super::deref_mut(&mut self.buf)[self.begin..].as_mut_ptr() + } + + unsafe fn set_init(&mut self, pos: usize) { + self.buf.set_init(self.begin + pos); + } +} diff --git a/uring-common/src/lib.rs b/uring-common/src/lib.rs new file mode 100644 index 0000000..b07ba19 --- /dev/null +++ b/uring-common/src/lib.rs @@ -0,0 +1,3 @@ +pub mod buf; + +pub use io_uring;