Deadqueue is a dead simple async queue with back pressure support.
This crate provides three implementations:
- Unlimited (`deadqueue::unlimited::Queue`)
  - Based on `crossbeam_queue::SegQueue`
  - Has unlimited capacity and no back pressure on push
  - Enabled via the `unlimited` feature in your `Cargo.toml`
- Resizable (`deadqueue::resizable::Queue`)
  - Based on `deadqueue::unlimited::Queue`
  - Has limited capacity with back pressure on push
  - Supports resizing
  - Enabled via the `resizable` feature in your `Cargo.toml`
- Limited (`deadqueue::limited::Queue`)
  - Based on `crossbeam_queue::ArrayQueue`
  - Has limited capacity with back pressure on push
  - Does not support resizing
  - Enabled via the `limited` feature in your `Cargo.toml`
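
To make the difference between limited capacity and resizing concrete, here is a minimal, std-only sketch. It is not the crate's implementation: `ToyQueue`, `Inner` and this synchronous `resize` are hypothetical names invented for illustration. It only models the non-blocking side, where a push to a full queue is rejected; an async push with back pressure would instead wait until space becomes available.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical toy queue for illustration only; NOT how deadqueue is built.
struct ToyQueue<T> {
    // Items and capacity live behind one lock so a resize cannot race a push.
    inner: Mutex<Inner<T>>,
}

struct Inner<T> {
    items: VecDeque<T>,
    capacity: usize,
}

impl<T> ToyQueue<T> {
    fn new(capacity: usize) -> Self {
        ToyQueue {
            inner: Mutex::new(Inner { items: VecDeque::new(), capacity }),
        }
    }

    /// Rejects the push and hands the item back when the queue is full.
    fn try_push(&self, item: T) -> Result<(), T> {
        let mut inner = self.inner.lock().unwrap();
        if inner.items.len() >= inner.capacity {
            return Err(item);
        }
        inner.items.push_back(item);
        Ok(())
    }

    /// Returns the oldest item, or `None` when the queue is empty.
    fn try_pop(&self) -> Option<T> {
        self.inner.lock().unwrap().items.pop_front()
    }

    /// Changes the capacity. Assumption of this sketch: queued items are
    /// kept as-is, even if the new capacity is smaller than the current length.
    fn resize(&self, capacity: usize) {
        self.inner.lock().unwrap().capacity = capacity;
    }
}

fn main() {
    let queue = ToyQueue::new(2);
    assert!(queue.try_push(1).is_ok());
    assert!(queue.try_push(2).is_ok());
    // The queue is full: the push is rejected and the item is returned.
    assert_eq!(queue.try_push(3), Err(3));
    // After growing the queue, the same push succeeds.
    queue.resize(3);
    assert!(queue.try_push(3).is_ok());
    // Items come out in insertion order.
    assert_eq!(queue.try_pop(), Some(1));
}
```

A limited queue corresponds to never calling `resize`, and an unlimited queue to dropping the capacity check altogether.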
Feature | Description | Extra dependencies | Default |
---|---|---|---|
`unlimited` | Enable unlimited queue implementation | – | yes |
`resizable` | Enable resizable queue implementation | `deadqueue/unlimited` | yes |
`limited` | Enable limited queue implementation | – | yes |

    use std::sync::Arc;
    use tokio::time::{sleep, Duration};

    const TASK_COUNT: usize = 1000;
    const WORKER_COUNT: usize = 10;

    type TaskQueue = deadqueue::limited::Queue<usize>;

    #[tokio::main]
    async fn main() {
        // Fill the queue up front. Its capacity equals TASK_COUNT,
        // so try_push cannot fail here.
        let queue = Arc::new(TaskQueue::new(TASK_COUNT));
        for i in 0..TASK_COUNT {
            queue.try_push(i).unwrap();
        }
        // Spawn workers that share the queue through the Arc.
        for worker in 0..WORKER_COUNT {
            let queue = queue.clone();
            tokio::spawn(async move {
                loop {
                    // Waits until an item is available.
                    let task = queue.pop().await;
                    println!("worker[{}] processing task[{}] ...", worker, task);
                }
            });
        }
        // Poll until the workers have drained the queue.
        while queue.len() > 0 {
            println!("Waiting for workers to finish...");
            sleep(Duration::from_millis(100)).await;
        }
        println!("All tasks done. :-)");
    }

Deadqueue is by no means the only queue implementation available. It does things a little differently and provides features that other implementations lack:
- Resizable queue. Usually you have to pick between `limited` and `unlimited` queues. This crate features a `resizable` queue which can be resized as needed. This is probably a big unique selling point of this crate.
- Introspection support. The methods `.len()`, `.capacity()` and `.available()` provide access to the current state of the queue.
- Fair scheduling. Tasks calling `pop` will receive items in a first-come-first-served fashion. This is mainly due to the use of `tokio::sync::Semaphore`, which is fair by nature.
- One struct, not two. The channels of `tokio`, `async_std` and `futures-intrusive` split the queue into two structs (`Sender` and `Receiver`), which makes the usage slightly more complicated.
- Bring your own `Arc`. Since there is no separation between `Sender` and `Receiver`, there is also no need for an internal `Arc`. (All implementations that split the channel into a `Sender` and `Receiver` need some kind of `Arc` internally.)
- Fully concurrent access. No need to wrap the `Receiver` part in a `Mutex`. All methods support concurrent access without the need for an additional synchronization primitive.
- Support for `try_*` methods. The methods `try_push` and `try_pop` can be used to access the queue from non-blocking synchronous code.
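
The "one struct", "bring your own `Arc`" and introspection points can be sketched with the standard library alone. The `SharedQueue` type and the `run_workers` helper below are hypothetical and written only for this illustration. They use blocking threads and a `Condvar` rather than async tasks, and `available()` here is simply the remaining free capacity, which may differ from what the crate reports.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical single-struct queue; NOT deadqueue's implementation.
struct SharedQueue<T> {
    items: Mutex<VecDeque<T>>,
    not_empty: Condvar,
    capacity: usize,
}

impl<T> SharedQueue<T> {
    fn new(capacity: usize) -> Self {
        SharedQueue {
            items: Mutex::new(VecDeque::new()),
            not_empty: Condvar::new(),
            capacity,
        }
    }

    /// Non-blocking push; hands the item back when the queue is full.
    fn try_push(&self, item: T) -> Result<(), T> {
        let mut items = self.items.lock().unwrap();
        if items.len() >= self.capacity {
            return Err(item);
        }
        items.push_back(item);
        self.not_empty.notify_one();
        Ok(())
    }

    /// Blocks the calling thread until an item is available.
    fn pop(&self) -> T {
        let mut items = self.items.lock().unwrap();
        loop {
            if let Some(item) = items.pop_front() {
                return item;
            }
            items = self.not_empty.wait(items).unwrap();
        }
    }

    // Introspection: current length, fixed capacity, remaining free slots.
    fn len(&self) -> usize {
        self.items.lock().unwrap().len()
    }
    fn capacity(&self) -> usize {
        self.capacity
    }
    fn available(&self) -> usize {
        self.capacity - self.len()
    }
}

/// Pushes `0..workers * per_worker`, lets every worker thread pop
/// `per_worker` items, and returns the sum of everything popped.
fn run_workers(workers: usize, per_worker: usize) -> usize {
    let total = workers * per_worker;
    // The caller decides how to share the queue: one Arc, no Sender/Receiver.
    let queue = Arc::new(SharedQueue::new(total));
    for i in 0..total {
        queue.try_push(i).unwrap();
    }
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            // Every worker pops from the same struct; no extra Mutex wrapper.
            thread::spawn(move || (0..per_worker).map(|_| queue.pop()).sum::<usize>())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let queue = SharedQueue::new(3);
    queue.try_push('a').unwrap();
    assert_eq!((queue.len(), queue.capacity(), queue.available()), (1, 3, 2));
    // 4 workers x 25 items: the popped values 0..100 sum to 4950.
    assert_eq!(run_workers(4, 25), 4950);
}
```

Because every method takes `&self`, the queue can be shared through a plain `Arc` and popped from any number of threads at once.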
Crate | Limitations | Documentation |
---|---|---|
`tokio` | No resizable queue. No introspection support. Synchronization of `Receiver` needed. | `tokio::sync::mpsc::channel`, `tokio::sync::mpsc::unbounded_channel` |
`async-std` | No resizable or unlimited queue. No introspection support. No `try_send` or `try_recv` methods. | `async_std::sync::channel` |
`futures` | No resizable queue. No introspection support. | `futures::channel::mpsc::channel`, `futures::channel::mpsc::unbounded` |
Licensed under either of
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.