From 334ef33c3635d712cc46f8d4b7c0448b92e3c67a Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Sat, 30 Sep 2023 16:33:27 -0700 Subject: [PATCH] Encapsulate triple buffer unsafety --- src/fader.rs | 25 +++++----- src/lib.rs | 1 - src/spatial.rs | 123 ++++++++++++++++++++++++--------------------- src/swap.rs | 132 +++++++++++++++++++++++++------------------------ 4 files changed, 145 insertions(+), 136 deletions(-) diff --git a/src/fader.rs b/src/fader.rs index 0cf1dc5..807f235 100644 --- a/src/fader.rs +++ b/src/fader.rs @@ -1,7 +1,7 @@ use alloc::sync::Arc; use core::mem; -use crate::{frame, math::Float, Frame, Signal, Swap}; +use crate::{frame, math::Float, swap, Frame, Signal}; /// Cross-fades smoothly between dynamically-supplied signals /// @@ -9,19 +9,20 @@ use crate::{frame, math::Float, Frame, Signal, Swap}; /// perceived loudness pub struct Fader { progress: f32, - next: Arc>>>, + next: swap::Receiver>>, inner: T, } -impl Fader { +impl Fader { /// Create a fader initially wrapping `inner` pub fn new(inner: T) -> (FaderControl, Self) { + let (send, recv) = swap::swap(|| None); let signal = Self { progress: 1.0, - next: Arc::new(Swap::new(|| None)), + next: recv, inner, }; - let control = FaderControl(signal.next.clone()); + let control = FaderControl(send); (control, signal) } } @@ -45,7 +46,7 @@ where } } - let next = unsafe { (*self.next.received()).as_mut().unwrap() }; + let next = (*self.next.received()).as_mut().unwrap(); let increment = interval / next.duration; while !out.is_empty() { let mut buffer = [(); 1024].map(|()| T::Frame::ZERO); @@ -76,19 +77,17 @@ where } /// Thread-safe control for a [`Fader`] filter -pub struct FaderControl(Arc>>>); +pub struct FaderControl(swap::Sender>>); impl FaderControl { /// Crossfade to `signal` over `duration`. If a fade is already in progress, it will complete /// before a fading to the new signal begins. If another signal is already waiting for a current /// fade to complete, the waiting signal is replaced. pub fn fade_to(&mut self, signal: T, duration: f32) { - unsafe { - *self.0.pending() = Some(Command { - fade_to: signal, - duration, - }); - } + *self.0.pending() = Some(Command { + fade_to: signal, + duration, + }); self.0.flush() } } diff --git a/src/lib.rs b/src/lib.rs index 1ebb069..b9d9478 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,6 @@ pub use smooth::{Interpolate, Smoothed}; pub use spatial::*; pub use speed::{Speed, SpeedControl}; pub use stream::{Stream, StreamControl}; -pub use swap::Swap; pub use tanh::Tanh; /// Unitless instantaneous sound wave amplitude measurement diff --git a/src/spatial.rs b/src/spatial.rs index d361c8d..80c4338 100644 --- a/src/spatial.rs +++ b/src/spatial.rs @@ -5,8 +5,7 @@ use crate::{ math::{add, dot, invert_quat, mix, norm, rotate, scale, sub, Float}, ring::Ring, set::{set, Set, SetHandle}, - swap::Swap, - Sample, Seek, Signal, + swap, Sample, Seek, Signal, }; type ErasedSpatialBuffered = Box + Send>>; @@ -33,19 +32,23 @@ impl SpatialSignalBuffered { velocity: mint::Vector3, max_delay: f32, radius: f32, - ) -> Self { + ) -> (swap::Sender, Self) { let mut queue = Ring::new((max_delay * rate as f32).ceil() as usize + 1); queue.delay( rate, (norm(position.into()) / SPEED_OF_SOUND).min(max_delay), ); - Self { - rate, - max_delay, - common: Common::new(radius, position, velocity), - queue: queue, - inner, - } + let (send, recv) = Common::new(radius, position, velocity); + ( + send, + Self { + rate, + max_delay, + common: recv, + queue, + inner, + }, + ) } } @@ -61,17 +64,21 @@ impl SpatialSignal { position: mint::Point3, velocity: mint::Vector3, radius: f32, - ) -> Self { - Self { - common: Common::new(radius, position, velocity), - inner, - } + ) -> (swap::Sender, Self) { + let (send, recv) = Common::new(radius, position, velocity); + ( + send, + Self { + common: recv, + inner, + }, + ) } } struct Common { radius: f32, - motion: Arc>, + motion: swap::Receiver, state: State, /// How long ago the signal finished, if it did finished_for: Option, @@ -79,23 +86,31 @@ struct Common { } impl Common { - fn new(radius: f32, position: mint::Point3, velocity: mint::Vector3) -> Self { - Self { - radius, - motion: Arc::new(Swap::new(|| Motion { - position, - velocity, - discontinuity: false, - })), - state: State::new(position), - finished_for: None, - stopped: false, - } + fn new( + radius: f32, + position: mint::Point3, + velocity: mint::Vector3, + ) -> (swap::Sender, Self) { + let (send, recv) = swap::swap(|| Motion { + position, + velocity, + discontinuity: false, + }); + ( + send, + Self { + radius, + motion: recv, + state: State::new(position), + finished_for: None, + stopped: false, + }, + ) } } /// Control for updating the motion of a spatial signal -pub struct Spatial(Arc>); +pub struct Spatial(swap::Sender); impl Spatial { /// Update the position and velocity of the signal @@ -115,20 +130,18 @@ impl Spatial { velocity: mint::Vector3, discontinuity: bool, ) { - unsafe { - *self.0.pending() = Motion { - position, - velocity, - discontinuity, - }; - } + *self.0.pending() = Motion { + position, + velocity, + discontinuity, + }; self.0.flush(); } } /// [`Signal`] for stereo output from a spatial scene pub struct SpatialScene { - rot: Arc>>, + rot: swap::Receiver>, recv_buffered: Set, recv: Set, } @@ -140,17 +153,17 @@ impl SpatialScene { pub fn new() -> (SpatialSceneControl, Self) { let (seek_handle, seek_set) = set(); let (buffered_handle, buffered_set) = set(); - let rot = Arc::new(Swap::new(|| mint::Quaternion { + let (rot_send, rot_recv) = swap::swap(|| mint::Quaternion { s: 1.0, v: [0.0; 3].into(), - })); + }); let control = SpatialSceneControl { - rot: rot.clone(), + rot: rot_send, seek: seek_handle, buffered: buffered_handle, }; let signal = SpatialScene { - rot, + rot: rot_recv, recv_buffered: buffered_set, recv: seek_set, }; @@ -177,7 +190,7 @@ fn walk_set( let prev_position; let next_position; - unsafe { + { // Compute the signal's smoothed start/end positions over the sampled period // TODO: Use historical positions let state = &mut common.state; @@ -236,7 +249,7 @@ fn walk_set( /// Control for modifying a [`SpatialSceneSignal`] pub struct SpatialSceneControl { - rot: Arc>>, + rot: swap::Sender>, seek: SetHandle, buffered: SetHandle, } @@ -260,13 +273,10 @@ impl SpatialSceneControl { where S: Seek + Send + 'static, { - let signal = Box::new(SpatialSignal::new( - signal, - options.position, - options.velocity, - options.radius, - )); - let handle = Spatial(signal.common.motion.clone()); + let (send, recv) = + SpatialSignal::new(signal, options.position, options.velocity, options.radius); + let signal = Box::new(recv); + let handle = Spatial(send); self.seek.insert(signal); handle } @@ -292,15 +302,16 @@ impl SpatialSceneControl { where S: Signal + Send + 'static, { - let signal = Box::new(SpatialSignalBuffered::new( + let (send, recv) = SpatialSignalBuffered::new( rate, signal, options.position, options.velocity, max_distance / SPEED_OF_SOUND + buffer_duration, options.radius, - )); - let handle = Spatial(signal.common.motion.clone()); + ); + let signal = Box::new(recv); + let handle = Spatial(send); self.buffered.insert(signal); handle } @@ -310,9 +321,7 @@ impl SpatialSceneControl { /// An unrotated listener faces -Z, with +X to the right and +Y up. pub fn set_listener_rotation(&mut self, rotation: mint::Quaternion) { let signal_rotation = invert_quat(&rotation); - unsafe { - *self.rot.pending() = signal_rotation; - } + *self.rot.pending() = signal_rotation; self.rot.flush(); } } @@ -347,7 +356,7 @@ impl Signal for SpatialScene { set.update(); // Update listener rotation - let (prev_rot, rot) = unsafe { + let (prev_rot, rot) = { let prev = *self.rot.received(); self.rot.refresh(); (prev, *self.rot.received()) diff --git a/src/swap.rs b/src/swap.rs index 206b5cf..28c7694 100644 --- a/src/swap.rs +++ b/src/swap.rs @@ -3,101 +3,103 @@ use core::{ sync::atomic::{AtomicUsize, Ordering}, }; +use alloc::sync::Arc; + /// SPSC queue that only retains the last element sent /// /// Useful for custom controllable signals. -pub struct Swap { - slots: [UnsafeCell; 3], - send: Cell, - shared: AtomicUsize, - recv: Cell, +pub fn swap(mut init: impl FnMut() -> T) -> (Sender, Receiver) { + let shared = Arc::new(Shared { + slots: [ + UnsafeCell::new(init()), + UnsafeCell::new(init()), + UnsafeCell::new(init()), + ], + index: AtomicUsize::new(1), + }); + ( + Sender { + index: 0, + shared: shared.clone(), + }, + Receiver { index: 2, shared }, + ) } -impl Swap { - /// Create a channel initially holding `x` - pub fn new(mut init: impl FnMut() -> T) -> Self { - Self { - slots: [ - UnsafeCell::new(init()), - UnsafeCell::new(init()), - UnsafeCell::new(init()), - ], - send: Cell::new(0), - shared: AtomicUsize::new(1), - recv: Cell::new(2), - } - } +pub struct Sender { + index: usize, + shared: Arc>, +} - /// Access the value that will be sent next. Producer only. - pub fn pending(&self) -> *mut T { - self.slots[self.send.get()].get() +impl Sender { + /// Access the value that will be sent next + pub fn pending(&mut self) -> &mut T { + unsafe { &mut *self.shared.slots[self.index].get() } } - /// Send the value from `pending`. Producer only. - pub fn flush(&self) { - self.send.set( - self.shared - .swap(self.send.get() | FRESH_BIT, Ordering::AcqRel) - & INDEX_MASK, - ); + /// Send the value from `pending` + pub fn flush(&mut self) { + self.index = self + .shared + .index + .swap(self.index | FRESH_BIT, Ordering::AcqRel) + & INDEX_MASK; } +} +pub struct Receiver { + index: usize, + shared: Arc>, +} + +impl Receiver { /// Update the value exposed by `recv`. Returns whether new data was obtained. Consumer only. - pub fn refresh(&self) -> bool { - if self.shared.load(Ordering::Relaxed) & FRESH_BIT == 0 { + pub fn refresh(&mut self) -> bool { + if self.shared.index.load(Ordering::Relaxed) & FRESH_BIT == 0 { return false; } - self.recv - .set(self.shared.swap(self.recv.get(), Ordering::AcqRel) & INDEX_MASK); + self.index = self.shared.index.swap(self.index, Ordering::AcqRel) & INDEX_MASK; true } /// Access the most recent data as of the last `refresh` call. Consumer only. - pub fn received(&self) -> *mut T { - self.slots[self.recv.get()].get() + pub fn received(&mut self) -> &mut T { + unsafe { &mut *self.shared.slots[self.index].get() } } } -impl Default for Swap { - fn default() -> Self { - Self { - slots: Default::default(), - send: Cell::new(0), - shared: AtomicUsize::new(1), - recv: Cell::new(2), - } - } +struct Shared { + slots: [UnsafeCell; 3], + index: AtomicUsize, } -unsafe impl Send for Swap {} -unsafe impl Sync for Swap {} +unsafe impl Send for Shared {} +unsafe impl Sync for Shared {} const FRESH_BIT: usize = 0b100; const INDEX_MASK: usize = 0b011; #[cfg(test)] mod tests { - use super::Swap; + use super::*; #[test] fn smoke() { - let s = Swap::new(|| 0); - unsafe { - *s.pending() = 1; - assert_eq!(*s.received(), 0); - s.flush(); - assert_eq!(*s.received(), 0); - assert!(s.refresh()); - assert_eq!(*s.received(), 1); - assert!(!s.refresh()); - assert_eq!(*s.received(), 1); - *s.pending() = 2; - assert!(!s.refresh()); - assert_eq!(*s.received(), 1); - s.flush(); - assert_eq!(*s.received(), 1); - assert!(s.refresh()); - assert_eq!(*s.received(), 2); - } + let (mut s, mut r) = swap(|| 0); + *s.pending() = 1; + assert_eq!(*r.received(), 0); + s.flush(); + assert_eq!(*r.received(), 0); + assert!(r.refresh()); + assert_eq!(*r.received(), 1); + assert!(!r.refresh()); + assert_eq!(*r.received(), 1); + *s.pending() = 2; + assert!(!r.refresh()); + assert_eq!(*r.received(), 1); + s.flush(); + assert_eq!(*r.received(), 1); + assert!(r.refresh()); + assert_eq!(*r.received(), 2); } }