From d10089e14e50b78635c230f4135144fc5416e93f Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sat, 11 Dec 2021 21:54:25 +0100 Subject: [PATCH 01/16] Bump MSRV to 1.44 --- .clippy.toml | 2 +- .github/workflows/main.yml | 2 +- README.md | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.clippy.toml b/.clippy.toml index 0a54853..e585a46 100644 --- a/.clippy.toml +++ b/.clippy.toml @@ -1 +1 @@ -msrv = "1.36.0" +msrv = "1.44.0" diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 8446583..f9ce0b7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -40,7 +40,7 @@ jobs: msrv: strategy: matrix: - rust-version: [1.36.0] + rust-version: [1.44.0] runs-on: ubuntu-latest steps: - name: Clone Git repository diff --git a/README.md b/README.md index 6992a66..071f99d 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ You might have to adapt the `--target` option to your system (see e.g. `rustup s Minimum Supported `rustc` Version --------------------------------- -This crate's minimum supported `rustc` version (MSRV) is `1.36.0`. +This crate's minimum supported `rustc` version (MSRV) is `1.44.0`. The MSRV is not expected to be updated frequently, but if it is, there will be (at least) a *minor* version bump. From a467d5a85ec4c7ce5f8daf7a306fe1b5165ef715 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sat, 11 Dec 2021 19:49:15 +0100 Subject: [PATCH 02/16] Use a single dynamic allocation (making RingBuffer a DST) --- src/chunks.rs | 231 ++++++++++++++++++++++++++------------------------ src/lib.rs | 217 +++++++++++++++++++++++++++++++++-------------- 2 files changed, 276 insertions(+), 172 deletions(-) diff --git a/src/chunks.rs b/src/chunks.rs index 01ff6c7..02d56bf 100644 --- a/src/chunks.rs +++ b/src/chunks.rs @@ -239,26 +239,30 @@ impl Producer { /// see [`Producer::write_chunk()`]. pub fn write_chunk_uninit(&mut self, n: usize) -> Result, ChunkError> { let tail = self.cached_tail.get(); + let buffer = self.buffer(); // Check if the queue has *possibly* not enough slots. - if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n { + if buffer.capacity() - buffer.distance(self.cached_head.get(), tail) < n { // Refresh the head ... - let head = self.buffer.head.load(Ordering::Acquire); + let head = buffer.head.load(Ordering::Acquire); self.cached_head.set(head); // ... and check if there *really* are not enough slots. - let slots = self.buffer.capacity - self.buffer.distance(head, tail); + let slots = buffer.capacity() - buffer.distance(head, tail); if slots < n { return Err(ChunkError::TooFewSlots(slots)); } } - let tail = self.buffer.collapse_position(tail); - let first_len = n.min(self.buffer.capacity - tail); + let tail = buffer.collapse_position(tail); + let first_len = n.min(buffer.capacity() - tail); + let slice_ptr = buffer.slots.get(); + // SAFETY: All indices are valid. Since we know we have exclusive access + // to the sub-slices and they are non-overlapping, we can make them mutable. + let first_slice = unsafe { (*slice_ptr).get_unchecked_mut(tail..tail + first_len) }; + let second_slice = unsafe { (*slice_ptr).get_unchecked_mut(0..n - first_len) }; Ok(WriteChunkUninit { - first_ptr: unsafe { self.buffer.data_ptr.add(tail) }, - first_len, - second_ptr: self.buffer.data_ptr, - second_len: n - first_len, + first_slice, + second_slice, producer: self, }) } @@ -288,27 +292,30 @@ impl Consumer { /// See the documentation of the [`chunks`](crate::chunks#examples) module. 
pub fn read_chunk(&mut self, n: usize) -> Result, ChunkError> { let head = self.cached_head.get(); + let buffer = self.buffer(); // Check if the queue has *possibly* not enough slots. - if self.buffer.distance(head, self.cached_tail.get()) < n { + if buffer.distance(head, self.cached_tail.get()) < n { // Refresh the tail ... - let tail = self.buffer.tail.load(Ordering::Acquire); + let tail = buffer.tail.load(Ordering::Acquire); self.cached_tail.set(tail); // ... and check if there *really* are not enough slots. - let slots = self.buffer.distance(head, tail); + let slots = buffer.distance(head, tail); if slots < n { return Err(ChunkError::TooFewSlots(slots)); } } - - let head = self.buffer.collapse_position(head); - let first_len = n.min(self.buffer.capacity - head); + let head = buffer.collapse_position(head); + let first_len = n.min(buffer.capacity() - head); + let slice_ptr = buffer.slots.get(); + // SAFETY: All indices are valid. Since we know we have exclusive access + // to the sub-slices and they are non-overlapping, we can make them mutable. + let first_slice = unsafe { (*slice_ptr).get_unchecked_mut(head..head + first_len) }; + let second_slice = unsafe { (*slice_ptr).get_unchecked_mut(0..n - first_len) }; Ok(ReadChunk { - first_ptr: unsafe { self.buffer.data_ptr.add(head) }, - first_len, - second_ptr: self.buffer.data_ptr, - second_len: n - first_len, + first_slice, + second_slice, consumer: self, }) } @@ -343,14 +350,13 @@ where { /// Fills all slots with the [`Default`] value. fn from(chunk: WriteChunkUninit<'a, T>) -> Self { - for i in 0..chunk.first_len { - unsafe { - chunk.first_ptr.add(i).write(Default::default()); - } - } - for i in 0..chunk.second_len { + for slot in chunk + .first_slice + .iter_mut() + .chain(chunk.second_slice.iter_mut()) + { unsafe { - chunk.second_ptr.add(i).write(Default::default()); + slot.as_mut_ptr().write(Default::default()); } } WriteChunk(Some(chunk), PhantomData) @@ -377,12 +383,12 @@ where /// they will be leaked (which is only relevant if `T` implements [`Drop`]). pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) { // self.0 is always Some(chunk). - let chunk = self.0.as_ref().unwrap(); + let chunk = self.0.as_mut().unwrap(); // SAFETY: All slots have been initialized in From::from(). unsafe { ( - core::slice::from_raw_parts_mut(chunk.first_ptr, chunk.first_len), - core::slice::from_raw_parts_mut(chunk.second_ptr, chunk.second_len), + &mut *(chunk.first_slice as *mut _ as *mut _), + &mut *(chunk.second_slice as *mut _ as *mut _), ) } } @@ -436,12 +442,10 @@ where /// Structure for writing into multiple (uninitialized) slots in one go. /// /// This is returned from [`Producer::write_chunk_uninit()`]. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug)] pub struct WriteChunkUninit<'a, T> { - first_ptr: *mut T, - first_len: usize, - second_ptr: *mut T, - second_len: usize, + first_slice: &'a mut [MaybeUninit], + second_slice: &'a mut [MaybeUninit], producer: &'a Producer, } @@ -449,6 +453,14 @@ pub struct WriteChunkUninit<'a, T> { // It is therefore safe to move it to another thread. unsafe impl Send for WriteChunkUninit<'_, T> {} +impl PartialEq for WriteChunkUninit<'_, T> { + fn eq(&self, other: &Self) -> bool { + core::ptr::eq(self, other) + } +} + +impl Eq for WriteChunkUninit<'_, T> {} + impl WriteChunkUninit<'_, T> { /// Returns two slices for writing to the requested slots. 
/// @@ -465,12 +477,7 @@ impl WriteChunkUninit<'_, T> { /// they will *not* become available for reading and /// they will be leaked (which is only relevant if `T` implements [`Drop`]). pub fn as_mut_slices(&mut self) -> (&mut [MaybeUninit], &mut [MaybeUninit]) { - unsafe { - ( - core::slice::from_raw_parts_mut(self.first_ptr as *mut _, self.first_len), - core::slice::from_raw_parts_mut(self.second_ptr as *mut _, self.second_len), - ) - } + (self.first_slice, self.second_slice) } /// Makes the first `n` slots of the chunk available for reading. @@ -499,8 +506,8 @@ impl WriteChunkUninit<'_, T> { unsafe fn commit_unchecked(self, n: usize) -> usize { let p = self.producer; - let tail = p.buffer.increment(p.cached_tail.get(), n); - p.buffer.tail.store(tail, Ordering::Release); + let tail = p.buffer().increment(p.cached_tail.get(), n); + p.buffer().tail.store(tail, Ordering::Release); p.cached_tail.set(tail); n } @@ -556,25 +563,18 @@ impl WriteChunkUninit<'_, T> { where I: IntoIterator, { - let mut iter = iter.into_iter(); - let mut iterated = 0; - 'outer: for &(ptr, len) in &[ - (self.first_ptr, self.first_len), - (self.second_ptr, self.second_len), - ] { - for i in 0..len { - match iter.next() { - Some(item) => { - // SAFETY: It is allowed to write to this memory slot - unsafe { - ptr.add(i).write(item); - } - iterated += 1; - } - None => break 'outer, + let iterated = self + .first_slice + .iter_mut() + .chain(self.second_slice.iter_mut()) + .zip(iter) + .map(|(slot, item)| { + // SAFETY: It is allowed to write to this memory slot + unsafe { + slot.as_mut_ptr().write(item); } - } - } + }) + .count(); // SAFETY: iterated slots have been initialized above unsafe { self.commit_unchecked(iterated) } } @@ -582,13 +582,13 @@ impl WriteChunkUninit<'_, T> { /// Returns the number of slots in the chunk. #[must_use] pub fn len(&self) -> usize { - self.first_len + self.second_len + self.first_slice.len() + self.second_slice.len() } /// Returns `true` if the chunk contains no slots. #[must_use] pub fn is_empty(&self) -> bool { - self.first_len == 0 + self.first_slice.len() == 0 } /// Drops all elements starting from index `n`. @@ -596,11 +596,17 @@ impl WriteChunkUninit<'_, T> { /// All of those slots must be initialized. unsafe fn drop_suffix(&mut self, n: usize) { // NB: If n >= self.len(), the loops are not entered. - for i in n..self.first_len { - self.first_ptr.add(i).drop_in_place(); + for i in n..self.first_slice.len() { + self.first_slice + .get_unchecked_mut(i) + .as_mut_ptr() + .drop_in_place(); } - for i in n.saturating_sub(self.first_len)..self.second_len { - self.second_ptr.add(i).drop_in_place(); + for i in n.saturating_sub(self.first_slice.len())..self.second_slice.len() { + self.second_slice + .get_unchecked_mut(i) + .as_mut_ptr() + .drop_in_place(); } } } @@ -608,14 +614,12 @@ impl WriteChunkUninit<'_, T> { /// Structure for reading from multiple slots in one go. /// /// This is returned from [`Consumer::read_chunk()`]. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug)] pub struct ReadChunk<'a, T> { - // Must be "mut" for drop_in_place() - first_ptr: *mut T, - first_len: usize, - // Must be "mut" for drop_in_place() - second_ptr: *mut T, - second_len: usize, + // Must be mutable for drop_in_place() + first_slice: &'a mut [MaybeUninit], + // Must be mutable for drop_in_place() + second_slice: &'a mut [MaybeUninit], consumer: &'a Consumer, } @@ -623,6 +627,14 @@ pub struct ReadChunk<'a, T> { // It is therefore safe to move it to another thread. 
unsafe impl Send for ReadChunk<'_, T> {} +impl PartialEq for ReadChunk<'_, T> { + fn eq(&self, other: &Self) -> bool { + core::ptr::eq(self, other) + } +} + +impl Eq for ReadChunk<'_, T> {} + impl ReadChunk<'_, T> { /// Returns two slices for reading from the requested slots. /// @@ -637,10 +649,13 @@ impl ReadChunk<'_, T> { /// You can "peek" at the contained values by simply not calling any of the "commit" methods. #[must_use] pub fn as_slices(&self) -> (&[T], &[T]) { - ( - unsafe { core::slice::from_raw_parts(self.first_ptr, self.first_len) }, - unsafe { core::slice::from_raw_parts(self.second_ptr, self.second_len) }, - ) + // SAFETY: All slots are initialized. + unsafe { + ( + &*(self.first_slice as *const _ as *const _), + &*(self.second_slice as *const _ as *const _), + ) + } } /// Drops the first `n` slots of the chunk, making the space available for writing again. @@ -707,17 +722,17 @@ impl ReadChunk<'_, T> { } unsafe fn commit_unchecked(self, n: usize) -> usize { - let first_len = self.first_len.min(n); - for i in 0..first_len { - self.first_ptr.add(i).drop_in_place(); - } - let second_len = self.second_len.min(n - first_len); - for i in 0..second_len { - self.second_ptr.add(i).drop_in_place(); + for slot in self + .first_slice + .iter_mut() + .chain(self.second_slice.iter_mut()) + .take(n) + { + slot.as_mut_ptr().drop_in_place(); } let c = self.consumer; - let head = c.buffer.increment(c.cached_head.get(), n); - c.buffer.head.store(head, Ordering::Release); + let head = c.buffer().increment(c.cached_head.get(), n); + c.buffer().head.store(head, Ordering::Release); c.cached_head.set(head); n } @@ -725,13 +740,13 @@ impl ReadChunk<'_, T> { /// Returns the number of slots in the chunk. #[must_use] pub fn len(&self) -> usize { - self.first_len + self.second_len + self.first_slice.len() + self.second_slice.len() } /// Returns `true` if the chunk contains no slots. #[must_use] pub fn is_empty(&self) -> bool { - self.first_len == 0 + self.first_slice.len() == 0 } } @@ -745,8 +760,12 @@ impl<'a, T> IntoIterator for ReadChunk<'a, T> { /// Non-iterated items remain in the ring buffer. fn into_iter(self) -> Self::IntoIter { Self::IntoIter { - chunk: self, - iterated: 0, + chunk_size: self.len(), + iter: self + .first_slice + .iter_mut() + .chain(self.second_slice.iter_mut()), + consumer: self.consumer, } } } @@ -760,8 +779,12 @@ impl<'a, T> IntoIterator for ReadChunk<'a, T> { /// Non-iterated items remain in the ring buffer. #[derive(Debug)] pub struct ReadChunkIntoIter<'a, T> { - chunk: ReadChunk<'a, T>, - iterated: usize, + chunk_size: usize, + iter: core::iter::Chain< + core::slice::IterMut<'a, MaybeUninit>, + core::slice::IterMut<'a, MaybeUninit>, + >, + consumer: &'a mut Consumer, } impl<'a, T> Drop for ReadChunkIntoIter<'a, T> { @@ -769,9 +792,10 @@ impl<'a, T> Drop for ReadChunkIntoIter<'a, T> { /// /// Non-iterated items remain in the ring buffer and are *not* dropped. 
fn drop(&mut self) { - let c = &self.chunk.consumer; - let head = c.buffer.increment(c.cached_head.get(), self.iterated); - c.buffer.head.store(head, Ordering::Release); + let iterated = self.chunk_size - self.len(); + let c = &self.consumer; + let head = c.buffer().increment(c.cached_head.get(), iterated); + c.buffer().head.store(head, Ordering::Release); c.cached_head.set(head); } } @@ -780,24 +804,11 @@ impl<'a, T> Iterator for ReadChunkIntoIter<'a, T> { type Item = T; fn next(&mut self) -> Option { - let ptr = if self.iterated < self.chunk.first_len { - unsafe { self.chunk.first_ptr.add(self.iterated) } - } else if self.iterated < self.chunk.first_len + self.chunk.second_len { - unsafe { - self.chunk - .second_ptr - .add(self.iterated - self.chunk.first_len) - } - } else { - return None; - }; - self.iterated += 1; - Some(unsafe { ptr.read() }) + self.iter.next().map(|slot| unsafe { slot.as_ptr().read() }) } fn size_hint(&self) -> (usize, Option) { - let remaining = self.chunk.first_len + self.chunk.second_len - self.iterated; - (remaining, Some(remaining)) + self.iter.size_hint() } } diff --git a/src/lib.rs b/src/lib.rs index 037a4ee..f535ea4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,13 +50,12 @@ extern crate alloc; -use alloc::sync::Arc; -use alloc::vec::Vec; -use core::cell::Cell; +use core::cell::{Cell, UnsafeCell}; use core::fmt; use core::marker::PhantomData; -use core::mem::{ManuallyDrop, MaybeUninit}; -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::mem::MaybeUninit; +use core::ptr::NonNull; +use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use cache_padded::CachePadded; @@ -73,6 +72,7 @@ use chunks::WriteChunkUninit; /// /// *See also the [crate-level documentation](crate).* #[derive(Debug)] +#[repr(C)] pub struct RingBuffer { /// The head of the queue. /// @@ -84,14 +84,14 @@ pub struct RingBuffer { /// This integer is in range `0 .. 2 * capacity`. tail: CachePadded, - /// The buffer holding slots. - data_ptr: *mut T, - - /// The queue capacity. - capacity: usize, + /// `true` if one of producer/consumer has been dropped. + is_abandoned: AtomicBool, /// Indicates that dropping a `RingBuffer` may drop elements of type `T`. _marker: PhantomData, + + /// Storage for the ring buffer elements (dynamically sized). + slots: UnsafeCell<[MaybeUninit]>, } impl RingBuffer { @@ -117,22 +117,55 @@ impl RingBuffer { #[allow(clippy::new_ret_no_self)] #[must_use] pub fn new(capacity: usize) -> (Producer, Consumer) { - let buffer = Arc::new(RingBuffer { - head: CachePadded::new(AtomicUsize::new(0)), - tail: CachePadded::new(AtomicUsize::new(0)), - data_ptr: ManuallyDrop::new(Vec::with_capacity(capacity)).as_mut_ptr(), - capacity, - _marker: PhantomData, - }); + use alloc::alloc::Layout; + // Start with an empty layout ... + let layout = Layout::new::<()>(); + // ... and add all fields from RingBuffer, which must have #[repr(C)] for this to work. 
+ let (layout, head_offset) = layout + .extend(Layout::new::>()) + .unwrap(); + assert_eq!(head_offset, 0); + let (layout, tail_offset) = layout + .extend(Layout::new::>()) + .unwrap(); + let (layout, is_abandoned_offset) = layout.extend(Layout::new::()).unwrap(); + let (layout, _slots_offset) = layout + .extend(Layout::array::(capacity).unwrap()) + .unwrap(); + let layout = layout.pad_to_align(); + + let buffer = unsafe { + let ptr = alloc::alloc::alloc(layout); + if ptr.is_null() { + alloc::alloc::handle_alloc_error(layout); + } + ptr.add(head_offset) + .cast::>() + .write(CachePadded::new(AtomicUsize::new(0))); + ptr.add(tail_offset) + .cast::>() + .write(CachePadded::new(AtomicUsize::new(0))); + ptr.add(is_abandoned_offset) + .cast::() + .write(AtomicBool::new(false)); + // Create a (fat) pointer to a slice ... + let ptr: *mut [T] = core::ptr::slice_from_raw_parts_mut(ptr.cast::(), capacity); + // ... and coerce it into our own dynamically sized type: + let ptr = ptr as *mut RingBuffer; + // Safety: Null check has been done above + NonNull::new_unchecked(ptr) + }; let p = Producer { - buffer: buffer.clone(), + buffer, cached_head: Cell::new(0), cached_tail: Cell::new(0), + _marker: PhantomData, }; let c = Consumer { buffer, cached_head: Cell::new(0), cached_tail: Cell::new(0), + _marker: PhantomData, }; (p, c) } @@ -151,32 +184,37 @@ impl RingBuffer { /// assert_eq!(producer.buffer(), consumer.buffer()); /// ``` pub fn capacity(&self) -> usize { - self.capacity + let slice_ptr = self.slots.get(); + // Safety: We are only accessing the length, which never changes. Shared access is OK. + unsafe { (*slice_ptr).len() } } /// Wraps a position from the range `0 .. 2 * capacity` to `0 .. capacity`. fn collapse_position(&self, pos: usize) -> usize { - debug_assert!(pos == 0 || pos < 2 * self.capacity); - if pos < self.capacity { + debug_assert!(pos == 0 || pos < 2 * self.capacity()); + if pos < self.capacity() { pos } else { - pos - self.capacity + pos - self.capacity() } } - /// Returns a pointer to the slot at position `pos`. + /// Returns a pointer to the (possibly uninitialized) slot at position `pos`. /// /// If `pos == 0 && capacity == 0`, the returned pointer must not be dereferenced! unsafe fn slot_ptr(&self, pos: usize) -> *mut T { - debug_assert!(pos == 0 || pos < 2 * self.capacity); - self.data_ptr.add(self.collapse_position(pos)) + debug_assert!(pos == 0 || pos < 2 * self.capacity()); + let slice_ptr = self.slots.get(); + (*slice_ptr) + .get_unchecked_mut(self.collapse_position(pos)) + .as_mut_ptr() } /// Increments a position by going `n` slots forward. fn increment(&self, pos: usize, n: usize) -> usize { - debug_assert!(pos == 0 || pos < 2 * self.capacity); - debug_assert!(n <= self.capacity); - let threshold = 2 * self.capacity - n; + debug_assert!(pos == 0 || pos < 2 * self.capacity()); + debug_assert!(n <= self.capacity()); + let threshold = 2 * self.capacity() - n; if pos < threshold { pos + n } else { @@ -188,9 +226,9 @@ impl RingBuffer { /// /// This is more efficient than self.increment(..., 1). fn increment1(&self, pos: usize) -> usize { - debug_assert_ne!(self.capacity, 0); - debug_assert!(pos < 2 * self.capacity); - if pos < 2 * self.capacity - 1 { + debug_assert_ne!(self.capacity(), 0); + debug_assert!(pos < 2 * self.capacity()); + if pos < 2 * self.capacity() - 1 { pos + 1 } else { 0 @@ -199,19 +237,51 @@ impl RingBuffer { /// Returns the distance between two positions. 
fn distance(&self, a: usize, b: usize) -> usize { - debug_assert!(a == 0 || a < 2 * self.capacity); - debug_assert!(b == 0 || b < 2 * self.capacity); + debug_assert!(a == 0 || a < 2 * self.capacity()); + debug_assert!(b == 0 || b < 2 * self.capacity()); if a <= b { b - a } else { - 2 * self.capacity - a + b + 2 * self.capacity() - a + b } } } +#[inline] +unsafe fn abandon(buffer: NonNull>) { + // The two threads (producer and consumer) must observe the same order of accesses + // to `is_abandoned`. This is accomplished with Acquire/Release. + if buffer + .as_ref() + .is_abandoned + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + // The flag wasn't set before, so we are the first to abandon the RingBuffer + // and it should not be dropped yet. + } else { + // The flag was already set, i.e. the other thread has already abandoned + // the RingBuffer and it can be dropped now. + drop_slow(buffer); + } +} + +/// Non-inlined part of `abandon()`. +#[inline(never)] +unsafe fn drop_slow(buffer: NonNull>) { + // Turn the pointer into a Box and immediately drop it, + // which deallocates the memory allocated in `RingBuffer::new()`. + // + // Safety: This is allowed because the RingBuffer has been allocated with the + // Global allocator (see `RingBuffer::new()`). + let _ = alloc::boxed::Box::from_raw(buffer.as_ptr()); +} + impl Drop for RingBuffer { /// Drops all non-empty slots. fn drop(&mut self) { + // The threads have already been synchronized in `abandon()`, + // Relaxed ordering is sufficient here. let mut head = self.head.load(Ordering::Relaxed); let tail = self.tail.load(Ordering::Relaxed); @@ -222,11 +292,6 @@ impl Drop for RingBuffer { } head = self.increment1(head); } - - // Finally, deallocate the buffer, but don't run any destructors. - unsafe { - Vec::from_raw_parts(self.data_ptr, 0, self.capacity); - } } } @@ -274,8 +339,8 @@ impl Eq for RingBuffer {} /// [`RingBuffer::drop()`] will be called, freeing the allocated memory. #[derive(Debug, PartialEq, Eq)] pub struct Producer { - /// A reference to the ring buffer. - buffer: Arc>, + /// A (fat) pointer to the ring buffer. + buffer: NonNull>, /// A copy of `buffer.head` for quick access. /// @@ -286,10 +351,21 @@ pub struct Producer { /// /// This value is always in sync with `buffer.tail`. cached_tail: Cell, + + /// Indicates that dropping a `Producer` may drop a `RingBuffer`. + _marker: PhantomData>, } unsafe impl Send for Producer {} +impl Drop for Producer { + #[inline] + fn drop(&mut self) { + // Safety: The pointer is valid until after the second call to `abandon()`. + unsafe { abandon(self.buffer) }; + } +} + impl Producer { /// Attempts to push an element into the queue. 
/// @@ -312,11 +388,12 @@ impl Producer { /// ``` pub fn push(&mut self, value: T) -> Result<(), PushError> { if let Some(tail) = self.next_tail() { + let buffer = self.buffer(); unsafe { - self.buffer.slot_ptr(tail).write(value); + buffer.slot_ptr(tail).write(value); } - let tail = self.buffer.increment1(tail); - self.buffer.tail.store(tail, Ordering::Release); + let tail = buffer.increment1(tail); + buffer.tail.store(tail, Ordering::Release); self.cached_tail.set(tail); Ok(()) } else { @@ -343,9 +420,10 @@ impl Producer { /// assert_eq!(p.slots(), 1024); /// ``` pub fn slots(&self) -> usize { - let head = self.buffer.head.load(Ordering::Acquire); + let buffer = self.buffer(); + let head = buffer.head.load(Ordering::Acquire); self.cached_head.set(head); - self.buffer.capacity - self.buffer.distance(head, self.cached_tail.get()) + buffer.capacity() - buffer.distance(head, self.cached_tail.get()) } /// Returns `true` if there are currently no slots available for writing. @@ -427,12 +505,13 @@ impl Producer { /// } /// ``` pub fn is_abandoned(&self) -> bool { - Arc::strong_count(&self.buffer) < 2 + self.buffer().is_abandoned.load(Ordering::Acquire) } /// Returns a read-only reference to the ring buffer. pub fn buffer(&self) -> &RingBuffer { - &self.buffer + // Safety: The pointer is always valid. + unsafe { self.buffer.as_ref() } } /// Get the tail position for writing the next slot, if available. @@ -441,15 +520,16 @@ impl Producer { /// For performance, this special case is immplemented separately. fn next_tail(&self) -> Option { let tail = self.cached_tail.get(); + let buffer = self.buffer(); // Check if the queue is *possibly* full. - if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity { + if buffer.distance(self.cached_head.get(), tail) == buffer.capacity() { // Refresh the head ... - let head = self.buffer.head.load(Ordering::Acquire); + let head = buffer.head.load(Ordering::Acquire); self.cached_head.set(head); // ... and check if it's *really* full. - if self.buffer.distance(head, tail) == self.buffer.capacity { + if buffer.distance(head, tail) == buffer.capacity() { return None; } } @@ -479,8 +559,8 @@ impl Producer { /// [`RingBuffer::drop()`] will be called, freeing the allocated memory. #[derive(Debug, PartialEq, Eq)] pub struct Consumer { - /// A reference to the ring buffer. - buffer: Arc>, + /// A (fat) pointer to the ring buffer. + buffer: NonNull>, /// A copy of `buffer.head` for quick access. /// @@ -491,10 +571,21 @@ pub struct Consumer { /// /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`. cached_tail: Cell, + + /// Indicates that dropping a `Consumer` may drop a `RingBuffer`. + _marker: PhantomData>, } unsafe impl Send for Consumer {} +impl Drop for Consumer { + #[inline] + fn drop(&mut self) { + // Safety: The pointer is valid until after the second call to `abandon()`. + unsafe { abandon(self.buffer) }; + } +} + impl Consumer { /// Attempts to pop an element from the queue. 
/// @@ -527,9 +618,10 @@ impl Consumer { /// ``` pub fn pop(&mut self) -> Result { if let Some(head) = self.next_head() { - let value = unsafe { self.buffer.slot_ptr(head).read() }; - let head = self.buffer.increment1(head); - self.buffer.head.store(head, Ordering::Release); + let buffer = self.buffer(); + let value = unsafe { buffer.slot_ptr(head).read() }; + let head = buffer.increment1(head); + buffer.head.store(head, Ordering::Release); self.cached_head.set(head); Ok(value) } else { @@ -557,7 +649,7 @@ impl Consumer { /// ``` pub fn peek(&self) -> Result<&T, PeekError> { if let Some(head) = self.next_head() { - Ok(unsafe { &*self.buffer.slot_ptr(head) }) + Ok(unsafe { &*self.buffer().slot_ptr(head) }) } else { Err(PeekError::Empty) } @@ -582,9 +674,9 @@ impl Consumer { /// assert_eq!(c.slots(), 0); /// ``` pub fn slots(&self) -> usize { - let tail = self.buffer.tail.load(Ordering::Acquire); + let tail = self.buffer().tail.load(Ordering::Acquire); self.cached_tail.set(tail); - self.buffer.distance(self.cached_head.get(), tail) + self.buffer().distance(self.cached_head.get(), tail) } /// Returns `true` if there are currently no slots available for reading. @@ -665,12 +757,13 @@ impl Consumer { /// } /// ``` pub fn is_abandoned(&self) -> bool { - Arc::strong_count(&self.buffer) < 2 + self.buffer().is_abandoned.load(Ordering::Acquire) } /// Returns a read-only reference to the ring buffer. pub fn buffer(&self) -> &RingBuffer { - &self.buffer + // Safety: The pointer is always valid + unsafe { self.buffer.as_ref() } } /// Get the head position for reading the next slot, if available. @@ -683,7 +776,7 @@ impl Consumer { // Check if the queue is *possibly* empty. if head == self.cached_tail.get() { // Refresh the tail ... - let tail = self.buffer.tail.load(Ordering::Acquire); + let tail = self.buffer().tail.load(Ordering::Acquire); self.cached_tail.set(tail); // ... and check if it's *really* empty. From a1cb8c11f72f8e8ee2c2ca408366e9fcd88c21db Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Thu, 16 Dec 2021 13:13:37 +0100 Subject: [PATCH 03/16] A few optimizations --- src/chunks.rs | 99 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 58 insertions(+), 41 deletions(-) diff --git a/src/chunks.rs b/src/chunks.rs index 02d56bf..542447e 100644 --- a/src/chunks.rs +++ b/src/chunks.rs @@ -350,14 +350,13 @@ where { /// Fills all slots with the [`Default`] value. fn from(chunk: WriteChunkUninit<'a, T>) -> Self { - for slot in chunk - .first_slice - .iter_mut() - .chain(chunk.second_slice.iter_mut()) - { - unsafe { - slot.as_mut_ptr().write(Default::default()); - } + // NB: Using Iterator::chain() to iterate over both slices + // led to worse optimization (with rustc 1.57). + for slot in chunk.first_slice.iter_mut() { + *slot = MaybeUninit::new(Default::default()); + } + for slot in chunk.second_slice.iter_mut() { + *slot = MaybeUninit::new(Default::default()); } WriteChunk(Some(chunk), PhantomData) } @@ -563,18 +562,33 @@ impl WriteChunkUninit<'_, T> { where I: IntoIterator, { - let iterated = self - .first_slice - .iter_mut() - .chain(self.second_slice.iter_mut()) - .zip(iter) - .map(|(slot, item)| { - // SAFETY: It is allowed to write to this memory slot - unsafe { - slot.as_mut_ptr().write(item); + let mut iter = iter.into_iter(); + let mut iterated = 0; + // NB: Iterating over slices (instead of using pointers) + // led to worse optimization (with rustc 1.57). 
+ 'outer: for &(ptr, len) in &[ + ( + self.first_slice.as_mut_ptr().cast::(), + self.first_slice.len(), + ), + ( + self.second_slice.as_mut_ptr().cast::(), + self.second_slice.len(), + ), + ] { + for i in 0..len { + match iter.next() { + Some(item) => { + // SAFETY: It is allowed to write to this memory slot + unsafe { + ptr.add(i).write(item); + } + iterated += 1; + } + None => break 'outer, } - }) - .count(); + } + } // SAFETY: iterated slots have been initialized above unsafe { self.commit_unchecked(iterated) } } @@ -722,14 +736,13 @@ impl ReadChunk<'_, T> { } unsafe fn commit_unchecked(self, n: usize) -> usize { - for slot in self - .first_slice + self.first_slice .iter_mut() .chain(self.second_slice.iter_mut()) .take(n) - { - slot.as_mut_ptr().drop_in_place(); - } + .for_each(|slot| { + slot.as_mut_ptr().drop_in_place(); + }); let c = self.consumer; let head = c.buffer().increment(c.cached_head.get(), n); c.buffer().head.store(head, Ordering::Release); @@ -760,12 +773,8 @@ impl<'a, T> IntoIterator for ReadChunk<'a, T> { /// Non-iterated items remain in the ring buffer. fn into_iter(self) -> Self::IntoIter { Self::IntoIter { - chunk_size: self.len(), - iter: self - .first_slice - .iter_mut() - .chain(self.second_slice.iter_mut()), - consumer: self.consumer, + chunk: self, + iterated: 0, } } } @@ -779,12 +788,8 @@ impl<'a, T> IntoIterator for ReadChunk<'a, T> { /// Non-iterated items remain in the ring buffer. #[derive(Debug)] pub struct ReadChunkIntoIter<'a, T> { - chunk_size: usize, - iter: core::iter::Chain< - core::slice::IterMut<'a, MaybeUninit>, - core::slice::IterMut<'a, MaybeUninit>, - >, - consumer: &'a mut Consumer, + chunk: ReadChunk<'a, T>, + iterated: usize, } impl<'a, T> Drop for ReadChunkIntoIter<'a, T> { @@ -792,9 +797,8 @@ impl<'a, T> Drop for ReadChunkIntoIter<'a, T> { /// /// Non-iterated items remain in the ring buffer and are *not* dropped. fn drop(&mut self) { - let iterated = self.chunk_size - self.len(); - let c = &self.consumer; - let head = c.buffer().increment(c.cached_head.get(), iterated); + let c = &self.chunk.consumer; + let head = c.buffer().increment(c.cached_head.get(), self.iterated); c.buffer().head.store(head, Ordering::Release); c.cached_head.set(head); } @@ -804,11 +808,24 @@ impl<'a, T> Iterator for ReadChunkIntoIter<'a, T> { type Item = T; fn next(&mut self) -> Option { - self.iter.next().map(|slot| unsafe { slot.as_ptr().read() }) + self.chunk + .first_slice + .get(self.iterated) + .or_else(|| { + self.chunk + .second_slice + .get(self.iterated - self.chunk.first_slice.len()) + }) + .map(|slot| unsafe { slot.as_ptr().read() }) + .map(|item| { + self.iterated += 1; + item + }) } fn size_hint(&self) -> (usize, Option) { - self.iter.size_hint() + let size = self.chunk.first_slice.len() + self.chunk.second_slice.len() - self.iterated; + (size, Some(size)) } } From 6668d18feb6331e57d795cf1479d8b77b54dbadc Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Tue, 21 Dec 2021 09:26:50 +0100 Subject: [PATCH 04/16] Use Self instead of RingBuffer --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index f535ea4..97f4596 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,7 +151,7 @@ impl RingBuffer { // Create a (fat) pointer to a slice ... let ptr: *mut [T] = core::ptr::slice_from_raw_parts_mut(ptr.cast::(), capacity); // ... 
and coerce it into our own dynamically sized type: - let ptr = ptr as *mut RingBuffer; + let ptr = ptr as *mut Self; // Safety: Null check has been done above NonNull::new_unchecked(ptr) }; From a70811ac4f1a65bf07e1938e5992edc632890e20 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sat, 8 Jan 2022 16:55:29 +0100 Subject: [PATCH 05/16] Make "as *mut ..." coercions a bit more explicit --- src/chunks.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/chunks.rs b/src/chunks.rs index 542447e..32613cc 100644 --- a/src/chunks.rs +++ b/src/chunks.rs @@ -386,8 +386,8 @@ where // SAFETY: All slots have been initialized in From::from(). unsafe { ( - &mut *(chunk.first_slice as *mut _ as *mut _), - &mut *(chunk.second_slice as *mut _ as *mut _), + &mut *(chunk.first_slice as *mut [_] as *mut [T]), + &mut *(chunk.second_slice as *mut [_] as *mut [T]), ) } } @@ -666,8 +666,8 @@ impl ReadChunk<'_, T> { // SAFETY: All slots are initialized. unsafe { ( - &*(self.first_slice as *const _ as *const _), - &*(self.second_slice as *const _ as *const _), + &*(self.first_slice as *const [_] as *const [T]), + &*(self.second_slice as *const [_] as *const [T]), ) } } From 7df2f7b091b47e397f937d151968736f05ec5719 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sat, 22 Jan 2022 17:30:16 +0100 Subject: [PATCH 06/16] TST: check memory usage --- tests/lib.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/lib.rs b/tests/lib.rs index afac6c9..ffd1d78 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -156,3 +156,30 @@ fn no_race_with_is_abandoned() { } t.join().unwrap(); } + +#[test] +fn memory_usage() { + use cache_padded::CachePadded; + use std::mem::{align_of_val, size_of, size_of_val}; + + let cacheline = size_of::>(); + let fat_pointer = size_of::<&[()]>(); + + let size = 10 * cacheline; + // Memory overhead: 2 cachelines + 1 byte (+ padding) + let (p, c) = RingBuffer::::new(size - 2 * cacheline - 1); + assert_eq!(size_of_val(&p), fat_pointer + 2 * size_of::()); + assert_eq!(size_of_val(&c), fat_pointer + 2 * size_of::()); + assert_eq!(size_of_val(p.buffer()), size); + assert_eq!(align_of_val(p.buffer()), cacheline); + + let (p, _c) = RingBuffer::::new(1); + // 2 cachelines + 1 byte + 1 byte + padding + assert_eq!(size_of_val(p.buffer()), 3 * cacheline); + assert_eq!(align_of_val(p.buffer()), cacheline); + + let (p, _c) = RingBuffer::>::new(1); + // 2 cachelines + 1 byte + padding + 1 cacheline + assert_eq!(size_of_val(p.buffer()), 4 * cacheline); + assert_eq!(align_of_val(p.buffer()), cacheline); +} From 0326c77b5be758d5e7e59a6239bf202fe8bfeaa3 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sat, 26 Feb 2022 18:02:11 +0100 Subject: [PATCH 07/16] Remove explicit #[inline] annotations on destructor --- src/lib.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 97f4596..be3cb72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -247,7 +247,6 @@ impl RingBuffer { } } -#[inline] unsafe fn abandon(buffer: NonNull>) { // The two threads (producer and consumer) must observe the same order of accesses // to `is_abandoned`. This is accomplished with Acquire/Release. @@ -359,7 +358,6 @@ pub struct Producer { unsafe impl Send for Producer {} impl Drop for Producer { - #[inline] fn drop(&mut self) { // Safety: The pointer is valid until after the second call to `abandon()`. 
unsafe { abandon(self.buffer) }; @@ -579,7 +577,6 @@ pub struct Consumer { unsafe impl Send for Consumer {} impl Drop for Consumer { - #[inline] fn drop(&mut self) { // Safety: The pointer is valid until after the second call to `abandon()`. unsafe { abandon(self.buffer) }; From f091ef31489f58a62a81f8fd482dba841d9105d3 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sun, 6 Mar 2022 11:26:25 +0100 Subject: [PATCH 08/16] Use fetch_or() instead of compare_exchange() ... ... and clarify the comment about Ordering::AcqRel. --- src/lib.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index be3cb72..10cf126 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -248,20 +248,21 @@ impl RingBuffer { } unsafe fn abandon(buffer: NonNull>) { - // The two threads (producer and consumer) must observe the same order of accesses - // to `is_abandoned`. This is accomplished with Acquire/Release. + // The "store" part of `fetch_or()` has to use `Release` to make sure that any previous writes + // to the ring buffer happen before it (in the thread that abandons first). + // The "load" part has to use `Acquire` to make sure that reading `head` and `tail` + // in the destructor happens after it (in the thread that drops the `RingBuffer`). if buffer .as_ref() .is_abandoned - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) - .is_ok() + .fetch_or(true, Ordering::AcqRel) { - // The flag wasn't set before, so we are the first to abandon the RingBuffer - // and it should not be dropped yet. - } else { // The flag was already set, i.e. the other thread has already abandoned // the RingBuffer and it can be dropped now. drop_slow(buffer); + } else { + // The flag wasn't set before, so we are the first to abandon the RingBuffer + // and it should not be dropped yet. } } From 49a4e8f5a59911a5b0bd6e35bae85fcfaa6ae5b2 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Wed, 23 Mar 2022 11:20:40 +0100 Subject: [PATCH 09/16] Don't dereference pointer to slice, use slice::from_raw_parts_mut --- src/chunks.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/chunks.rs b/src/chunks.rs index 32613cc..b7e1d2c 100644 --- a/src/chunks.rs +++ b/src/chunks.rs @@ -255,11 +255,12 @@ impl Producer { } let tail = buffer.collapse_position(tail); let first_len = n.min(buffer.capacity() - tail); - let slice_ptr = buffer.slots.get(); + let slice_ptr = buffer.slots.get() as *mut MaybeUninit; // SAFETY: All indices are valid. Since we know we have exclusive access // to the sub-slices and they are non-overlapping, we can make them mutable. - let first_slice = unsafe { (*slice_ptr).get_unchecked_mut(tail..tail + first_len) }; - let second_slice = unsafe { (*slice_ptr).get_unchecked_mut(0..n - first_len) }; + let first_slice = + unsafe { core::slice::from_raw_parts_mut(slice_ptr.add(tail), first_len) }; + let second_slice = unsafe { core::slice::from_raw_parts_mut(slice_ptr, n - first_len) }; Ok(WriteChunkUninit { first_slice, second_slice, @@ -308,11 +309,12 @@ impl Consumer { } let head = buffer.collapse_position(head); let first_len = n.min(buffer.capacity() - head); - let slice_ptr = buffer.slots.get(); + let slice_ptr = buffer.slots.get() as *mut MaybeUninit; // SAFETY: All indices are valid. Since we know we have exclusive access // to the sub-slices and they are non-overlapping, we can make them mutable. 
- let first_slice = unsafe { (*slice_ptr).get_unchecked_mut(head..head + first_len) }; - let second_slice = unsafe { (*slice_ptr).get_unchecked_mut(0..n - first_len) }; + let first_slice = + unsafe { core::slice::from_raw_parts_mut(slice_ptr.add(head), first_len) }; + let second_slice = unsafe { core::slice::from_raw_parts_mut(slice_ptr, n - first_len) }; Ok(ReadChunk { first_slice, second_slice, From a35c5c087505c205d072a4acae4acbe7a0686ff2 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sun, 10 Apr 2022 19:00:47 +0200 Subject: [PATCH 10/16] Use cast() instead of "as *mut ..." --- src/chunks.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/chunks.rs b/src/chunks.rs index b7e1d2c..7238c7a 100644 --- a/src/chunks.rs +++ b/src/chunks.rs @@ -255,7 +255,7 @@ impl Producer { } let tail = buffer.collapse_position(tail); let first_len = n.min(buffer.capacity() - tail); - let slice_ptr = buffer.slots.get() as *mut MaybeUninit; + let slice_ptr = buffer.slots.get().cast::>(); // SAFETY: All indices are valid. Since we know we have exclusive access // to the sub-slices and they are non-overlapping, we can make them mutable. let first_slice = @@ -309,7 +309,7 @@ impl Consumer { } let head = buffer.collapse_position(head); let first_len = n.min(buffer.capacity() - head); - let slice_ptr = buffer.slots.get() as *mut MaybeUninit; + let slice_ptr = buffer.slots.get().cast::>(); // SAFETY: All indices are valid. Since we know we have exclusive access // to the sub-slices and they are non-overlapping, we can make them mutable. let first_slice = From 39f152407f1156d80f6b4821a5c1564d74e76c61 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sun, 10 Apr 2022 19:32:55 +0200 Subject: [PATCH 11/16] Use explicit drop() instead of binding to _ --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 10cf126..b40cc59 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -274,7 +274,7 @@ unsafe fn drop_slow(buffer: NonNull>) { // // Safety: This is allowed because the RingBuffer has been allocated with the // Global allocator (see `RingBuffer::new()`). - let _ = alloc::boxed::Box::from_raw(buffer.as_ptr()); + drop(alloc::boxed::Box::from_raw(buffer.as_ptr())); } impl Drop for RingBuffer { From 1e8b43a1c5503532d3d9c973f64a206d0b8b1111 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Mon, 18 Apr 2022 08:21:37 +0200 Subject: [PATCH 12/16] Remove unnecessary turbofish from cast() --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index b40cc59..57e50db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -149,7 +149,7 @@ impl RingBuffer { .cast::() .write(AtomicBool::new(false)); // Create a (fat) pointer to a slice ... - let ptr: *mut [T] = core::ptr::slice_from_raw_parts_mut(ptr.cast::(), capacity); + let ptr: *mut [T] = core::ptr::slice_from_raw_parts_mut(ptr.cast(), capacity); // ... 
and coerce it into our own dynamically sized type: let ptr = ptr as *mut Self; // Safety: Null check has been done above From 7671f4e887d96aba0c3b418d6a4cf3655167c0ca Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sat, 9 Apr 2022 13:50:25 +0200 Subject: [PATCH 13/16] Use Acquire load only before dropping --- src/lib.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 57e50db..a2589a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -250,15 +250,23 @@ impl RingBuffer { unsafe fn abandon(buffer: NonNull>) { // The "store" part of `fetch_or()` has to use `Release` to make sure that any previous writes // to the ring buffer happen before it (in the thread that abandons first). - // The "load" part has to use `Acquire` to make sure that reading `head` and `tail` - // in the destructor happens after it (in the thread that drops the `RingBuffer`). if buffer .as_ref() .is_abandoned - .fetch_or(true, Ordering::AcqRel) + .fetch_or(true, Ordering::Release) { - // The flag was already set, i.e. the other thread has already abandoned - // the RingBuffer and it can be dropped now. + // The flag was already set, i.e. the other thread has already abandoned the RingBuffer + // and it can be dropped now. + + // However, since the load of `is_abandoned` was `Relaxed`, + // we have to use `Acquire` here to make sure that reading `head` and `tail` + // in the destructor happens after this point. + + // Ideally, we would use a memory fence like this: + //core::sync::atomic::fence(Ordering::Acquire); + // ... but as long as ThreadSanitizer doesn't support fences, + // we use load(Acquire) as a work-around to avoid false positives: + let _ = buffer.as_ref().is_abandoned.load(Ordering::Acquire); drop_slow(buffer); } else { // The flag wasn't set before, so we are the first to abandon the RingBuffer From 36d45e4b1946f188a3d16a70fc1901ea09372f7b Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Mon, 18 Apr 2022 14:38:44 +0200 Subject: [PATCH 14/16] Store is_abandoned as local variable, clarify Relaxed load --- src/lib.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a2589a6..537daf3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -248,13 +248,12 @@ impl RingBuffer { } unsafe fn abandon(buffer: NonNull>) { + let is_abandoned: &AtomicBool = &buffer.as_ref().is_abandoned; // The "store" part of `fetch_or()` has to use `Release` to make sure that any previous writes // to the ring buffer happen before it (in the thread that abandons first). - if buffer - .as_ref() - .is_abandoned - .fetch_or(true, Ordering::Release) - { + // The "load" part can be `Relaxed` for the first thread, + // but it must be `Acquire` for the second one (see below). + if is_abandoned.fetch_or(true, Ordering::Release) { // The flag was already set, i.e. the other thread has already abandoned the RingBuffer // and it can be dropped now. @@ -266,7 +265,7 @@ unsafe fn abandon(buffer: NonNull>) { //core::sync::atomic::fence(Ordering::Acquire); // ... 
but as long as ThreadSanitizer doesn't support fences, // we use load(Acquire) as a work-around to avoid false positives: - let _ = buffer.as_ref().is_abandoned.load(Ordering::Acquire); + let _ = is_abandoned.load(Ordering::Acquire); drop_slow(buffer); } else { // The flag wasn't set before, so we are the first to abandon the RingBuffer From e1f8d1835d19bf6c2f9cdfc4dfd03502b05d6309 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sat, 23 Apr 2022 17:01:57 +0200 Subject: [PATCH 15/16] Nicer formatting of from_raw_parts_mut() --- src/chunks.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/chunks.rs b/src/chunks.rs index 7238c7a..481b720 100644 --- a/src/chunks.rs +++ b/src/chunks.rs @@ -258,9 +258,12 @@ impl Producer { let slice_ptr = buffer.slots.get().cast::>(); // SAFETY: All indices are valid. Since we know we have exclusive access // to the sub-slices and they are non-overlapping, we can make them mutable. - let first_slice = - unsafe { core::slice::from_raw_parts_mut(slice_ptr.add(tail), first_len) }; - let second_slice = unsafe { core::slice::from_raw_parts_mut(slice_ptr, n - first_len) }; + let (first_slice, second_slice) = unsafe { + ( + core::slice::from_raw_parts_mut(slice_ptr.add(tail), first_len), + core::slice::from_raw_parts_mut(slice_ptr, n - first_len), + ) + }; Ok(WriteChunkUninit { first_slice, second_slice, @@ -312,9 +315,12 @@ impl Consumer { let slice_ptr = buffer.slots.get().cast::>(); // SAFETY: All indices are valid. Since we know we have exclusive access // to the sub-slices and they are non-overlapping, we can make them mutable. - let first_slice = - unsafe { core::slice::from_raw_parts_mut(slice_ptr.add(head), first_len) }; - let second_slice = unsafe { core::slice::from_raw_parts_mut(slice_ptr, n - first_len) }; + let (first_slice, second_slice) = unsafe { + ( + core::slice::from_raw_parts_mut(slice_ptr.add(head), first_len), + core::slice::from_raw_parts_mut(slice_ptr, n - first_len), + ) + }; Ok(ReadChunk { first_slice, second_slice, From 5f90525078e66a16fae14db5a7dae50d7e9cc127 Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Sun, 22 May 2022 10:37:55 +0200 Subject: [PATCH 16/16] Add comment about addr_of_mut!() --- src/lib.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 537daf3..e318937 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -152,6 +152,11 @@ impl RingBuffer { let ptr: *mut [T] = core::ptr::slice_from_raw_parts_mut(ptr.cast(), capacity); // ... and coerce it into our own dynamically sized type: let ptr = ptr as *mut Self; + + // Since Rust 1.51 addr_of_mut!((*ptr).$field_name).write(...) can be used + // to get a properly typed (and aligned) pointer for field initialization + // (instead of manually casting from `*mut u8`). + // Safety: Null check has been done above NonNull::new_unchecked(ptr) };
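The abandonment handshake introduced in PATCH 02 and refined in PATCHes 08, 13 and 14 (the `is_abandoned` flag together with `abandon()` and `drop_slow()`) replaces the earlier `Arc`-based sharing of the ring buffer. Below is a minimal, self-contained sketch of that handshake on a plain heap allocation, assuming the same protocol: each owner sets the flag with `fetch_or(true, Release)` when it is dropped, and whichever owner finds the flag already set deallocates, using an `Acquire` load of the flag in place of an acquire fence. The names `Shared`, `Handle` and `new_pair` are illustrative only and are not part of the crate; the real code keeps the flag inside the dynamically sized `RingBuffer` allocation together with `head`, `tail` and the element slots.

use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, Ordering};

/// Shared allocation. The real RingBuffer also stores head, tail and the
/// element slots in the same (dynamically sized) allocation.
struct Shared {
    is_abandoned: AtomicBool,
}

/// One of two owners of the shared allocation (like Producer/Consumer).
struct Handle {
    shared: NonNull<Shared>,
}

// Safety: the shared data is only accessed through atomics, except for the
// final deallocation, which happens after both owners have synchronized.
unsafe impl Send for Handle {}

impl Drop for Handle {
    fn drop(&mut self) {
        // Safety: the allocation stays valid until the second drop.
        let shared = unsafe { self.shared.as_ref() };
        // Release publishes our previous writes to the other owner;
        // fetch_or() returns the previous value of the flag.
        if shared.is_abandoned.fetch_or(true, Ordering::Release) {
            // The other handle was dropped first, so we deallocate.
            // The Acquire load synchronizes with the other side's Release
            // store (used instead of a fence, as in the patches above).
            let _ = shared.is_abandoned.load(Ordering::Acquire);
            // Safety: the allocation came from Box::into_raw() and the
            // other handle is gone, so this is the last reference.
            unsafe { drop(Box::from_raw(self.shared.as_ptr())) };
        }
    }
}

fn new_pair() -> (Handle, Handle) {
    let raw = Box::into_raw(Box::new(Shared {
        is_abandoned: AtomicBool::new(false),
    }));
    // Safety: Box::into_raw() never returns null.
    let shared = unsafe { NonNull::new_unchecked(raw) };
    (Handle { shared }, Handle { shared })
}

fn main() {
    let (a, b) = new_pair();
    let t = std::thread::spawn(move || drop(b));
    drop(a);
    t.join().unwrap();
    // Whichever drop ran second freed the allocation; no Arc is needed.
}

This mirrors what `Arc` does internally on the last strong-count decrement, but without a separate reference-count allocation, which is what allows the series to merge the control data and the slots into a single allocation.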