Skip to content

Commit

Permalink
refactor(iroh): Remove flume from iroh gossip (#2542)
Browse files Browse the repository at this point in the history
## Description

refactor(iroh): Remove flume from iroh gossip

Yes, I know there is a PR that touches gossip. But just let me do my
purge.

## Breaking Changes

None

## Notes & open questions

None

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
rklaehn authored Jul 24, 2024
1 parent 22314a1 commit 2964569
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] }

# dispatcher dependencies (optional)
async-channel = { version = "2.3.1", optional = true }
futures-util = { version = "0.3.30", optional = true }
flume = { version = "0.11", optional = true }

[dev-dependencies]
clap = { version = "4", features = ["derive"] }
Expand All @@ -51,7 +51,7 @@ url = "2.4.0"
[features]
default = ["net", "dispatcher"]
net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"]
dispatcher = ["dep:flume", "dep:futures-util"]
dispatcher = ["dep:async-channel", "dep:futures-util"]

[[example]]
name = "chat"
Expand Down
15 changes: 7 additions & 8 deletions iroh-gossip/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
proto::{DeliveryScope, TopicId},
};
use bytes::Bytes;
use futures_lite::StreamExt;
use futures_util::Stream;
use iroh_base::rpc::{RpcError, RpcResult};
use iroh_net::{key::PublicKey, util::AbortingJoinHandle, NodeId};
Expand Down Expand Up @@ -106,7 +107,7 @@ struct State {
/// Type alias for a stream of gossip updates, so we don't have to repeat all the bounds.
type CommandStream = Box<dyn Stream<Item = Command> + Send + Sync + Unpin + 'static>;
/// Type alias for a sink of gossip events.
type EventSink = flume::Sender<RpcResult<Event>>;
type EventSink = async_channel::Sender<RpcResult<Event>>;

#[derive(derive_more::Debug)]
enum TopicState {
Expand Down Expand Up @@ -214,7 +215,7 @@ impl GossipDispatcher {
/// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full.
fn try_send(send: &EventSink, event: &IrohGossipEvent) -> bool {
// If the stream is disconnected, we don't need to send to it.
if send.is_disconnected() {
if send.is_closed() {
return false;
}
// Check if the send buffer is almost full, and send a lagged response if it is.
Expand All @@ -234,7 +235,6 @@ impl GossipDispatcher {
///
/// This should not fail unless the gossip instance is faulty.
async fn dispatch_loop(mut self) -> anyhow::Result<()> {
use futures_lite::stream::StreamExt;
let stream = self.gossip.clone().subscribe_all();
tokio::pin!(stream);
while let Some(item) = stream.next().await {
Expand Down Expand Up @@ -306,7 +306,6 @@ impl GossipDispatcher {
topic: TopicId,
mut updates: CommandStream,
) -> anyhow::Result<()> {
use futures_lite::stream::StreamExt;
while let Some(update) = Pin::new(&mut updates).next().await {
match update {
Command::Broadcast(msg) => {
Expand Down Expand Up @@ -404,7 +403,7 @@ impl GossipDispatcher {
let mut update_tasks = vec![];
for (updates, event_sink) in waiting {
// if the stream is disconnected, we don't need to keep it and start the update task
if event_sink.is_disconnected() {
if event_sink.is_closed() {
continue;
}
event_sinks.push(event_sink);
Expand Down Expand Up @@ -438,9 +437,9 @@ impl GossipDispatcher {
topic: TopicId,
options: SubscribeOptions,
updates: CommandStream,
) -> impl Stream<Item = RpcResult<Event>> {
) -> impl Stream<Item = RpcResult<Event>> + Unpin {
let mut inner = self.inner.lock().unwrap();
let (send, recv) = flume::bounded(options.subscription_capacity);
let (send, recv) = async_channel::bounded(options.subscription_capacity);
match inner.current_subscriptions.entry(topic) {
Entry::Vacant(entry) => {
// There is no existing subscription, so we need to start a new one.
Expand Down Expand Up @@ -490,7 +489,7 @@ impl GossipDispatcher {
}
}
}
recv.into_stream()
recv.boxed()
}
}

Expand Down

0 comments on commit 2964569

Please sign in to comment.