Skip to content

Commit

Permalink
Redis PubSub, RedisConn methods no longer require mutable, custom bac…
Browse files Browse the repository at this point in the history
…kdoor to underlying redis commands in a batch (#57)

* No more mutable requirement for redis conns, much more ergonomic

* Custom fallbacks to underlying redis interface without having to leave higher level interface

* Redis pubsub

* Error resistant, more tests, lazy clone.
  • Loading branch information
zakstucke authored Aug 19, 2024
1 parent 67948d3 commit b04ca82
Show file tree
Hide file tree
Showing 17 changed files with 1,164 additions and 205 deletions.
10 changes: 5 additions & 5 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ features = ["sync"]
# These are included on top of above features when not wasm:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
version = "1"
features = ["time", "fs", "process", "rt", "io-util"]
features = ["time", "fs", "process", "rt", "io-util", "macros"]

[target.'cfg(target_arch = "wasm32")'.dependencies]
tracing-subscriber-wasm = "0.1.0"
Expand All @@ -144,10 +144,10 @@ rstest = "0.18"
criterion = { version = "0.3", features = ["html_reports", "async_tokio"] }
tokio = { version = '1', features = ["full"] }

# When adding new benches, they should be added like this with the name of the file in benches/: (obviously uncommented)
# [[bench]]
# name = "bench_tester"
# harness = false
# When adding new benches, they should be added like this with the name of the file in benches/:
[[bench]]
name = "bench_default"
harness = false

[features]
collector = ["dep:reqwest", "dep:tempfile", "tarball"]
Expand Down
56 changes: 56 additions & 0 deletions rust/benches/bench_default.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#![allow(unused_imports)]
use criterion::{black_box, criterion_group, criterion_main, Criterion};

// <--- EXAMPLE:

fn fibonacci(n: u64) -> u64 {
let mut a = 0;
let mut b = 1;

match n {
0 => b,
_ => {
for _ in 0..n {
let c = a + b;
a = b;
b = c;
}
b
}
}
}

async fn async_fibonacci(n: u64) -> u64 {
fibonacci(n)
}

// SYNC EXAMPLE
pub fn bench_sync(c: &mut Criterion) {
c.bench_function("sync: fib 20", |b| b.iter(|| fibonacci(black_box(20))));
}

// ASYNC EXAMPLE
pub fn bench_async(c: &mut Criterion) {
c.bench_function("async: fib 20", |b| {
b.to_async(&get_tokio_rt())
.iter(|| async_fibonacci(black_box(20)))
});
}

// CUSTOM CONFIG EXAMPLE
pub fn bench_config(c: &mut Criterion) {
let mut group = c.benchmark_group("small-sample-size");
group.sample_size(10).significance_level(0.01);
group.bench_function("config: fib 20", |b| b.iter(|| fibonacci(black_box(20))));
group.finish();
}

criterion_group!(benches, bench_sync, bench_async, bench_config);
criterion_main!(benches);

fn get_tokio_rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
}
25 changes: 0 additions & 25 deletions rust/benches/bench_tester.rs

This file was deleted.

