Skip to content

Commit

Permalink
Add deficit counter in TransmissionPipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry73204 committed Jan 17, 2024
1 parent ce565e8 commit df60642
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 48 deletions.
2 changes: 1 addition & 1 deletion examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use clap::Parser;
use std::io::{stdin, Read};
use std::time::{Duration, Instant};
use std::time::Instant;
use zenoh::config::Config;
use zenoh::prelude::sync::*;
use zenoh_examples::CommonArgs;
Expand Down
165 changes: 165 additions & 0 deletions io/zenoh-transport/src/common/drr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use itertools::{chain, Itertools};

pub struct DRR {
curr_slot: usize,
consumed: bool,
iter: Box<dyn Iterator<Item = usize> + Sync + Send>,
quotas: Vec<usize>,
weights: Vec<usize>,
}

impl DRR {
pub fn new(weights: &[usize]) -> Self {
let weights: Vec<_> = weights.to_vec();
let w_min = weights
.iter()
.cloned()
.min()
.expect("weights must not be empty");
assert!(w_min > 0, "weights must be nonzero");

let mut quotas = vec![0; weights.len()];

let mut indices: Vec<usize> = (0..weights.len()).collect();
indices.sort_unstable_by_key(|&idx| (weights[idx], idx));

let sorted_weights: Vec<_> = indices.iter().map(|&idx| weights[idx]).collect();
indices.reverse();

let iter = chain!([0], sorted_weights)
.tuple_windows()
.zip((1..=weights.len()).rev())
.map(|((prev, next), slots)| {
let rounds = next - prev;
(slots, rounds)
})
.filter(|&(_, rounds)| rounds > 0)
.flat_map(move |(slots, rounds)| {
let mut selected: Vec<_> = indices[0..slots].to_vec();
selected.sort_unstable();
selected.into_iter().cycle().take(rounds * slots)
})
.cycle();
let mut iter: Box<dyn Iterator<Item = usize> + Sync + Send> = Box::new(iter);

let curr_slot = iter.next().unwrap();
quotas[curr_slot] += weights[curr_slot];

Self {
iter,
quotas,
weights,
curr_slot,
consumed: false,
}
}

pub fn next(&mut self) -> Next<'_> {
if self.quotas[self.curr_slot] == 0 {
self.step();
}

Next { drr: Some(self) }
}

fn step(&mut self) {
self.curr_slot = self.iter.next().unwrap();
self.quotas[self.curr_slot] += self.weights[self.curr_slot];
self.consumed = false;
}
}

pub struct Next<'a> {
drr: Option<&'a mut DRR>,
}

impl<'a> Next<'a> {
#[must_use]
pub fn try_consume(mut self, amount: usize) -> bool {
let drr = self.drr.take().unwrap();
let quota = &mut drr.quotas[drr.curr_slot];

let Some(remain) = quota.checked_sub(amount) else {
drr.step();
return false;
};

drr.consumed = true;
*quota = remain;

true
}

pub fn give_up(mut self) {
let drr = self.drr.take().unwrap();

if drr.consumed {
let quota = &mut drr.quotas[drr.curr_slot];
*quota = 0;
}

drr.step();
}

pub fn slot(&self) -> usize {
self.drr.as_ref().unwrap().curr_slot
}
}

impl<'a> Drop for Next<'a> {
fn drop(&mut self) {
let Some(drr) = self.drr.take() else {
return;
};

if drr.consumed {
let quota = &mut drr.quotas[drr.curr_slot];
*quota = 0;
}

drr.step();
}
}

