Skip to content

Commit

Permalink
Update CHANGELOG.md
Browse files Browse the repository at this point in the history
  • Loading branch information
ar37-rs committed Nov 29, 2021
1 parent e2a6b6e commit 4f862e2
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 12 deletions.
11 changes: 8 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
# Changelog

## [1.8.1] - 2021-11-29
- Internal: rename SendAsync to AsyncSuspender
- Internal: add missing field inside impl Debug
- Add tokio_full.rs example.

## [1.8.0] - 2021-11-29
- fix(Flower): polling mutex only if needed inside try_recv(|value| {...}, |result| {...}) which is introduced in version 1.0.0, now everyting's working as expected.
- feat(FlowerHandle): send_async() support can be used from any async runtime.
- feat(Flower): fn is_canceled() to check cancelation added.
- Fix(Flower): polling mutex only if needed inside try_recv(|value| {...}, |result| {...}) which is introduced in version 1.0.0, now everyting's working as expected.
- Feat(FlowerHandle): send_async() support can be used from any async runtime.
- Feat(Flower): fn is_canceled() to check cancelation added.

## [1.0.0] - 2021-11-28
- Improvement(Flower): instead of polling the Mutex over and over, poll the mutex inside fn try_recv(|value| {...}, |result| {...}) only if needed.
Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "flowync"
version = "1.8.0"
version = "1.8.1"
authors = ["Ar37-rs <[email protected]>"]
edition = "2018"
description = "A simple utility for multithreading a/synchronization"
Expand All @@ -10,3 +10,6 @@ keywords = ["async", "sync", "std", "multi-thread", "non-blocking"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/Ar37-rs/flowync"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() {
// Send current value through channel, will block the spawned thread
// until the option value successfully being polled in the main thread.
handle.send(i);
// or handle.send_async(i).await; can be used from any async runtime,
// or handle.send_async(i).await; can be used from any multithreaded async runtime,
// it won't block the other async operations.

// // Return error if the job is failure, for example:
Expand Down
69 changes: 69 additions & 0 deletions examples/tokio_full.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// [dependencies]
// # Make sure to enable tokio "full" features (multithreaded support) like so:
// tokio = { version = "1", features = ["full"] }

use flowync::Flower;
use std::{io::Error, time::Instant};

#[tokio::main]
async fn main() {
let instant: Instant = Instant::now();
let flower: Flower<String, u32> = Flower::new(1);
tokio::spawn({
let handle = flower.handle();
async move {
let id = handle.id();
let result =
Ok::<String, Error>(format!("the flower with id: {} is flowing", id).into());

match result {
Ok(value) => {
// Send current flower progress.
handle.send_async(value).await;
}
Err(e) => {
// Return error immediately if something not right, for example:
return handle.err(e.to_string());
}
}

// Explicit Cancelation example:
// Check if the current flower should be canceled
if handle.should_cancel() {
let value = format!("canceling the flower with id: {}", id);
handle.send_async(value).await;
return handle.err(format!("the flower with id: {} canceled", id));
}

return handle.ok(instant.elapsed().subsec_micros());
}
});

let mut done = false;

loop {
flower.try_recv(
|value| {
println!("{}\n", value);
},
|result| {
match result {
Ok(elapsed) => println!(
"the flower with id: {} finished in: {:?} microseconds \n",
flower.id(),
elapsed
),
Err(e) => println!("{}", e),
}
done = true;
},
);

// Cancel if need to
// flower.cancel();

if done {
break;
}
}
}
1 change: 1 addition & 0 deletions examples/vectored_flowers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ fn main() {
}

// Explicit cancelation example:
// Check if the current flower should be canceled
if handle.should_cancel() {
let value = format!("canceling the flower with id: {}", id);
handle.send(value);
Expand Down
1 change: 1 addition & 0 deletions examples/vectored_leapers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ fn main() {
std::thread::sleep(sleep_dur);

// Explicit cancelation example:
// Check if the current flower should be canceled
if handle.should_cancel() {
return handle.err(format!("the leaper with id: {} canceled", id));
}
Expand Down
16 changes: 9 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where
/// // Send current value through channel, will block the spawned thread
/// // until the option value successfully being polled in the main thread.
/// handle.send(i);
/// // or handle.send_async(i).await; can be used from any async runtime,
/// // or handle.send_async(i).await; can be used from any multithreaded async runtime,
/// // it won't block the other async operations.
///
/// // // Return error if the job is failure, for example:
Expand Down Expand Up @@ -223,6 +223,7 @@ where
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Flower")
.field("state", &self.state)
.field("awaiting", &self.awaiting)
.field("id", &self.id)
.finish()
}
Expand Down Expand Up @@ -302,7 +303,7 @@ where
};

if pending {
SendAsync {
AsyncSuspender {
awaiting: self.awaiting.clone(),
}
.await;
Expand Down Expand Up @@ -338,23 +339,23 @@ where
}
}

struct SendAsync {
struct AsyncSuspender {
awaiting: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
}

impl Future for SendAsync {
impl Future for AsyncSuspender {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.awaiting.0.lock() {
Ok(mut waker) => {
if !self.awaiting.1.load(Ordering::Relaxed) {
Poll::Ready(())
} else {
*waker = Some(_cx.waker().clone());
*waker = Some(cx.waker().clone());
Poll::Pending
}
}
// Rrevent blocking if something not ok with rust std mutex.
// Prevent blocking if something not ok with rust std mutex.
_ => Poll::Ready(()),
}
}
Expand Down Expand Up @@ -399,6 +400,7 @@ where
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("FlowerHandle")
.field("state", &self.state)
.field("awaiting", &self.awaiting)
.field("id", &self.id)
.finish()
}
Expand Down

0 comments on commit 4f862e2

Please sign in to comment.