2 changes: 1 addition & 1 deletion rust/bitbazaar/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ mod tests {
// Sleeping after each, to try and ensure the correct debug output:
log.with_tmp_global(|| {
// On windows this needs to be really long to get static record ordering for testing:
let delay = if cfg!(windows) { 100 } else { 10 };
let delay = if cfg!(windows) { 100 } else { 30 };

debug!("BEFORE");
std::thread::sleep(std::time::Duration::from_millis(delay));
Expand Down
98 changes: 98 additions & 0 deletions rust/bitbazaar/misc/lazy_clone.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/// Efficient way to clone an item for each element in an iterator.
/// The final iteration will consume the original item, so no unnecessary clones are made.
pub trait IterWithCloneLazy {
/// The return type of the iterator.
type IterT;

/// Efficient way to pass an owned clone of an item to each element in an iterator.
/// Will pass the final item by value without cloning, so no unnecessary clones are made.
fn with_clone_lazy<ItemT: Clone>(
self,
item: ItemT,
) -> impl Iterator<Item = (ItemT, Self::IterT)>
where
Self: Sized;
}

impl<IterT, I: IntoIterator<Item = IterT>> IterWithCloneLazy for I {
type IterT = IterT;

fn with_clone_lazy<ItemT: Clone>(
self,
item: ItemT,
) -> impl Iterator<Item = (ItemT, Self::IterT)>
where
Self: Sized,
{
let mut iter = self.into_iter();
LazyCloneIter {
item: Some(item),
next_in_iter: iter.next(),
iter,
}
}
}

struct LazyCloneIter<I: Iterator, ItemT: Clone> {
// Will consume when next_in_iter is None, as on last iteration.
item: Option<ItemT>,
iter: I,
next_in_iter: Option<I::Item>,
}

impl<I: Iterator, ItemT: Clone> Iterator for LazyCloneIter<I, ItemT> {
type Item = (ItemT, I::Item);

fn next(&mut self) -> Option<Self::Item> {
self.next_in_iter.take().map(|next| {
self.next_in_iter = self.iter.next();
if self.next_in_iter.is_none() {
(self.item.take().unwrap(), next)
} else {
(self.item.clone().unwrap(), next)
}
})
}
}

#[cfg(test)]
mod tests {
use std::sync::{atomic::AtomicUsize, Arc};

use super::*;

#[test]
fn test_lazy_clone_with_clone_lazy() {
struct Test {
tot_clones: Arc<AtomicUsize>,
}
impl Clone for Test {
fn clone(&self) -> Self {
self.tot_clones
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Test {
tot_clones: self.tot_clones.clone(),
}
}
}

// Try for 0..10 iterator length, main things to check are 0, 1 and >1.
// For all but final iteration, should clone, then pass by value.
for count in 0..10 {
let tot_clones = Arc::new(AtomicUsize::new(0));
let test = Test {
tot_clones: tot_clones.clone(),
};
for (t, index) in (0..count).with_clone_lazy(test) {
assert_eq!(
t.tot_clones.load(std::sync::atomic::Ordering::Relaxed),
if index < count - 1 { index + 1 } else { index }
);
}
assert_eq!(
tot_clones.load(std::sync::atomic::Ordering::Relaxed),
count.max(1) - 1
);
}
}
}
2 changes: 2 additions & 0 deletions rust/bitbazaar/misc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod binary_search;
mod flexi_logger;
mod global_lock;
mod is_tcp_port_listening;
mod lazy_clone;
mod looper;
mod main_wrapper;
mod periodic_updater;
Expand All @@ -26,6 +27,7 @@ pub use binary_search::*;
pub use flexi_logger::*;
pub use global_lock::*;
pub use is_tcp_port_listening::is_tcp_port_listening;
pub use lazy_clone::*;
pub use looper::*;
pub use main_wrapper::*;
pub use periodic_updater::*;
Expand Down
9 changes: 9 additions & 0 deletions rust/bitbazaar/misc/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ impl<'a, E> Retry<'a, E> {
self
}

/// Never stop retrying.
pub fn until_forever(mut self) -> Self {
self.until = RetryUntil::Forever;
self
}

/// Stop retrying after the total delay reaches the given duration.
pub fn until_total_delay(mut self, max_total_delay: Duration) -> Self {
self.until = RetryUntil::TotalDelay(max_total_delay);
Expand Down Expand Up @@ -193,6 +199,8 @@ pub enum RetryUntil {
TotalDelay(Duration),
/// UNSTABLE: ONLY PUBLIC FOR MACRO USE.
Delay(Duration),
/// UNSTABLE: ONLY PUBLIC FOR MACRO USE.
Forever,
}

impl RetryUntil {
Expand Down Expand Up @@ -223,6 +231,7 @@ impl RetryUntil {
return true;
}
}
RetryUntil::Forever => return false,
}
false
}
Expand Down
Loading

0 comments on commit b04ca82

Please sign in to comment.