Skip to content

Latest commit

 

History

History
81 lines (70 loc) · 2.24 KB

README.md

File metadata and controls

81 lines (70 loc) · 2.24 KB

broadcast-sink

Build status Cargo Documentation

A stream adapter that broadcasts each element to multiple consumers, which process it in parallel. Each consumer runs as its own element-consuming task, and back-pressure is established through a barrier, so that the next element is polled only after the previous element has been processed by all consumers.

Usage

Let's implement the Consumer interface so that each consumer mutates shared state for every element, in parallel. Note that consumers usually hold some state or a database handle that they write to.

use futures::stream;
use std::sync::{Arc, RwLock};
use tokio::sync::Mutex;
use broadcast_sink::{Consumer, StreamBroadcastSinkExt};

/// Shared state that the example consumers mutate.
///
/// Each field sits behind its own `RwLock`, so the two counters are locked
/// independently of each other.
#[derive(Debug)]
struct State {
    /// Counter written by `MultiplyX::consume` (multiplied by 5 per call).
    x: RwLock<u64>,
    /// Counter written by `MultiplyY::consume` (multiplied by 10 per call).
    y: RwLock<u64>,
}
/// Example consumer that multiplies `State::x` by 5 for every element it receives.
struct MultiplyX {
    /// Reference-counted handle to the state shared with the other consumer.
    state: Arc<State>,
}
impl MultiplyX {
    fn new(state: Arc<State>) -> Self {
        Self { state }
    }
}
impl Consumer<u64> for MultiplyX {
    /// Multiplies the shared `x` counter by 5; the element's value is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the `x` lock is poisoned, because the result of `write()`
    /// is unwrapped.
    fn consume(&self, _: &u64) {
        let shared = &self.state.x;
        // The write guard lives until the end of the method, so the log line
        // below is printed while the lock is still held.
        let mut value = shared.write().unwrap();
        *value *= 5;
        println!("Consumer X processed item");
    }
}
/// Example consumer that multiplies `State::y` by 10 for every element it receives.
struct MultiplyY {
    /// Reference-counted handle to the state shared with the other consumer.
    state: Arc<State>,
}
impl MultiplyY {
    fn new(state: Arc<State>) -> Self {
        Self { state }
    }
}
impl Consumer<u64> for MultiplyY {
    /// Multiplies the shared `y` counter by 10; the element's value is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the `y` lock is poisoned, because the result of `write()`
    /// is unwrapped.
    fn consume(&self, _: &u64) {
        let shared = &self.state.y;
        // The write guard lives until the end of the method, so the log line
        // below is printed while the lock is still held.
        let mut value = shared.write().unwrap();
        *value *= 10;
        println!("Consumer Y processed item");
    }
}

// Both counters start at 1, so the values asserted below are pure powers of
// 5 and 10.
let state = Arc::new(State {
    x: RwLock::new(1),
    y: RwLock::new(1),
});

// Broadcast the five elements 1..=5 to both consumers. Each consumer
// multiplies its own counter once per element, ignoring the element's value.
// NOTE(review): the meaning of the `100` argument is not visible here —
// presumably a buffer/channel capacity; confirm against the crate docs.
let consumers = stream::iter(1..=5)
    .broadcast(
        100,
        vec![
            Arc::new(Mutex::new(MultiplyX::new(Arc::clone(&state)))),
            Arc::new(Mutex::new(MultiplyY::new(Arc::clone(&state)))),
        ],
    )
    .await;

// After five elements: 1 * 5^5 = 3125 and 1 * 10^5 = 100000.
assert_eq!(*state.x.read().unwrap(), 3125);
assert_eq!(*state.y.read().unwrap(), 100000);

// The value returned by the first `broadcast` call is passed back in as the
// consumer list, so the same consumers process five more elements.
stream::iter(1..=5).broadcast(100, consumers).await;

// After ten elements in total: 5^10 = 9765625 and 10^10 = 10000000000.
assert_eq!(*state.x.read().unwrap(), 9765625);
assert_eq!(*state.y.read().unwrap(), 10000000000);