diff --git a/Cargo.lock b/Cargo.lock index 54b81fff6c..f591bb74ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2773,11 +2773,11 @@ name = "iroh-gossip" version = "0.21.0" dependencies = [ "anyhow", + "async-channel", "bytes", "clap", "derive_more", "ed25519-dalek", - "flume", "futures-lite 2.3.0", "futures-util", "genawaiter", diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml index 4dda1173a7..f521e41f19 100644 --- a/iroh-gossip/Cargo.toml +++ b/iroh-gossip/Cargo.toml @@ -38,8 +38,8 @@ tokio-util = { version = "0.7.8", optional = true, features = ["codec"] } genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] } # dispatcher dependencies (optional) +async-channel = { version = "2.3.1", optional = true } futures-util = { version = "0.3.30", optional = true } -flume = { version = "0.11", optional = true } [dev-dependencies] clap = { version = "4", features = ["derive"] } @@ -51,7 +51,7 @@ url = "2.4.0" [features] default = ["net", "dispatcher"] net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"] -dispatcher = ["dep:flume", "dep:futures-util"] +dispatcher = ["dep:async-channel", "dep:futures-util"] [[example]] name = "chat" diff --git a/iroh-gossip/src/dispatcher.rs b/iroh-gossip/src/dispatcher.rs index df084e73d4..e724741ff2 100644 --- a/iroh-gossip/src/dispatcher.rs +++ b/iroh-gossip/src/dispatcher.rs @@ -10,6 +10,7 @@ use crate::{ proto::{DeliveryScope, TopicId}, }; use bytes::Bytes; +use futures_lite::StreamExt; use futures_util::Stream; use iroh_base::rpc::{RpcError, RpcResult}; use iroh_net::{key::PublicKey, util::AbortingJoinHandle, NodeId}; @@ -106,7 +107,7 @@ struct State { /// Type alias for a stream of gossip updates, so we don't have to repeat all the bounds. type CommandStream = Box + Send + Sync + Unpin + 'static>; /// Type alias for a sink of gossip events. -type EventSink = flume::Sender>; +type EventSink = async_channel::Sender>; #[derive(derive_more::Debug)] enum TopicState { @@ -214,7 +215,7 @@ impl GossipDispatcher { /// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full. fn try_send(send: &EventSink, event: &IrohGossipEvent) -> bool { // If the stream is disconnected, we don't need to send to it. - if send.is_disconnected() { + if send.is_closed() { return false; } // Check if the send buffer is almost full, and send a lagged response if it is. @@ -234,7 +235,6 @@ impl GossipDispatcher { /// /// This should not fail unless the gossip instance is faulty. async fn dispatch_loop(mut self) -> anyhow::Result<()> { - use futures_lite::stream::StreamExt; let stream = self.gossip.clone().subscribe_all(); tokio::pin!(stream); while let Some(item) = stream.next().await { @@ -306,7 +306,6 @@ impl GossipDispatcher { topic: TopicId, mut updates: CommandStream, ) -> anyhow::Result<()> { - use futures_lite::stream::StreamExt; while let Some(update) = Pin::new(&mut updates).next().await { match update { Command::Broadcast(msg) => { @@ -404,7 +403,7 @@ impl GossipDispatcher { let mut update_tasks = vec![]; for (updates, event_sink) in waiting { // if the stream is disconnected, we don't need to keep it and start the update task - if event_sink.is_disconnected() { + if event_sink.is_closed() { continue; } event_sinks.push(event_sink); @@ -438,9 +437,9 @@ impl GossipDispatcher { topic: TopicId, options: SubscribeOptions, updates: CommandStream, - ) -> impl Stream> { + ) -> impl Stream> + Unpin { let mut inner = self.inner.lock().unwrap(); - let (send, recv) = flume::bounded(options.subscription_capacity); + let (send, recv) = async_channel::bounded(options.subscription_capacity); match inner.current_subscriptions.entry(topic) { Entry::Vacant(entry) => { // There is no existing subscription, so we need to start a new one. @@ -490,7 +489,7 @@ impl GossipDispatcher { } } } - recv.into_stream() + recv.boxed() } }