#[cfg(test)]
mod tests {
use super::DRR;

#[test]
fn drr_test() {
let mut drr = DRR::new(&[1, 1, 1]);

macro_rules! check_consume {
($slot:expr, $amount:expr, $result:expr) => {{
let next = drr.next();
assert_eq!(next.slot(), $slot);
assert_eq!(next.try_consume($amount), $result);
}};
}

macro_rules! check_giveup {
($slot:expr, $amount:expr) => {{
let next = drr.next();
assert_eq!(next.slot(), $slot);
next.give_up();
}};
}

check_consume!(0, 1, true);
check_consume!(1, 1, true);
check_consume!(2, 1, true);

check_consume!(0, 1, true);
check_consume!(1, 1, true);
check_consume!(2, 1, true);

check_giveup!(0, 1);
check_consume!(1, 1, true);
check_consume!(2, 1, true);

check_consume!(0, 1, true);
check_consume!(0, 1, true);
check_consume!(1, 1, true);
check_consume!(2, 1, true);
}
}
1 change: 1 addition & 0 deletions io/zenoh-transport/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//
pub mod batch;
pub(crate) mod defragmentation;
mod drr;
pub(crate) mod pipeline;
pub(crate) mod priority;
pub(crate) mod seq_num;
Expand Down
68 changes: 21 additions & 47 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,16 @@ use crate::common::batch::BatchConfig;
//
use super::{
batch::{Encode, WBatch},
drr::DRR,
priority::{TransportChannelTx, TransportPriorityTx},
};
use async_std::prelude::FutureExt;
use flume::{bounded, Receiver, Sender};
use itertools::{chain, Itertools};
use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;
use std::time::Duration;
use std::{
iter,
sync::{Arc, Mutex, MutexGuard},
};
use std::{
iter::Peekable,
sync::atomic::{AtomicBool, AtomicU16, Ordering},
};
use zenoh_buffers::{
reader::{HasReader, Reader},
writer::HasWriter,
Expand Down Expand Up @@ -580,9 +574,10 @@ impl TransmissionPipeline {
}

let active = Arc::new(AtomicBool::new(true));
let prio_iter: Box<dyn Iterator<Item = Vec<usize>> + Send + Sync> = {

let drr = {
let weights = vec![1; stage_out.len()];
Box::new(iwrr_vec_iter(&weights))
DRR::new(&weights)
};

let producer = TransmissionPipelineProducer {
Expand All @@ -593,7 +588,7 @@ impl TransmissionPipeline {
stage_out: stage_out.into_boxed_slice(),
n_out_r,
active,
prio_iter: prio_iter.peekable(),
drr,
};

(producer, consumer)
Expand Down Expand Up @@ -655,29 +650,38 @@ pub(crate) struct TransmissionPipelineConsumer {
stage_out: Box<[StageOut]>,
n_out_r: Receiver<()>,
active: Arc<AtomicBool>,
prio_iter: Peekable<Box<dyn Iterator<Item = Vec<usize>> + Send + Sync>>,
drr: DRR,
}

impl TransmissionPipelineConsumer {
pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> {
while self.active.load(Ordering::Relaxed) {
let prios = self.prio_iter.next().unwrap();
const ATTEMPTS: usize = 8;

while self.active.load(Ordering::Relaxed) {
// Calculate the backoff maximum
let mut bo = NanoSeconds::MAX;

for prio in prios {
for _ in 0..ATTEMPTS {
let next = self.drr.next();
let prio = next.slot();

let queue = &mut self.stage_out[prio];

match queue.try_pull() {
Pull::Some(batch) => {
let ok = next.try_consume(1);
debug_assert!(ok);
return Some((batch, prio));
}
Pull::Backoff(b) => {
if b < bo {
bo = b;
}
next.give_up();
}
Pull::None => {
next.give_up();
}
Pull::None => {}
}
}

Expand Down Expand Up @@ -722,36 +726,6 @@ impl TransmissionPipelineConsumer {
}
}

fn iwrr_vec_iter(weights: &[usize]) -> impl Iterator<Item = Vec<usize>> + Send + Sync {
let w_min = weights
.iter()
.cloned()
.min()
.expect("weights must not be empty");
assert!(w_min > 0, "weights must be nonzero");

let mut indices: Vec<usize> = (0..weights.len()).collect();
indices.sort_unstable_by_key(|&idx| (weights[idx], idx));

let sorted_weights: Vec<_> = indices.iter().map(|&idx| weights[idx]).collect();
indices.reverse();

chain!([0], sorted_weights)
.tuple_windows()
.zip((1..=weights.len()).rev())
.map(|((prev, next), slots)| {
let rounds = next - prev;
(slots, rounds)
})
.filter(|&(_, rounds)| rounds > 0)
.flat_map(move |(slots, rounds)| {
let mut selected: Vec<_> = indices[0..slots].to_vec();
selected.sort_unstable();
iter::repeat(selected).take(rounds)
})
.cycle()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit df60642

Please sign in to comment.