Skip to content

Commit

Permalink
removed flume
Browse files Browse the repository at this point in the history
  • Loading branch information
andrieshiemstra committed Mar 5, 2024
1 parent b601bac commit 482d19c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 15 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.7.3

* removed flume again, see if std mpsc works better

# 0.7.2

* use flume for channels
Expand Down
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hirofa_utils"
version = "0.7.2"
version = "0.7.3"
authors = ["HiRoFa <[email protected]>"]
edition = "2018"
description = "Utils project which is depended on by several other projects"
Expand All @@ -12,7 +12,6 @@ repository = "https://github.com/HiRoFa/utils"
[dependencies]
lazy_static = "1.4.0"
log = "0.4"
simple-logging = "2"
thread-id = "4"
num_cpus = "1"
linked-hash-map = "0.5"
Expand All @@ -22,7 +21,6 @@ string_cache = "0.8"
serde = "1"
serde_json = "1"
rand = "0.8"
flume = "0.10"
parking_lot = "0.12"
anyhow = "1"

Expand All @@ -33,6 +31,7 @@ features = ["precommit-hook", "run-cargo-test", "run-cargo-clippy"]

[dev-dependencies]
criterion = "0.5"
simple-logging = "2"

[[bench]]
name = "benchmarks"
Expand Down
11 changes: 6 additions & 5 deletions src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::future::Future;
use std::ops::Add;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

Expand All @@ -21,7 +22,7 @@ fn next_id() -> usize {

/// the EventLoop struct is a single thread event queue
pub struct EventLoop {
tx: flume::Sender<Box<dyn FnOnce() + Send + 'static>>,
tx: Sender<Box<dyn FnOnce() + Send + 'static>>,
join_handle: Option<JoinHandle<()>>,
id: usize,
}
Expand Down Expand Up @@ -50,7 +51,7 @@ impl EventLoop {
/// init a new EventLoop
pub fn new() -> Self {
// todo settable buffer size
let (tx, rx) = flume::bounded(256);
let (tx, rx) = channel();

let id = next_id();

Expand All @@ -74,7 +75,7 @@ impl EventLoop {
let mut next_deadline = Instant::now().add(Duration::from_secs(10));
loop {
// recv may fail on timeout
let recv_res = rx.recv_deadline(next_deadline);
let recv_res = rx.recv_timeout(next_deadline.duration_since(Instant::now()));
if recv_res.is_ok() {
let fut: Box<dyn FnOnce() + Send + 'static> = recv_res.ok().unwrap();
// this seems redundant.. i could just run the task closure
Expand Down Expand Up @@ -226,7 +227,7 @@ impl EventLoop {
if Self::is_my_pool_thread(self) {
task()
} else {
let (tx, rx) = flume::bounded::<R>(1);
let (tx, rx) = channel();
self.add_void(move || tx.send(task()).expect("could not send"));
rx.recv().expect("could not recv")
}
Expand All @@ -249,7 +250,7 @@ impl EventLoop {
&self,
fut: F,
) -> impl Future<Output = R> {
let (tx, rx) = flume::bounded(1);
let (tx, rx) = channel();
self.add_void(move || {
let res_fut = Self::add_local_future(fut);
tx.send(res_fut).expect("send failed");
Expand Down
28 changes: 21 additions & 7 deletions src/resolvable_future.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::debug_mutex::DebugMutex;
use flume::{Receiver, SendError, Sender};
use futures::task::{Context, Poll, Waker};
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
use std::sync::Arc;

pub struct ResolvableFutureResolver<R> {
Expand All @@ -20,13 +20,27 @@ impl<R> ResolvableFutureResolver<R> {
pub fn resolve(&self, resolution: R) -> Result<(), SendError<R>> {
log::trace!("ResolvableFutureResolver.resolve");
let waker_opt = &mut *self.waker.lock("resolve").unwrap();
self.sender.send(resolution)?;

if let Some(waker) = waker_opt.take() {
log::trace!("ResolvableFutureResolver.resolve has waker, waking");
waker.wake();
match self.sender.send(resolution) {
Ok(_) => {
if let Some(waker) = waker_opt.take() {
log::trace!("ResolvableFutureResolver.resolve has waker, waking");
waker.wake();
}
Ok(())
}
Err(se) => {
if let Some(waker) = waker_opt.take() {
log::error!(
"ResolvableFutureResolver::could not send response ({:?}), had waker so waking", se
);
waker.wake();
} else {
log::error!("ResolvableFutureResolver::could not send response ({:?}), had no waker so was possibly already resolved", se);
}
Err(se)
}
}
Ok(())
}
}

Expand All @@ -36,7 +50,7 @@ pub struct ResolvableFuture<R> {
}
impl<R> ResolvableFuture<R> {
pub fn new() -> Self {
let (tx, rx) = flume::bounded(1);
let (tx, rx) = channel();

Self {
result: rx,
Expand Down

0 comments on commit 482d19c

Please sign in to comment.