Skip to content

Commit

Permalink
Embrace unsafe_op_in_unsafe_fn (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgeier authored Apr 3, 2024
1 parent 757ea3a commit d14aa63
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 55 deletions.
95 changes: 51 additions & 44 deletions src/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,7 @@
//! slice[..mid].copy_to_uninit(first);
//! slice[mid..].copy_to_uninit(second);
//! // SAFETY: All slots have been initialized
//! unsafe {
//! chunk.commit_all();
//! }
//! unsafe { chunk.commit_all() };
//! Ok(())
//! } else {
//! Err(slice)
Expand Down Expand Up @@ -136,9 +134,7 @@
//! slice[..mid].copy_to_uninit(first);
//! slice[mid..end].copy_to_uninit(second);
//! // SAFETY: All slots have been initialized
//! unsafe {
//! chunk.commit_all();
//! }
//! unsafe { chunk.commit_all() };
//! end
//! }
//! ```
Expand Down Expand Up @@ -256,6 +252,7 @@ impl<T> Producer<T> {
let tail = self.buffer.collapse_position(tail);
let first_len = n.min(self.buffer.capacity - tail);
Ok(WriteChunkUninit {
// SAFETY: tail has been updated to a valid position.
first_ptr: unsafe { self.buffer.data_ptr.add(tail) },
first_len,
second_ptr: self.buffer.data_ptr,
Expand Down Expand Up @@ -307,6 +304,7 @@ impl<T> Consumer<T> {
let head = self.buffer.collapse_position(head);
let first_len = n.min(self.buffer.capacity - head);
Ok(ReadChunk {
// SAFETY: head has been updated to a valid position.
first_ptr: unsafe { self.buffer.data_ptr.add(head) },
first_len,
second_ptr: self.buffer.data_ptr,
Expand All @@ -330,11 +328,9 @@ impl<T> Drop for WriteChunk<'_, T> {
fn drop(&mut self) {
// NB: If `commit()` or `commit_all()` has been called, `self.0` is `None`.
if let Some(mut chunk) = self.0.take() {
// No part of the chunk has been committed, all slots are dropped.
// SAFETY: All slots have been initialized in From::from().
unsafe {
// No part of the chunk has been committed, all slots are dropped.
chunk.drop_suffix(0);
}
unsafe { chunk.drop_suffix(0) };
}
}
}
Expand All @@ -346,14 +342,12 @@ where
/// Fills all slots with the [`Default`] value.
fn from(chunk: WriteChunkUninit<'a, T>) -> Self {
for i in 0..chunk.first_len {
unsafe {
chunk.first_ptr.add(i).write(Default::default());
}
// SAFETY: i is in a valid range.
unsafe { chunk.first_ptr.add(i).write(Default::default()) };
}
for i in 0..chunk.second_len {
unsafe {
chunk.second_ptr.add(i).write(Default::default());
}
// SAFETY: i is in a valid range.
unsafe { chunk.second_ptr.add(i).write(Default::default()) };
}
WriteChunk(Some(chunk), PhantomData)
}
Expand All @@ -380,7 +374,8 @@ where
pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
// self.0 is always Some(chunk).
let chunk = self.0.as_ref().unwrap();
// SAFETY: All slots have been initialized in From::from().
// SAFETY: The pointers and lengths have been computed correctly in write_chunk_uninit()
// and all slots have been initialized in From::from().
unsafe {
(
core::slice::from_raw_parts_mut(chunk.first_ptr, chunk.first_len),
Expand Down Expand Up @@ -414,9 +409,7 @@ where
// self.0 is always Some(chunk).
let chunk = self.0.take().unwrap();
// SAFETY: All slots have been initialized in From::from().
unsafe {
chunk.commit_all();
}
unsafe { chunk.commit_all() };
// `self` is dropped here, with `self.0` being set to `None`.
}

Expand Down Expand Up @@ -447,7 +440,7 @@ pub struct WriteChunkUninit<'a, T> {
producer: &'a Producer<T>,
}

// WriteChunkUninit only exists while a unique reference to the Producer is held.
// SAFETY: WriteChunkUninit only exists while a unique reference to the Producer is held.
// It is therefore safe to move it to another thread.
unsafe impl<T: Send> Send for WriteChunkUninit<'_, T> {}

Expand All @@ -467,6 +460,7 @@ impl<T> WriteChunkUninit<'_, T> {
/// they will *not* become available for reading and
/// they will be leaked (which is only relevant if `T` implements [`Drop`]).
pub fn as_mut_slices(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
// SAFETY: The pointers and lengths have been computed correctly in write_chunk_uninit().
unsafe {
(
core::slice::from_raw_parts_mut(self.first_ptr.cast(), self.first_len),
Expand All @@ -483,20 +477,22 @@ impl<T> WriteChunkUninit<'_, T> {
///
/// # Safety
///
/// The user must make sure that the first `n` elements have been initialized.
/// The caller must make sure that the first `n` elements have been initialized.
pub unsafe fn commit(self, n: usize) {
assert!(n <= self.len(), "cannot commit more than chunk size");
self.commit_unchecked(n);
// SAFETY: Delegated to the caller.
unsafe { self.commit_unchecked(n) };
}

/// Makes the whole chunk available for reading.
///
/// # Safety
///
/// The user must make sure that all elements have been initialized.
/// The caller must make sure that all elements have been initialized.
pub unsafe fn commit_all(self) {
let slots = self.len();
self.commit_unchecked(slots);
// SAFETY: Delegated to the caller.
unsafe { self.commit_unchecked(slots) };
}

unsafe fn commit_unchecked(self, n: usize) -> usize {
Expand Down Expand Up @@ -569,9 +565,7 @@ impl<T> WriteChunkUninit<'_, T> {
match iter.next() {
Some(item) => {
// SAFETY: It is allowed to write to this memory slot
unsafe {
ptr.add(i).write(item);
}
unsafe { ptr.add(i).write(item) };
iterated += 1;
}
None => break 'outer,
Expand Down Expand Up @@ -600,10 +594,12 @@ impl<T> WriteChunkUninit<'_, T> {
unsafe fn drop_suffix(&mut self, n: usize) {
// NB: If n >= self.len(), the loops are not entered.
for i in n..self.first_len {
self.first_ptr.add(i).drop_in_place();
// SAFETY: The caller must make sure that all slots are initialized.
unsafe { self.first_ptr.add(i).drop_in_place() };
}
for i in n.saturating_sub(self.first_len)..self.second_len {
self.second_ptr.add(i).drop_in_place();
// SAFETY: The caller must make sure that all slots are initialized.
unsafe { self.second_ptr.add(i).drop_in_place() };
}
}
}
Expand All @@ -622,7 +618,7 @@ pub struct ReadChunk<'a, T> {
consumer: &'a Consumer<T>,
}

// ReadChunk only exists while a unique reference to the Consumer is held.
// SAFETY: ReadChunk only exists while a unique reference to the Consumer is held.
// It is therefore safe to move it to another thread.
unsafe impl<T: Send> Send for ReadChunk<'_, T> {}

Expand All @@ -640,10 +636,13 @@ impl<T> ReadChunk<'_, T> {
/// You can "peek" at the contained values by simply not calling any of the "commit" methods.
#[must_use]
pub fn as_slices(&self) -> (&[T], &[T]) {
(
unsafe { core::slice::from_raw_parts(self.first_ptr, self.first_len) },
unsafe { core::slice::from_raw_parts(self.second_ptr, self.second_len) },
)
// SAFETY: The pointers and lengths have been computed correctly in read_chunk().
unsafe {
(
core::slice::from_raw_parts(self.first_ptr, self.first_len),
core::slice::from_raw_parts(self.second_ptr, self.second_len),
)
}
}

/// Returns two mutable slices for reading from the requested slots.
Expand All @@ -659,10 +658,13 @@ impl<T> ReadChunk<'_, T> {
/// (e.g. streaming decryption), in which case this version can be used.
#[must_use]
pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
(
unsafe { core::slice::from_raw_parts_mut(self.first_ptr, self.first_len) },
unsafe { core::slice::from_raw_parts_mut(self.second_ptr, self.second_len) },
)
// SAFETY: The pointers and lengths have been computed correctly in read_chunk().
unsafe {
(
core::slice::from_raw_parts_mut(self.first_ptr, self.first_len),
core::slice::from_raw_parts_mut(self.second_ptr, self.second_len),
)
}
}

/// Drops the first `n` slots of the chunk, making the space available for writing again.
Expand Down Expand Up @@ -719,23 +721,27 @@ impl<T> ReadChunk<'_, T> {
/// ```
pub fn commit(self, n: usize) {
assert!(n <= self.len(), "cannot commit more than chunk size");
// SAFETY: self.len() initialized elements have been obtained in read_chunk().
unsafe { self.commit_unchecked(n) };
}

/// Drops all slots of the chunk, making the space available for writing again.
pub fn commit_all(self) {
let slots = self.len();
// SAFETY: self.len() initialized elements have been obtained in read_chunk().
unsafe { self.commit_unchecked(slots) };
}

unsafe fn commit_unchecked(self, n: usize) -> usize {
let first_len = self.first_len.min(n);
for i in 0..first_len {
self.first_ptr.add(i).drop_in_place();
// SAFETY: The caller must make sure that there are n initialized elements.
unsafe { self.first_ptr.add(i).drop_in_place() };
}
let second_len = self.second_len.min(n - first_len);
for i in 0..second_len {
self.second_ptr.add(i).drop_in_place();
// SAFETY: The caller must make sure that there are n initialized elements.
unsafe { self.second_ptr.add(i).drop_in_place() };
}
let c = self.consumer;
// "head" is only ever written by the consumer thread, "Relaxed" is enough
Expand Down Expand Up @@ -806,8 +812,10 @@ impl<'a, T> Iterator for ReadChunkIntoIter<'a, T> {
#[inline]
fn next(&mut self) -> Option<Self::Item> {
let ptr = if self.iterated < self.chunk.first_len {
// SAFETY: first_len is valid.
unsafe { self.chunk.first_ptr.add(self.iterated) }
} else if self.iterated < self.chunk.first_len + self.chunk.second_len {
// SAFETY: first_len and second_len are valid.
unsafe {
self.chunk
.second_ptr
Expand All @@ -817,6 +825,7 @@ impl<'a, T> Iterator for ReadChunkIntoIter<'a, T> {
return None;
};
self.iterated += 1;
// SAFETY: ptr points to an initialized slot.
Some(unsafe { ptr.read() })
}

Expand Down Expand Up @@ -848,9 +857,7 @@ impl std::io::Write for Producer<u8> {
buf[..mid].copy_to_uninit(first);
buf[mid..end].copy_to_uninit(second);
// SAFETY: All slots have been initialized
unsafe {
chunk.commit_all();
}
unsafe { chunk.commit_all() };
Ok(end)
}

Expand Down
31 changes: 20 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
#![cfg_attr(not(feature = "std"), no_std)]
#![warn(rust_2018_idioms)]
#![deny(missing_docs, missing_debug_implementations)]
#![deny(unsafe_op_in_unsafe_fn)]
#![warn(clippy::undocumented_unsafe_blocks, clippy::unnecessary_safety_comment)]

extern crate alloc;

Expand All @@ -58,7 +60,7 @@ use core::marker::PhantomData;
use core::mem::{ManuallyDrop, MaybeUninit};
use core::sync::atomic::{AtomicUsize, Ordering};

#[allow(dead_code)]
#[allow(dead_code, clippy::undocumented_unsafe_blocks)]
mod cache_padded;
use cache_padded::CachePadded;

Expand Down Expand Up @@ -169,7 +171,9 @@ impl<T> RingBuffer<T> {
/// If `pos == 0 && capacity == 0`, the returned pointer must not be dereferenced!
unsafe fn slot_ptr(&self, pos: usize) -> *mut T {
debug_assert!(pos == 0 || pos < 2 * self.capacity);
self.data_ptr.add(self.collapse_position(pos))
let pos = self.collapse_position(pos);
// SAFETY: The caller must ensure a valid pos.
unsafe { self.data_ptr.add(pos) }
}

/// Increments a position by going `n` slots forward.
Expand Down Expand Up @@ -217,16 +221,14 @@ impl<T> Drop for RingBuffer<T> {

// Loop over all slots that hold a value and drop them.
while head != tail {
unsafe {
self.slot_ptr(head).drop_in_place();
}
// SAFETY: All slots between head and tail have been initialized.
unsafe { self.slot_ptr(head).drop_in_place() };
head = self.increment1(head);
}

// Finally, deallocate the buffer, but don't run any destructors.
unsafe {
Vec::from_raw_parts(self.data_ptr, 0, self.capacity);
}
// SAFETY: data_ptr and capacity are still valid from the original initialization.
unsafe { Vec::from_raw_parts(self.data_ptr, 0, self.capacity) };
}
}

Expand Down Expand Up @@ -283,6 +285,8 @@ pub struct Producer<T> {
cached_head: Cell<usize>,
}

// SAFETY: After moving a Producer to another thread, there is still only a single thread
// that can access the producer side of the queue.
unsafe impl<T: Send> Send for Producer<T> {}

impl<T> Producer<T> {
Expand All @@ -307,9 +311,8 @@ impl<T> Producer<T> {
/// ```
pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
if let Some(tail) = self.next_tail() {
unsafe {
self.buffer.slot_ptr(tail).write(value);
}
// SAFETY: tail points to an empty slot.
unsafe { self.buffer.slot_ptr(tail).write(value) };
let tail = self.buffer.increment1(tail);
self.buffer.tail.store(tail, Ordering::Release);
Ok(())
Expand Down Expand Up @@ -485,6 +488,8 @@ pub struct Consumer<T> {
cached_tail: Cell<usize>,
}

// SAFETY: After moving a Consumer to another thread, there is still only a single thread
// that can access the consumer side of the queue.
unsafe impl<T: Send> Send for Consumer<T> {}

impl<T> Consumer<T> {
Expand Down Expand Up @@ -519,6 +524,7 @@ impl<T> Consumer<T> {
/// ```
pub fn pop(&mut self) -> Result<T, PopError> {
if let Some(head) = self.next_head() {
// SAFETY: head points to an initialized slot.
let value = unsafe { self.buffer.slot_ptr(head).read() };
let head = self.buffer.increment1(head);
self.buffer.head.store(head, Ordering::Release);
Expand Down Expand Up @@ -548,6 +554,7 @@ impl<T> Consumer<T> {
/// ```
pub fn peek(&self) -> Result<&T, PeekError> {
if let Some(head) = self.next_head() {
// SAFETY: head points to an initialized slot.
Ok(unsafe { &*self.buffer.slot_ptr(head) })
} else {
Err(PeekError::Empty)
Expand Down Expand Up @@ -718,6 +725,8 @@ impl<T: Copy> CopyToUninit<T> for [T] {
"source slice length does not match destination slice length"
);
let dst_ptr = dst.as_mut_ptr().cast();
// SAFETY: The lengths have been checked to be equal and
// the mutable reference makes sure that there is no overlap.
unsafe {
self.as_ptr().copy_to_nonoverlapping(dst_ptr, self.len());
core::slice::from_raw_parts_mut(dst_ptr, self.len())
Expand Down

0 comments on commit d14aa63

Please sign in to comment.