Skip to content

Commit

Permalink
Use "Relaxed" for reading tail in producer and head in consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
mgeier committed Dec 6, 2021
1 parent b0fbd44 commit e94a9d6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
18 changes: 11 additions & 7 deletions src/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ impl<T> Producer<T> {
/// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
/// see [`Producer::write_chunk()`].
pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
let tail = self.buffer.tail.load(Ordering::Acquire);
// "tail" is only ever written by the producer thread, "Relaxed" is enough
let tail = self.buffer.tail.load(Ordering::Relaxed);

// Check if the queue has *possibly* not enough slots.
if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
Expand Down Expand Up @@ -286,7 +287,8 @@ impl<T> Consumer<T> {
///
/// See the documentation of the [`chunks`](crate::chunks#examples) module.
pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
let head = self.buffer.head.load(Ordering::Acquire);
// "head" is only ever written by the consumer thread, "Relaxed" is enough
let head = self.buffer.head.load(Ordering::Relaxed);

// Check if the queue has *possibly* not enough slots.
if self.buffer.distance(head, self.cached_tail.get()) < n {
Expand Down Expand Up @@ -465,7 +467,8 @@ impl<T> WriteChunkUninit<'_, T> {
}

unsafe fn commit_unchecked(self, n: usize) -> usize {
let tail = self.producer.buffer.tail.load(Ordering::Acquire);
// "tail" is only ever written by the producer thread, "Relaxed" is enough
let tail = self.producer.buffer.tail.load(Ordering::Relaxed);
let tail = self.producer.buffer.increment(tail, n);
self.producer.buffer.tail.store(tail, Ordering::Release);
n
Expand Down Expand Up @@ -654,7 +657,8 @@ impl<T> ReadChunk<'_, T> {
}

unsafe fn commit_unchecked(self, n: usize) -> usize {
let head = self.consumer.buffer.head.load(Ordering::Acquire);
// "head" is only ever written by the consumer thread, "Relaxed" is enough
let head = self.consumer.buffer.head.load(Ordering::Relaxed);
// Safety: head has not yet been incremented
let first_ptr = self.consumer.buffer.slot_ptr(head);
let first_len = self.first_len.min(n);
Expand Down Expand Up @@ -719,9 +723,9 @@ impl<'a, T> Drop for ReadChunkIntoIter<'a, T> {
/// Non-iterated items remain in the ring buffer and are *not* dropped.
fn drop(&mut self) {
let consumer = &self.chunk.consumer;
let head = consumer
.buffer
.increment(consumer.buffer.head.load(Ordering::Acquire), self.iterated);
// "head" is only ever written by the consumer thread, "Relaxed" is enough
let head = consumer.buffer.head.load(Ordering::Relaxed);
let head = consumer.buffer.increment(head, self.iterated);
consumer.buffer.head.store(head, Ordering::Release);
}
}
Expand Down
12 changes: 8 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,9 @@ impl<T> Producer<T> {
/// ```
pub fn slots(&self) -> usize {
let head = self.buffer.head.load(Ordering::Acquire);
let tail = self.buffer.tail.load(Ordering::Acquire);
self.cached_head.set(head);
// "tail" is only ever written by the producer thread, "Relaxed" is enough
let tail = self.buffer.tail.load(Ordering::Relaxed);
self.buffer.capacity - self.buffer.distance(head, tail)
}

Expand Down Expand Up @@ -434,7 +435,8 @@ impl<T> Producer<T> {
/// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
/// For performance, this special case is immplemented separately.
fn next_tail(&self) -> Option<usize> {
let tail = self.buffer.tail.load(Ordering::Acquire);
// "tail" is only ever written by the producer thread, "Relaxed" is enough
let tail = self.buffer.tail.load(Ordering::Relaxed);

// Check if the queue is *possibly* full.
if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
Expand Down Expand Up @@ -570,9 +572,10 @@ impl<T> Consumer<T> {
/// assert_eq!(c.slots(), 0);
/// ```
pub fn slots(&self) -> usize {
let head = self.buffer.head.load(Ordering::Acquire);
let tail = self.buffer.tail.load(Ordering::Acquire);
self.cached_tail.set(tail);
// "head" is only ever written by the consumer thread, "Relaxed" is enough
let head = self.buffer.head.load(Ordering::Relaxed);
self.buffer.distance(head, tail)
}

Expand Down Expand Up @@ -667,7 +670,8 @@ impl<T> Consumer<T> {
/// This is a strict subset of the functionality implemented in `read_chunk()`.
/// For performance, this special case is immplemented separately.
fn next_head(&self) -> Option<usize> {
let head = self.buffer.head.load(Ordering::Acquire);
// "head" is only ever written by the consumer thread, "Relaxed" is enough
let head = self.buffer.head.load(Ordering::Relaxed);

// Check if the queue is *possibly* empty.
if head == self.cached_tail.get() {
Expand Down

0 comments on commit e94a9d6

Please sign in to comment.