diff --git a/CHANGELOG.md b/CHANGELOG.md index d805a4c..f853920 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,14 @@ # Changelog +## [1.8.1] - 2021-11-29 +- Internal: rename SendAsync to AsyncSuspender +- Internal: add missing field inside impl Debug +- Add tokio_full.rs example. + ## [1.8.0] - 2021-11-29 -- fix(Flower): polling mutex only if needed inside try_recv(|value| {...}, |result| {...}) which is introduced in version 1.0.0, now everyting's working as expected. -- feat(FlowerHandle): send_async() support can be used from any async runtime. -- feat(Flower): fn is_canceled() to check cancelation added. +- Fix(Flower): polling mutex only if needed inside try_recv(|value| {...}, |result| {...}) which is introduced in version 1.0.0, now everyting's working as expected. +- Feat(FlowerHandle): send_async() support can be used from any async runtime. +- Feat(Flower): fn is_canceled() to check cancelation added. ## [1.0.0] - 2021-11-28 - Improvement(Flower): instead of polling the Mutex over and over, poll the mutex inside fn try_recv(|value| {...}, |result| {...}) only if needed. diff --git a/Cargo.toml b/Cargo.toml index 09a45b7..2ea5228 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "flowync" -version = "1.8.0" +version = "1.8.1" authors = ["Ar37-rs "] edition = "2018" description = "A simple utility for multithreading a/synchronization" @@ -10,3 +10,6 @@ keywords = ["async", "sync", "std", "multi-thread", "non-blocking"] license = "MIT OR Apache-2.0" repository = "https://github.com/Ar37-rs/flowync" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } \ No newline at end of file diff --git a/README.md b/README.md index 2b0e5f5..2f94950 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ fn main() { // Send current value through channel, will block the spawned thread // until the option value successfully being polled in the main thread. handle.send(i); - // or handle.send_async(i).await; can be used from any async runtime, + // or handle.send_async(i).await; can be used from any multithreaded async runtime, // it won't block the other async operations. // // Return error if the job is failure, for example: diff --git a/examples/tokio_full.rs b/examples/tokio_full.rs new file mode 100644 index 0000000..2e87f81 --- /dev/null +++ b/examples/tokio_full.rs @@ -0,0 +1,69 @@ +// [dependencies] +// # Make sure to enable tokio "full" features (multithreaded support) like so: +// tokio = { version = "1", features = ["full"] } + +use flowync::Flower; +use std::{io::Error, time::Instant}; + +#[tokio::main] +async fn main() { + let instant: Instant = Instant::now(); + let flower: Flower = Flower::new(1); + tokio::spawn({ + let handle = flower.handle(); + async move { + let id = handle.id(); + let result = + Ok::(format!("the flower with id: {} is flowing", id).into()); + + match result { + Ok(value) => { + // Send current flower progress. + handle.send_async(value).await; + } + Err(e) => { + // Return error immediately if something not right, for example: + return handle.err(e.to_string()); + } + } + + // Explicit Cancelation example: + // Check if the current flower should be canceled + if handle.should_cancel() { + let value = format!("canceling the flower with id: {}", id); + handle.send_async(value).await; + return handle.err(format!("the flower with id: {} canceled", id)); + } + + return handle.ok(instant.elapsed().subsec_micros()); + } + }); + + let mut done = false; + + loop { + flower.try_recv( + |value| { + println!("{}\n", value); + }, + |result| { + match result { + Ok(elapsed) => println!( + "the flower with id: {} finished in: {:?} microseconds \n", + flower.id(), + elapsed + ), + Err(e) => println!("{}", e), + } + done = true; + }, + ); + + // Cancel if need to + // flower.cancel(); + + if done { + break; + } + } +} diff --git a/examples/vectored_flowers.rs b/examples/vectored_flowers.rs index a4372bf..68aab85 100644 --- a/examples/vectored_flowers.rs +++ b/examples/vectored_flowers.rs @@ -36,6 +36,7 @@ fn main() { } // Explicit cancelation example: + // Check if the current flower should be canceled if handle.should_cancel() { let value = format!("canceling the flower with id: {}", id); handle.send(value); diff --git a/examples/vectored_leapers.rs b/examples/vectored_leapers.rs index 3e27148..e3b862a 100644 --- a/examples/vectored_leapers.rs +++ b/examples/vectored_leapers.rs @@ -20,6 +20,7 @@ fn main() { std::thread::sleep(sleep_dur); // Explicit cancelation example: + // Check if the current flower should be canceled if handle.should_cancel() { return handle.err(format!("the leaper with id: {} canceled", id)); } diff --git a/src/lib.rs b/src/lib.rs index 6eb8e0b..d051748 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,7 +70,7 @@ where /// // Send current value through channel, will block the spawned thread /// // until the option value successfully being polled in the main thread. /// handle.send(i); -/// // or handle.send_async(i).await; can be used from any async runtime, +/// // or handle.send_async(i).await; can be used from any multithreaded async runtime, /// // it won't block the other async operations. /// /// // // Return error if the job is failure, for example: @@ -223,6 +223,7 @@ where fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("Flower") .field("state", &self.state) + .field("awaiting", &self.awaiting) .field("id", &self.id) .finish() } @@ -302,7 +303,7 @@ where }; if pending { - SendAsync { + AsyncSuspender { awaiting: self.awaiting.clone(), } .await; @@ -338,23 +339,23 @@ where } } -struct SendAsync { +struct AsyncSuspender { awaiting: Arc<(Mutex>, AtomicBool)>, } -impl Future for SendAsync { +impl Future for AsyncSuspender { type Output = (); - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.awaiting.0.lock() { Ok(mut waker) => { if !self.awaiting.1.load(Ordering::Relaxed) { Poll::Ready(()) } else { - *waker = Some(_cx.waker().clone()); + *waker = Some(cx.waker().clone()); Poll::Pending } } - // Rrevent blocking if something not ok with rust std mutex. + // Prevent blocking if something not ok with rust std mutex. _ => Poll::Ready(()), } } @@ -399,6 +400,7 @@ where fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("FlowerHandle") .field("state", &self.state) + .field("awaiting", &self.awaiting) .field("id", &self.id) .finish() }