Skip to content

Commit

Permalink
do not use flume channel s it is not nolocal
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Dec 5, 2024
1 parent af46792 commit 82bf8f1
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async-global-executor = "2.4.1"
async-io = "2.3.4"
async-std = { version = "1.6.5", features = ["tokio1"] }
async-trait = "0.1.82"
async-channel = "2.3.1"
base64 = "0.22.1"
bincode = "1.3.3"
bytes = "1.7.1"
Expand Down
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ unstable = ["internal_config", "zenoh-keyexpr/unstable", "zenoh-config/unstable"
internal_config = []

[dependencies]
async-channel = { workspace = true }
tokio = { workspace = true, features = ["rt", "macros", "time"] }
tokio-util = { workspace = true }
ahash = { workspace = true }
Expand Down
12 changes: 6 additions & 6 deletions zenoh/src/api/builders/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ impl<TOutput: Send + 'static> IntoFuture for BackgroundCloseBuilder<TOutput> {
fn into_future(self) -> Self::IntoFuture {
Box::pin(
async move {
let (tx, rx) = flume::bounded::<TOutput>(1);
let (tx, rx) = async_channel::bounded::<TOutput>(1);

ZRuntime::Application.spawn(async move {
tx.send_async(self.inner.await)
tx.send(self.inner.await)
.await
.expect("BackgroundCloseBuilder: critical error sending the result")
});
Expand All @@ -148,12 +148,12 @@ impl<TOutput: Send + 'static> IntoFuture for BackgroundCloseBuilder<TOutput> {
#[cfg(all(feature = "unstable", feature = "internal"))]
#[doc(hidden)]
pub struct NolocalJoinHandle<TOutput: Send + 'static> {
rx: flume::Receiver<TOutput>,
rx: async_channel::Receiver<TOutput>,
}

#[cfg(all(feature = "unstable", feature = "internal"))]
impl<TOutput: Send + 'static> NolocalJoinHandle<TOutput> {
fn new(rx: flume::Receiver<TOutput>) -> Self {
fn new(rx: async_channel::Receiver<TOutput>) -> Self {
Self { rx }
}
}
Expand All @@ -167,7 +167,7 @@ impl<TOutput: Send + 'static> Resolvable for NolocalJoinHandle<TOutput> {
impl<TOutput: Send + 'static> Wait for NolocalJoinHandle<TOutput> {
fn wait(self) -> Self::To {
self.rx
.recv()
.recv_blocking()
.expect("NolocalJoinHandle: critical error receiving the result")
}
}
Expand All @@ -181,7 +181,7 @@ impl<TOutput: Send + 'static> IntoFuture for NolocalJoinHandle<TOutput> {
Box::pin(
async move {
self.rx
.recv_async()
.recv()
.await
.expect("NolocalJoinHandle: critical error receiving the result")
}
Expand Down

0 comments on commit 82bf8f1

Please sign in to comment.