diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 0000000..c4468fe --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1 @@ +github: pragmaxim diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..c11601f --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,8 @@ +version: 2 +updates: +- package-ecosystem: cargo + directory: "/" + schedule: + interval: daily + time: "04:00" + open-pull-requests-limit: 10 diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml new file mode 100644 index 0000000..0e9e362 --- /dev/null +++ b/.github/workflows/bench.yml @@ -0,0 +1,15 @@ +on: + - pull_request + - workflow_dispatch + +name: criterion benchmark +jobs: + runBenchmark: + name: run benchmark + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: boa-dev/criterion-compare-action@v3 + with: + # Use the base branch name as the branch to compare with + branchName: ${{ github.base_ref }} diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..0a440f9 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,53 @@ +name: Rust + +on: + pull_request: + push: + branches: + - master + workflow_dispatch: + schedule: + - cron: "00 04 * * *" + +jobs: + ci: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macOS-latest] + rust: + - stable + - beta + - nightly + steps: + - uses: actions/checkout@v1 + + - name: Install Rust + uses: actions-rs/toolchain/@v1 + with: + profile: minimal + toolchain: ${{ matrix.rust }} + override: true + components: clippy + + - name: Build + uses: actions-rs/cargo@v1 + with: + command: build + + - name: Run Tests + uses: actions-rs/cargo@v1 + with: + command: test + + - name: Audit for Security Vulnerabilities + uses: actions-rs/audit-check@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Generate Docs + uses: actions-rs/cargo@v1 + with: + command: doc + args: --all-features --no-deps diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8847d1b --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +# Generated by Cargo +# will have compiled files and executables +/target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here http://doc.crates.io/guide.html#cargotoml-vs-cargolock +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +.idea +/.vscode/ \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9c11077 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,29 @@ +[package] +authors = ["Jakub Liska "] +description = "A stream adapter that broadcasts elements into parallel tasks and returns future." +license = "MIT OR Apache-2.0" +name = "broadcast-sink" +version = "0.1.0" +edition = "2021" +repository = "https://github.com/pragmaxim-com/broadcast-sink.rs" + +[lib] +bench = false + +[dependencies] +futures = { version = "0.3", features = ["async-await"] } +tokio = { version = "1.38.0", features = ["full"] } +tokio-stream = { version = "0.1.15", features = ["sync"] } +pin-project-lite = "0.2" + +[dev-dependencies] +tokio = { version = "1.38.0", features = ["full"] } +criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] } +doctest = "0.1" + +[dev-dependencies.doc-comment] +version = "0.3" + +[[bench]] +name = "broadcast_sink_benchmark" +harness = false diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b7d7524 --- /dev/null +++ b/Makefile @@ -0,0 +1,26 @@ +# Needed SHELL since I'm using zsh +SHELL := /bin/bash + +.PHONY: help +help: ## This help message + @echo -e "$$(grep -hE '^\S+:.*##' $(MAKEFILE_LIST) | sed -e 's/:.*##\s*/:/' -e 's/^\(.\+\):\(.*\)/\\x1b[36m\1\\x1b[m:\2/' | column -c2 -t -s :)" + +.PHONY: build +build: ## Build Rust code locally + cargo build + +.PHONY: docs +docs: ## Generate and show documentation + cargo doc --open + +.PHONY: test +test: ## Run tests + cargo test + +.PHONY: lint +lint: ## Run linter + cargo clippy --all-targets --all-features -- -D warnings + +.PHONY: publish +publish: ## Publish to crates.io + cargo publish \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..2358ba3 --- /dev/null +++ b/README.md @@ -0,0 +1,69 @@ +## broadcast-sink + +![Build status](https://github.com/pragmaxim-com/broadcast-sink.rs/workflows/Rust/badge.svg) +[![Cargo](https://img.shields.io/crates/v/broadcast-sink.svg)](https://crates.io/crates/broadcast-sink) +[![Documentation](https://docs.rs/broadcast-sink/badge.svg)](https://docs.rs/broadcast-sink) + +A stream adapter that broadcasts each element to consumers which execute it in parallel. +Each consumer is represented by an element-consuming task with back-pressure established through +[barrier](https://docs.rs/tokio/latest/tokio/sync/struct.Barrier.html) so that next element +is polled when last element is processed by all consumers. + +## Usage + +Let's implement `Consumer` interface such that each consumer mutates shared state for each element +in parallel. Note that consumers usually have some `State` or `Database` so they can write to it. + +```rust +use futures::stream; +use std::sync::{Arc, RwLock}; +use broadcast_sink::{Consumer, StreamBroadcastSinkExt}; + +#[derive(Debug)] +struct State { + x: RwLock, + y: RwLock, +} +struct MultiplyX { + state: Arc, +} +impl MultiplyX { + fn new(state: Arc) -> Self { + Self { state } + } +} +impl Consumer for MultiplyX { + fn consume(&self, _: &u64) { + let mut x = self.state.x.write().unwrap(); + *x *= 5; + println!("Consumer X processed item"); + } +} +struct MultiplyY { + state: Arc, +} +impl MultiplyY { + fn new(state: Arc) -> Self { + Self { state } + } +} +impl Consumer for MultiplyY { + fn consume(&self, _: &u64) { + let mut y = self.state.y.write().unwrap(); + *y *= 10; + println!("Consumer Y processed item"); + } +} + +let state = Arc::new(State { + x: RwLock::new(1), + y: RwLock::new(1), +}); +let consumers: Vec>> = vec![ + Arc::new(MultiplyX::new(Arc::clone(&state))), + Arc::new(MultiplyY::new(Arc::clone(&state))), +]; +stream::iter(1..=5).broadcast(100, consumers).await; +assert_eq!(*state.x.read().unwrap(), 3125); +assert_eq!(*state.y.read().unwrap(), 100000); +``` diff --git a/benches/broadcast_sink_benchmark.rs b/benches/broadcast_sink_benchmark.rs new file mode 100644 index 0000000..56dc1d6 --- /dev/null +++ b/benches/broadcast_sink_benchmark.rs @@ -0,0 +1,81 @@ +use broadcast_sink::{Consumer, StreamBroadcastSinkExt}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use futures::stream::{self, Stream}; +use std::sync::{Arc, RwLock}; +use tokio::runtime::Runtime; + +#[derive(Debug)] +struct State { + x: RwLock, + y: RwLock, +} + +struct MultiplyX { + state: Arc, +} + +impl MultiplyX { + fn new(state: Arc) -> Self { + Self { state } + } +} + +impl Consumer for MultiplyX { + fn consume(&self, _: &u64) { + let mut x = self.state.x.write().unwrap(); + *x *= 5; + println!("Consumer 1 processed item"); + } +} + +struct MultiplyY { + state: Arc, +} + +impl MultiplyY { + fn new(state: Arc) -> Self { + Self { state } + } +} + +impl Consumer for MultiplyY { + fn consume(&self, _: &u64) { + let mut y = self.state.y.write().unwrap(); + *y *= 10; + println!("Consumer 2 processed item"); + } +} + +async fn batch(stream: impl Stream) { + let state = Arc::new(State { + x: RwLock::new(1), + y: RwLock::new(1), + }); + + let consumers: Vec>> = vec![ + Arc::new(MultiplyX::new(Arc::clone(&state))), + Arc::new(MultiplyY::new(Arc::clone(&state))), + ]; + + let _ = stream.broadcast(100, consumers); +} + +fn criterion_benchmark(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let mut group = c.benchmark_group("broadcast_sink"); + for &size in &[10, 100, 1000, 10_000, 100_000] { + group.throughput(Throughput::Bytes(size as u64)); + group.bench_with_input( + BenchmarkId::from_parameter(size), + &size, + |bencher, &size| { + bencher.to_async(&rt).iter(|| batch(stream::iter(0..size))); + }, + ); + } + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/config.backup b/config.backup new file mode 100644 index 0000000..f758764 --- /dev/null +++ b/config.backup @@ -0,0 +1,15 @@ +[core] + repositoryformatversion = 0 + filemode = true + bare = false + logallrefupdates = true +[remote "origin"] + url = git@pragmaxim.github.com:pragmaxim-com/broadcast-sink.rs.git + fetch = +refs/heads/*:refs/remotes/origin/* +[user] + name = pragmaxim + email = pragmaxim@gmail.com + signingkey = pragmaxim@gmail.com +[commit] + gpgsign = true + diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..80d186d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,188 @@ +#[cfg(test)] +#[macro_use] +extern crate doc_comment; + +#[cfg(test)] +doctest!("../README.md"); + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::task::{Context, Poll}; +use futures::ready; +use futures::{Stream, StreamExt}; +use pin_project_lite::pin_project; +use std::sync::Arc; +use tokio::sync::{broadcast, Barrier}; +use tokio::task; +use tokio_stream::wrappers::BroadcastStream; + +pub trait Consumer: Send + Sync { + fn consume(&self, item: &T); +} + +pin_project! { + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct BroadcastSink + where + St: Stream, + { + #[pin] + stream: St, + #[pin] + tx: broadcast::Sender>, + active_count: Arc, + consumer_count: usize, + #[pin] + _pin: PhantomPinned, + } +} + +impl BroadcastSink +where + St: Stream, + T: Clone + Send + Sync + 'static, +{ + fn new(stream: St, capacity: usize, consumers: Vec>>) -> Self { + let (tx, _rx) = broadcast::channel::>(capacity); + let consumer_count = consumers.len(); + let barrier = Arc::new(Barrier::new(consumer_count)); + let active_count = Arc::new(AtomicUsize::new(0)); + + for consumer in consumers.into_iter() { + let barrier_clone = Arc::clone(&barrier); + let rx = tx.subscribe(); + let active_count_clone = Arc::clone(&active_count); + + task::spawn(async move { + let mut stream = BroadcastStream::new(rx); + while let Some(Ok(item)) = stream.next().await { + consumer.consume(&item); + barrier_clone.wait().await; + active_count_clone.fetch_sub(1, Ordering::SeqCst); + } + }); + } + Self { + stream, + tx, + active_count, + consumer_count, + _pin: PhantomPinned, + } + } +} + +impl Future for BroadcastSink +where + St: Stream, + T: Clone + Send + Sync + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut me = self.project(); + + loop { + match ready!(me.stream.as_mut().poll_next(cx)) { + Some(item) => { + let next_arc = Arc::new(item); + me.active_count + .fetch_add(*me.consumer_count, Ordering::SeqCst); + let _ = me.tx.send(next_arc); // TODO handle error + } + None => { + let active_count = me.active_count.load(Ordering::SeqCst); + if active_count == 0 { + return Poll::Ready(()); + } + } + }; + } + } +} + +pub trait StreamBroadcastSinkExt: Stream { + fn broadcast( + self, + capacity: usize, + consumers: Vec>>, + ) -> BroadcastSink + where + Self: Sized + Stream, + T: Clone + Send + Sync + 'static, + { + BroadcastSink::new(self, capacity, consumers) + } +} + +impl StreamBroadcastSinkExt for T where T: Stream {} + +#[cfg(test)] +mod tests { + use super::*; + use futures::stream; + use std::sync::RwLock; + + #[derive(Debug)] + struct State { + x: RwLock, + y: RwLock, + } + + struct MultiplyX { + state: Arc, + } + + impl MultiplyX { + fn new(state: Arc) -> Self { + Self { state } + } + } + + impl Consumer for MultiplyX { + fn consume(&self, _: &u64) { + let mut x = self.state.x.write().unwrap(); + *x *= 5; + println!("Consumer X processed item"); + } + } + + struct MultiplyY { + state: Arc, + } + + impl MultiplyY { + fn new(state: Arc) -> Self { + Self { state } + } + } + + impl Consumer for MultiplyY { + fn consume(&self, _: &u64) { + let mut y = self.state.y.write().unwrap(); + *y *= 10; + println!("Consumer Y processed item"); + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_stream_broadcast_ext() { + let state = Arc::new(State { + x: RwLock::new(1), + y: RwLock::new(1), + }); + + let consumers: Vec>> = vec![ + Arc::new(MultiplyX::new(Arc::clone(&state))), + Arc::new(MultiplyY::new(Arc::clone(&state))), + ]; + + stream::iter(1..=5).broadcast(100, consumers).await; + + assert_eq!(*state.x.read().unwrap(), 3125); + assert_eq!(*state.y.read().unwrap(), 100000); + } +}