diff --git a/Cargo.lock b/Cargo.lock index 0cb5efd24..2863e13b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5058,6 +5058,7 @@ name = "zenoh" version = "1.0.0-dev" dependencies = [ "ahash", + "async-channel 2.3.1", "async-trait", "bytes", "flume", diff --git a/Cargo.toml b/Cargo.toml index de23d2eea..dca6ce200 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ async-global-executor = "2.4.1" async-io = "2.3.4" async-std = { version = "1.6.5", features = ["tokio1"] } async-trait = "0.1.82" +async-channel = "2.3.1" base64 = "0.22.1" bincode = "1.3.3" bytes = "1.7.1" diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 6a998b296..b5f1e8a3f 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -68,6 +68,7 @@ unstable = ["internal_config", "zenoh-keyexpr/unstable", "zenoh-config/unstable" internal_config = [] [dependencies] +async-channel = { workspace = true } tokio = { workspace = true, features = ["rt", "macros", "time"] } tokio-util = { workspace = true } ahash = { workspace = true } diff --git a/zenoh/src/api/builders/close.rs b/zenoh/src/api/builders/close.rs index 63011e488..f7244d9f9 100644 --- a/zenoh/src/api/builders/close.rs +++ b/zenoh/src/api/builders/close.rs @@ -131,10 +131,10 @@ impl IntoFuture for BackgroundCloseBuilder { fn into_future(self) -> Self::IntoFuture { Box::pin( async move { - let (tx, rx) = flume::bounded::(1); + let (tx, rx) = async_channel::bounded::(1); ZRuntime::Application.spawn(async move { - tx.send_async(self.inner.await) + tx.send(self.inner.await) .await .expect("BackgroundCloseBuilder: critical error sending the result") }); @@ -148,12 +148,12 @@ impl IntoFuture for BackgroundCloseBuilder { #[cfg(all(feature = "unstable", feature = "internal"))] #[doc(hidden)] pub struct NolocalJoinHandle { - rx: flume::Receiver, + rx: async_channel::Receiver, } #[cfg(all(feature = "unstable", feature = "internal"))] impl NolocalJoinHandle { - fn new(rx: flume::Receiver) -> Self { + fn new(rx: async_channel::Receiver) -> Self { Self { rx } } } @@ -167,7 +167,7 @@ impl Resolvable for NolocalJoinHandle { impl Wait for NolocalJoinHandle { fn wait(self) -> Self::To { self.rx - .recv() + .recv_blocking() .expect("NolocalJoinHandle: critical error receiving the result") } } @@ -181,7 +181,7 @@ impl IntoFuture for NolocalJoinHandle { Box::pin( async move { self.rx - .recv_async() + .recv() .await .expect("NolocalJoinHandle: critical error receiving the result") }