Don't cache head/tail index in Consumer/Producer

mgeier committed Dec 16, 2021
1 parent d88a736 commit b2a9b1a

Showing 2 changed files with 14 additions and 26 deletions.
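In short: `Producer` loses its `cached_tail` field and `Consumer` loses its `cached_head` field. Both were plain `Cell` mirrors of an atomic index that the same handle itself writes, documented as "always in sync"; every former cache read becomes a direct `Ordering::Acquire` load of the atomic. A minimal sketch of the pattern before and after this commit (hypothetical simplified types, not the crate's actual definitions; `wrapping_add` stands in for the crate's `buffer.increment()`):

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Shared state: the indices live in atomics inside the buffer.
struct Buffer {
    tail: AtomicUsize,
}

/// Before: the producer kept a `Cell` mirror of `buffer.tail` that
/// had to be updated after every store ("always in sync").
struct ProducerBefore {
    buffer: Arc<Buffer>,
    cached_tail: Cell<usize>,
}

impl ProducerBefore {
    fn advance(&self, n: usize) {
        let tail = self.cached_tail.get(); // read the mirror, no atomic load
        let tail = tail.wrapping_add(n); // stand-in for `buffer.increment()`
        self.buffer.tail.store(tail, Ordering::Release);
        self.cached_tail.set(tail); // keep the mirror in sync
    }
}

/// After: no mirror; the producer re-loads the atomic it owns.
struct ProducerAfter {
    buffer: Arc<Buffer>,
}

impl ProducerAfter {
    fn advance(&self, n: usize) {
        let tail = self.buffer.tail.load(Ordering::Acquire);
        let tail = tail.wrapping_add(n);
        self.buffer.tail.store(tail, Ordering::Release);
    }
}

fn main() {
    let buffer = Arc::new(Buffer { tail: AtomicUsize::new(0) });
    let before = ProducerBefore { buffer: Arc::clone(&buffer), cached_tail: Cell::new(0) };
    before.advance(3);
    let after = ProducerAfter { buffer };
    after.advance(2);
    assert_eq!(after.buffer.tail.load(Ordering::Acquire), 5);
}
```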
16 changes: 8 additions & 8 deletions src/chunks.rs
@@ -237,7 +237,7 @@ impl<T> Producer<T> {
     /// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
     /// see [`Producer::write_chunk()`].
     pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
-        let tail = self.cached_tail.get();
+        let tail = self.buffer.tail.load(Ordering::Acquire);
 
         // Check if the queue has *possibly* not enough slots.
         if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
@@ -286,7 +286,7 @@ impl<T> Consumer<T> {
     ///
     /// See the documentation of the [`chunks`](crate::chunks#examples) module.
     pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
-        let head = self.cached_head.get();
+        let head = self.buffer.head.load(Ordering::Acquire);
 
         // Check if the queue has *possibly* not enough slots.
         if self.buffer.distance(head, self.cached_tail.get()) < n {
@@ -481,9 +481,9 @@ impl<T> WriteChunkUninit<'_, T> {
 
     unsafe fn commit_unchecked(self, n: usize) -> usize {
         let p = self.producer;
-        let tail = p.buffer.increment(p.cached_tail.get(), n);
+        let tail = p.buffer.tail.load(Ordering::Acquire);
+        let tail = p.buffer.increment(tail, n);
         p.buffer.tail.store(tail, Ordering::Release);
-        p.cached_tail.set(tail);
         n
     }
 
@@ -681,9 +681,9 @@ impl<T> ReadChunk<'_, T> {
             self.second_ptr.add(i).drop_in_place();
         }
         let c = self.consumer;
-        let head = c.buffer.increment(c.cached_head.get(), n);
+        let head = c.buffer.head.load(Ordering::Acquire);
+        let head = c.buffer.increment(head, n);
         c.buffer.head.store(head, Ordering::Release);
-        c.cached_head.set(head);
         n
     }
 
@@ -735,9 +735,9 @@ impl<'a, T> Drop for ReadChunkIntoIter<'a, T> {
     /// Non-iterated items remain in the ring buffer and are *not* dropped.
     fn drop(&mut self) {
        let c = &self.chunk.consumer;
-        let head = c.buffer.increment(c.cached_head.get(), self.iterated);
+        let head = c.buffer.head.load(Ordering::Acquire);
+        let head = c.buffer.increment(head, self.iterated);
         c.buffer.head.store(head, Ordering::Release);
-        c.cached_head.set(head);
     }
 }
 
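Why it is sound to drop exactly these two caches: in this single-producer/single-consumer design, only the `Producer` ever stores `buffer.tail` and only the `Consumer` ever stores `buffer.head`, so each handle's load of its *own* index can only observe its own latest store. That is what the deleted doc comments meant by "always in sync", and it makes the mirror pure bookkeeping. The caches of the *other* side's index (`cached_head` in `Producer`, `cached_tail` in `Consumer`, see `src/lib.rs` below) remain, since those really can go stale and avoid a cross-thread atomic load on the fast path; the cost of this commit is one extra atomic load of an index the thread already owns.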
24 changes: 6 additions & 18 deletions src/lib.rs
@@ -127,11 +127,9 @@ impl<T> RingBuffer<T> {
         let p = Producer {
             buffer: buffer.clone(),
             cached_head: Cell::new(0),
-            cached_tail: Cell::new(0),
         };
         let c = Consumer {
             buffer,
-            cached_head: Cell::new(0),
             cached_tail: Cell::new(0),
         };
         (p, c)
@@ -282,11 +280,6 @@ pub struct Producer<T> {
     ///
     /// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
     cached_head: Cell<usize>,
-
-    /// A copy of `buffer.tail` for quick access.
-    ///
-    /// This value is always in sync with `buffer.tail`.
-    cached_tail: Cell<usize>,
 }
 
 unsafe impl<T: Send> Send for Producer<T> {}
@@ -318,7 +311,6 @@ impl<T> Producer<T> {
             }
             let tail = self.buffer.increment1(tail);
             self.buffer.tail.store(tail, Ordering::Release);
-            self.cached_tail.set(tail);
             Ok(())
         } else {
             Err(PushError::Full(value))
@@ -346,7 +338,8 @@ impl<T> Producer<T> {
     pub fn slots(&self) -> usize {
         let head = self.buffer.head.load(Ordering::Acquire);
         self.cached_head.set(head);
-        self.buffer.capacity - self.buffer.distance(head, self.cached_tail.get())
+        let tail = self.buffer.tail.load(Ordering::Acquire);
+        self.buffer.capacity - self.buffer.distance(head, tail)
     }
 
     /// Returns `true` if there are currently no slots available for writing.
@@ -441,7 +434,7 @@ impl<T> Producer<T> {
     /// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
     /// For performance, this special case is implemented separately.
     fn next_tail(&self) -> Option<usize> {
-        let tail = self.cached_tail.get();
+        let tail = self.buffer.tail.load(Ordering::Acquire);
 
         // Check if the queue is *possibly* full.
         if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
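The stale-cache recheck that remains for the *other* side's index is worth seeing in isolation: only when the cache claims the queue is full does `next_tail()` pay for a fresh atomic load of `head`. A self-contained sketch of that logic (hypothetical simplified types, not the crate's; `wrapping_sub` stands in for `buffer.distance()`, and the real crate lets indices run to twice the capacity before wrapping):

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Shared {
    head: AtomicUsize,
    tail: AtomicUsize,
    capacity: usize,
}

struct Producer<'a> {
    buffer: &'a Shared,
    cached_head: Cell<usize>, // may lag behind buffer.head
}

impl Producer<'_> {
    /// Returns the current tail if at least one slot is free.
    fn next_tail(&self) -> Option<usize> {
        // Own index: loaded straight from the atomic (this commit's change).
        let tail = self.buffer.tail.load(Ordering::Acquire);
        // Other side's index: consult the possibly-stale cache first.
        if tail.wrapping_sub(self.cached_head.get()) == self.buffer.capacity {
            // The cache says "full": refresh it from the atomic and re-check.
            let head = self.buffer.head.load(Ordering::Acquire);
            self.cached_head.set(head);
            if tail.wrapping_sub(head) == self.buffer.capacity {
                return None; // really full
            }
        }
        Some(tail)
    }
}

fn main() {
    let shared = Shared {
        head: AtomicUsize::new(0),
        tail: AtomicUsize::new(4),
        capacity: 4,
    };
    let p = Producer { buffer: &shared, cached_head: Cell::new(0) };
    assert_eq!(p.next_tail(), None); // cache and atomic agree: full
    shared.head.store(1, Ordering::Release); // "consumer" frees one slot
    assert_eq!(p.next_tail(), Some(4)); // refresh discovers the free slot
}
```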
@@ -483,11 +476,6 @@ pub struct Consumer<T> {
     /// A reference to the ring buffer.
     buffer: Arc<RingBuffer<T>>,
 
-    /// A copy of `buffer.head` for quick access.
-    ///
-    /// This value is always in sync with `buffer.head`.
-    cached_head: Cell<usize>,
-
     /// A copy of `buffer.tail` for quick access.
     ///
     /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
@@ -531,7 +519,6 @@ impl<T> Consumer<T> {
             let value = unsafe { self.buffer.slot_ptr(head).read() };
             let head = self.buffer.increment1(head);
             self.buffer.head.store(head, Ordering::Release);
-            self.cached_head.set(head);
             Ok(value)
         } else {
             Err(PopError::Empty)
@@ -583,9 +570,10 @@ impl<T> Consumer<T> {
     /// assert_eq!(c.slots(), 0);
     /// ```
     pub fn slots(&self) -> usize {
+        let head = self.buffer.head.load(Ordering::Acquire);
         let tail = self.buffer.tail.load(Ordering::Acquire);
         self.cached_tail.set(tail);
-        self.buffer.distance(self.cached_head.get(), tail)
+        self.buffer.distance(head, tail)
     }
 
     /// Returns `true` if there are currently no slots available for reading.
@@ -679,7 +667,7 @@ impl<T> Consumer<T> {
     /// This is a strict subset of the functionality implemented in `read_chunk()`.
     /// For performance, this special case is implemented separately.
    fn next_head(&self) -> Option<usize> {
-        let head = self.cached_head.get();
+        let head = self.buffer.head.load(Ordering::Acquire);
 
         // Check if the queue is *possibly* empty.
         if head == self.cached_tail.get() {
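None of this changes the public API; everything the diff touches is internal bookkeeping. A quick usage example against the API shown above (assuming the crate name `rtrb` and the 0.2-era `RingBuffer::new` that returns the `(Producer, Consumer)` pair, as in the `new()` hunk):

```rust
use rtrb::{PopError, RingBuffer};

fn main() {
    let (mut p, mut c) = RingBuffer::new(2);

    // Producer::slots() now loads both atomic indices.
    assert_eq!(p.slots(), 2);

    p.push(10).unwrap();
    p.push(20).unwrap();
    assert!(p.push(30).is_err()); // full: PushError::Full(30)

    assert_eq!(c.pop(), Ok(10));
    assert_eq!(c.pop(), Ok(20));
    assert_eq!(c.pop(), Err(PopError::Empty));
    assert_eq!(c.slots(), 0);
}
```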
