Commit: batch-size-range
pragmaxim committed Jun 11, 2024
1 parent 5fb2c23 commit 4998e20
Showing 3 changed files with 37 additions and 22 deletions.
20 changes: 11 additions & 9 deletions README.md
@@ -1,15 +1,16 @@
 ## min-batch
 
 ![Build status](https://github.com/pragmaxim/min-batch/workflows/Rust/badge.svg)
-[![Cargo](https://img.shields.io/crates/v/futures-batch.svg)](https://crates.io/crates/futures-batch)
-[![Documentation](https://docs.rs/futures-batch/badge.svg)](https://docs.rs/futures-batch)
+[![Cargo](https://img.shields.io/crates/v/min-batch.svg)](https://crates.io/crates/min-batch)
+[![Documentation](https://docs.rs/min-batch/badge.svg)](https://docs.rs/min-batch)
 
-An adapter that turns elements into a batch and its minimal size is computed by given closure.
+An adapter that turns elements into a batch whose size is computed by a given closure.
 It is needed for efficient work parallelization so that following tasks running in parallel
-are all processing a batch of at least `min_batch_size` to avoid context switching overhead
-of cpu intensive workloads. Otherwise we usually need to introduce some kind of publish/subscribe
-model with dedicated long-running thread for each consumer, broadcasting messages to them
-and establishing back-pressure through [barrier](https://docs.rs/tokio/latest/tokio/sync/struct.Barrier.html).
+are all processing a batch of at least `min_batch_size`, and ideally `optimal_batch_size`,
+to avoid the context-switching overhead of CPU-intensive workloads. Otherwise we would usually
+need to introduce some kind of publish/subscribe model with a dedicated long-running thread
+for each consumer, broadcasting messages to them and establishing back-pressure through a
+[barrier](https://docs.rs/tokio/latest/tokio/sync/struct.Barrier.html).
 
 ## Usage
 
@@ -28,14 +29,15 @@ struct BlockOfTxs {
 #[tokio::main]
 async fn main() {
     let mut block_names: Vec<char> = vec!['a', 'b', 'c', 'd'];
-    let min_match_size = 3;
+    let min_batch_size = 2;
+    let optimal_batch_size = 3;
     let batches: Vec<Vec<BlockOfTxs>> =
         stream::iter(1..=4)
            .map(|x| BlockOfTxs {
                name: block_names[x - 1],
                txs_count: x,
            })
-           .min_batch(min_match_size, |block: &BlockOfTxs| block.txs_count)
+           .min_batch(min_batch_size, optimal_batch_size, |block: &BlockOfTxs| block.txs_count)
            .collect()
            .await;
 
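Since the diff elides the imports and the `BlockOfTxs` definition, here is a self-contained sketch of the updated README example (field types inferred from the tests; `futures` and `tokio` as in the crate docs), with the expected batching worked out in a comment:

```rust
use futures::{stream, StreamExt};
use min_batch::MinBatchExt;

#[derive(Debug)]
struct BlockOfTxs {
    name: char,
    txs_count: usize,
}

#[tokio::main]
async fn main() {
    let block_names: Vec<char> = vec!['a', 'b', 'c', 'd'];
    let min_batch_size = 2;
    let optimal_batch_size = 3;
    let batches: Vec<Vec<BlockOfTxs>> = stream::iter(1..=4usize)
        .map(|x| BlockOfTxs {
            name: block_names[x - 1],
            txs_count: x,
        })
        .min_batch(min_batch_size, optimal_batch_size, |block: &BlockOfTxs| {
            block.txs_count
        })
        .collect()
        .await;

    // txs_count runs 1, 2, 3, 4: the first batch closes at 1 + 2 >= 3
    // (optimal_batch_size), and each later block meets the threshold on
    // its own, so the expected batches are [a, b], [c], [d].
    for batch in &batches {
        println!("{:?}", batch);
    }
}
```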
2 changes: 1 addition & 1 deletion benches/min_batch_benchmark.rs
@@ -4,7 +4,7 @@ use min_batch::MinBatchExt;
 use tokio::runtime::Runtime;
 
 async fn batch(stream: impl Stream<Item = i32>) {
-    let _ = stream.min_batch(1000, |i| *i as usize);
+    let _ = stream.min_batch(50, 1000, |i| *i as usize);
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
37 changes: 25 additions & 12 deletions src/lib.rs
@@ -19,13 +19,14 @@
 //! #[tokio::main]
 //! async fn main() {
 //!     let mut block_names: Vec<char> = vec!['a', 'b', 'c', 'd'];
-//!     let min_match_size = 3;
+//!     let min_batch_size = 2;
+//!     let optimal_batch_size = 3;
 //!     let batches: Vec<Vec<BlockOfTxs>> = stream::iter(1..=4)
 //!         .map(|x| BlockOfTxs {
 //!             name: block_names[x - 1],
 //!             txs_count: x,
 //!         })
-//!         .min_batch(min_match_size, |block: &BlockOfTxs| block.txs_count)
+//!         .min_batch(min_batch_size, optimal_batch_size, |block: &BlockOfTxs| block.txs_count)
 //!         .collect()
 //!         .await;
 //!
@@ -91,6 +92,7 @@ pin_project! {
         current_batch_size: usize,
         items: Vec<S::Item>,
         min_batch_size: usize,
+        optimal_batch_size: usize,
         count_fn: F,
     }
 }
@@ -99,12 +101,13 @@ where
     S: Stream<Item = T>,
     F: Fn(&T) -> usize,
 {
-    pub fn new(stream: S, batch_size: usize, count_fn: F) -> Self {
+    pub fn new(stream: S, min_batch_size: usize, optimal_batch_size: usize, count_fn: F) -> Self {
         MinBatch {
             stream: stream.fuse(),
             current_batch_size: 0,
-            items: Vec::with_capacity(batch_size),
-            min_batch_size: batch_size,
+            items: Vec::with_capacity(min_batch_size),
+            min_batch_size,
+            optimal_batch_size,
             count_fn,
         }
     }
@@ -122,7 +125,12 @@
         loop {
             match me.stream.as_mut().poll_next(cx) {
                 Poll::Pending => {
-                    return Poll::Pending;
+                    if !me.items.is_empty() && *me.current_batch_size >= *me.min_batch_size {
+                        *me.current_batch_size = 0;
+                        return Poll::Ready(Some(std::mem::take(me.items)));
+                    } else {
+                        return Poll::Pending;
+                    }
                 }
                 Poll::Ready(Some(item)) => {
                     if me.items.is_empty() {
@@ -131,7 +139,7 @@
                     let new_count = (me.count_fn)(&item);
                     me.items.push(item);
                     *me.current_batch_size += new_count;
-                    if me.current_batch_size >= me.min_batch_size {
+                    if me.current_batch_size >= me.optimal_batch_size {
                         *me.current_batch_size = 0;
                         return Poll::Ready(Some(std::mem::take(me.items)));
                     }
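Taken together, the two `poll_next` hunks above give `min_batch` its range semantics: a batch is emitted as soon as it reaches `optimal_batch_size`, but when the upstream stalls with `Poll::Pending`, a partial batch of at least `min_batch_size` is flushed instead of waiting. A runnable sketch of that behaviour (not from the repository's tests; it assumes the `tokio-stream` crate's `ReceiverStream` wrapper and the crate's blanket `MinBatchExt` impl implied by the README example):

```rust
use futures::StreamExt;
use min_batch::MinBatchExt;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel::<u32>(8);
    // Queue two items, then leave the producer idle; tx stays alive,
    // so the stream is Pending rather than terminated.
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();

    // min_batch_size = 2, optimal_batch_size = 4, each item counts as 1.
    let mut batches = ReceiverStream::new(rx).min_batch(2, 4, |_| 1);

    // The upstream stalls with 2 buffered items: that meets min_batch_size,
    // so a partial batch is flushed instead of waiting for 4 items.
    assert_eq!(batches.next().await, Some(vec![1, 2]));
    println!("partial batch flushed on Pending");
}
```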
@@ -151,12 +159,17 @@
 }
 
 pub trait MinBatchExt: Stream {
-    fn min_batch<F>(self, min_batch_size: usize, count_fn: F) -> MinBatch<Self, F, Self::Item>
+    fn min_batch<F>(
+        self,
+        min_batch_size: usize,
+        optimal_batch_size: usize,
+        count_fn: F,
+    ) -> MinBatch<Self, F, Self::Item>
     where
         Self: Sized,
         F: Fn(&Self::Item) -> usize,
     {
-        MinBatch::new(self, min_batch_size, count_fn)
+        MinBatch::new(self, min_batch_size, optimal_batch_size, count_fn)
     }
 }
 
@@ -189,7 +202,7 @@ mod tests {
                 .map(|_| queue.pop_front().unwrap())
                 .collect::<Vec<char>>()
             })
-            .min_batch(3, |xs: &Vec<char>| xs.len())
+            .min_batch(3, 3, |xs: &Vec<char>| xs.len())
             .collect()
             .await;
 
@@ -216,7 +229,7 @@
                 .map(|_| queue.pop_front().unwrap())
                 .collect::<Vec<char>>()
             })
-            .min_batch(3, |xs: &Vec<char>| xs.len())
+            .min_batch(3, 3, |xs: &Vec<char>| xs.len())
             .collect()
             .await;
 
@@ -234,7 +247,7 @@
                 name: queue.pop_front().unwrap(),
                 txs_count: if x >= 4 { 1 } else { x },
             })
-            .min_batch(3, |block: &BlockOfTxs| block.txs_count)
+            .min_batch(3, 3, |block: &BlockOfTxs| block.txs_count)
            .collect()
            .await;
 
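Note that the updated tests pass the same value for both thresholds (`min_batch(3, 3, ...)`), which collapses the range: a batch still flushes exactly when it reaches three transactions, and the Pending branch can never fire first, so the assertions from the single-threshold version keep passing unchanged.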