diff --git a/src/chunks.rs b/src/chunks.rs
index f4bac97..4330b10 100644
--- a/src/chunks.rs
+++ b/src/chunks.rs
@@ -237,13 +237,13 @@ impl<T> Producer<T> {
     /// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
     /// see [`Producer::write_chunk()`].
     pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
-        let tail = self.tail.get();
+        let tail = self.buffer.tail.load(Ordering::Acquire);
 
         // Check if the queue has *possibly* not enough slots.
-        if self.buffer.capacity - self.buffer.distance(self.head.get(), tail) < n {
+        if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
             // Refresh the head ...
             let head = self.buffer.head.load(Ordering::Acquire);
-            self.head.set(head);
+            self.cached_head.set(head);
 
             // ... and check if there *really* are not enough slots.
             let slots = self.buffer.capacity - self.buffer.distance(head, tail);
@@ -286,13 +286,13 @@ impl<T> Consumer<T> {
     ///
     /// See the documentation of the [`chunks`](crate::chunks#examples) module.
     pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
-        let head = self.head.get();
+        let head = self.buffer.head.load(Ordering::Acquire);
 
         // Check if the queue has *possibly* not enough slots.
-        if self.buffer.distance(head, self.tail.get()) < n {
+        if self.buffer.distance(head, self.cached_tail.get()) < n {
             // Refresh the tail ...
             let tail = self.buffer.tail.load(Ordering::Acquire);
-            self.tail.set(tail);
+            self.cached_tail.set(tail);
 
             // ... and check if there *really* are not enough slots.
             let slots = self.buffer.distance(head, tail);
@@ -465,9 +465,9 @@ impl<T> WriteChunkUninit<'_, T> {
     }
 
     unsafe fn commit_unchecked(self, n: usize) -> usize {
-        let tail = self.producer.buffer.increment(self.producer.tail.get(), n);
+        let tail = self.producer.buffer.tail.load(Ordering::Acquire);
+        let tail = self.producer.buffer.increment(tail, n);
         self.producer.buffer.tail.store(tail, Ordering::Release);
-        self.producer.tail.set(tail);
         n
     }
 
@@ -654,7 +654,7 @@ impl<T> ReadChunk<'_, T> {
     }
 
     unsafe fn commit_unchecked(self, n: usize) -> usize {
-        let head = self.consumer.head.get();
+        let head = self.consumer.buffer.head.load(Ordering::Acquire);
         // Safety: head has not yet been incremented
         let first_ptr = self.consumer.buffer.slot_ptr(head);
         let first_len = self.first_len.min(n);
@@ -668,7 +668,6 @@
         }
         let head = self.consumer.buffer.increment(head, n);
         self.consumer.buffer.head.store(head, Ordering::Release);
-        self.consumer.head.set(head);
         n
     }
 
@@ -722,9 +721,8 @@ impl<'a, T> Drop for ReadChunkIntoIter<'a, T> {
         let consumer = &self.chunk.consumer;
         let head = consumer
             .buffer
-            .increment(consumer.head.get(), self.iterated);
+            .increment(consumer.buffer.head.load(Ordering::Acquire), self.iterated);
         consumer.buffer.head.store(head, Ordering::Release);
-        consumer.head.set(head);
     }
 }
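Every src/chunks.rs hunk above applies the same pattern, and the src/lib.rs hunks below apply it to push/pop/slots and to the struct fields themselves: a handle's *own* index is now always re-loaded from the shared atomic (a thread loading an atomic that only it stores to always sees the latest value, and the load is uncontended), so the only per-handle `Cell` kept is a possibly stale cache of the *other* side's index, refreshed just before declaring the queue full or empty. A minimal, self-contained sketch of that fast-path/refresh dance, using hypothetical `Shared` and `Producer` types with unbounded non-wrapping indices instead of this crate's real `RingBuffer` and `distance()`:

    use std::cell::Cell;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    struct Shared {
        head: AtomicUsize, // only the consumer stores to this
        tail: AtomicUsize, // only the producer stores to this
        capacity: usize,
    }

    struct Producer {
        buffer: Arc<Shared>,
        // Cache of `buffer.head`; may be stale, refreshed only on demand.
        cached_head: Cell<usize>,
    }

    impl Producer {
        /// Returns the index of a writable slot, or `None` if the queue is full.
        fn next_tail(&self) -> Option<usize> {
            // This thread is the only writer of `tail`, so this load is
            // uncontended and always up to date.
            let tail = self.buffer.tail.load(Ordering::Acquire);

            // Fast path: check fullness against the possibly stale cache.
            // A stale cache can only over-estimate fullness, never miss it.
            if tail - self.cached_head.get() == self.buffer.capacity {
                // Looks full; refresh the cache from the shared atomic ...
                let head = self.buffer.head.load(Ordering::Acquire);
                self.cached_head.set(head);

                // ... and only report "full" if it *really* is.
                if tail - head == self.buffer.capacity {
                    return None;
                }
            }
            Some(tail)
        }
    }

    fn main() {
        let buffer = Arc::new(Shared {
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            capacity: 1,
        });
        let p = Producer { buffer: buffer.clone(), cached_head: Cell::new(0) };

        assert_eq!(p.next_tail(), Some(0)); // one slot free
        buffer.tail.store(1, Ordering::Release); // pretend the slot was written
        assert_eq!(p.next_tail(), None); // full until the consumer advances head
    }

The point of keeping `cached_head` at all is that the load of the atomic the *other* thread writes to is only paid when the cheap check says the queue might be full.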
diff --git a/src/lib.rs b/src/lib.rs
index 8d1f165..0bf0f13 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -126,13 +126,11 @@ impl<T> RingBuffer<T> {
         });
         let p = Producer {
             buffer: buffer.clone(),
-            head: Cell::new(0),
-            tail: Cell::new(0),
+            cached_head: Cell::new(0),
         };
         let c = Consumer {
             buffer,
-            head: Cell::new(0),
-            tail: Cell::new(0),
+            cached_tail: Cell::new(0),
         };
         (p, c)
     }
@@ -281,12 +279,7 @@ pub struct Producer<T> {
     /// A copy of `buffer.head` for quick access.
     ///
     /// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
-    head: Cell<usize>,
-
-    /// A copy of `buffer.tail` for quick access.
-    ///
-    /// This value is always in sync with `buffer.tail`.
-    tail: Cell<usize>,
+    cached_head: Cell<usize>,
 }
 
 unsafe impl<T: Send> Send for Producer<T> {}
@@ -318,7 +311,6 @@ impl<T> Producer<T> {
             }
             let tail = self.buffer.increment1(tail);
             self.buffer.tail.store(tail, Ordering::Release);
-            self.tail.set(tail);
             Ok(())
         } else {
             Err(PushError::Full(value))
@@ -345,8 +337,9 @@
     /// ```
     pub fn slots(&self) -> usize {
         let head = self.buffer.head.load(Ordering::Acquire);
-        self.head.set(head);
-        self.buffer.capacity - self.buffer.distance(head, self.tail.get())
+        let tail = self.buffer.tail.load(Ordering::Acquire);
+        self.cached_head.set(head);
+        self.buffer.capacity - self.buffer.distance(head, tail)
     }
 
     /// Returns `true` if there are currently no slots available for writing.
@@ -441,13 +434,13 @@ impl<T> Producer<T> {
     /// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
     /// For performance, this special case is implemented separately.
     fn next_tail(&self) -> Option<usize> {
-        let tail = self.tail.get();
+        let tail = self.buffer.tail.load(Ordering::Acquire);
 
         // Check if the queue is *possibly* full.
-        if self.buffer.distance(self.head.get(), tail) == self.buffer.capacity {
+        if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
             // Refresh the head ...
             let head = self.buffer.head.load(Ordering::Acquire);
-            self.head.set(head);
+            self.cached_head.set(head);
 
             // ... and check if it's *really* full.
             if self.buffer.distance(head, tail) == self.buffer.capacity {
@@ -483,15 +476,10 @@ pub struct Consumer<T> {
     /// A reference to the ring buffer.
     buffer: Arc<RingBuffer<T>>,
 
-    /// A copy of `buffer.head` for quick access.
-    ///
-    /// This value is always in sync with `buffer.head`.
-    head: Cell<usize>,
-
     /// A copy of `buffer.tail` for quick access.
     ///
     /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
-    tail: Cell<usize>,
+    cached_tail: Cell<usize>,
 }
 
 unsafe impl<T: Send> Send for Consumer<T> {}
@@ -531,7 +519,6 @@ impl<T> Consumer<T> {
             let value = unsafe { self.buffer.slot_ptr(head).read() };
             let head = self.buffer.increment1(head);
             self.buffer.head.store(head, Ordering::Release);
-            self.head.set(head);
             Ok(value)
         } else {
             Err(PopError::Empty)
@@ -583,9 +570,10 @@
     /// assert_eq!(c.slots(), 0);
     /// ```
     pub fn slots(&self) -> usize {
+        let head = self.buffer.head.load(Ordering::Acquire);
         let tail = self.buffer.tail.load(Ordering::Acquire);
-        self.tail.set(tail);
-        self.buffer.distance(self.head.get(), tail)
+        self.cached_tail.set(tail);
+        self.buffer.distance(head, tail)
     }
 
     /// Returns `true` if there are currently no slots available for reading.
@@ -679,13 +667,13 @@ impl<T> Consumer<T> {
     /// This is a strict subset of the functionality implemented in `read_chunk()`.
     /// For performance, this special case is implemented separately.
     fn next_head(&self) -> Option<usize> {
-        let head = self.head.get();
+        let head = self.buffer.head.load(Ordering::Acquire);
 
         // Check if the queue is *possibly* empty.
-        if head == self.tail.get() {
+        if head == self.cached_tail.get() {
             // Refresh the tail ...
             let tail = self.buffer.tail.load(Ordering::Acquire);
-            self.tail.set(tail);
+            self.cached_tail.set(tail);
 
             // ... and check if it's *really* empty.
             if head == tail {
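The src/lib.rs hunks mirror the chunks.rs change for both handles: `Producer` keeps only a stale-allowed `cached_head`, `Consumer` only a stale-allowed `cached_tail`, and the always-in-sync copies are gone. None of this changes the public API. Assuming this is the rtrb crate (the file layout and identifiers match), a cross-thread round trip exercises both the fast paths and the cache refreshes:

    use rtrb::RingBuffer;
    use std::thread;

    fn main() {
        // Two slots, so the producer regularly finds the queue "possibly
        // full" and has to refresh its cached head (and vice versa for
        // the consumer's cached tail).
        let (mut producer, mut consumer) = RingBuffer::<u32>::new(2);

        let t = thread::spawn(move || {
            for i in 0..100 {
                while producer.push(i).is_err() {} // spin while full
            }
        });

        let mut received = Vec::new();
        while received.len() < 100 {
            if let Ok(value) = consumer.pop() {
                received.push(value);
            }
        }
        t.join().unwrap();
        assert_eq!(received, (0..100).collect::<Vec<u32>>());
    }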