Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement non-linear wait before drop(close) #1603

Merged
merged 8 commits into from
Nov 28, 2024
2 changes: 2 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@
drop: {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_drop_fragments: 50000,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
block: {
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl Default for CongestionControlDropConf {
fn default() -> Self {
Self {
wait_before_drop: 1000,
max_wait_before_drop_fragments: 50000,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ validated_struct::validator! {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message
/// if still no batch is available.
wait_before_drop: i64,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_drop_fragments: i64,
Mallets marked this conversation as resolved.
Show resolved Hide resolved
},
/// Behavior pushing CongestionControl::Block messages to the queue.
pub block: CongestionControlBlockConf {
Expand Down
82 changes: 56 additions & 26 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,45 +128,76 @@ impl StageInMutex {
}
}

#[derive(Debug)]
struct WaitTime {
wait_time: Duration,
max_wait_time: Option<Duration>,
}

impl WaitTime {
fn new(wait_time: Duration, max_wait_time: Option<Duration>) -> Self {
Self {
wait_time,
max_wait_time,
}
}

fn advance(&mut self, instant: &mut Instant) {
match &mut self.max_wait_time {
Some(max_wait_time) => {
if let Some(new_max_wait_time) = max_wait_time.checked_sub(self.wait_time) {
*instant += self.wait_time;
*max_wait_time = new_max_wait_time;
self.wait_time *= 2;
}
}
None => {
*instant += self.wait_time;
}
}
}

fn wait_time(&self) -> Duration {
self.wait_time
}
}

#[derive(Clone)]
enum DeadlineSetting {
Immediate,
Infinite,
Finite(Instant),
}

struct LazyDeadline {
deadline: Option<DeadlineSetting>,
wait_time: Option<Duration>,
wait_time: WaitTime,
}

impl LazyDeadline {
fn new(wait_time: Option<Duration>) -> Self {
fn new(wait_time: WaitTime) -> Self {
Self {
deadline: None,
wait_time,
}
}

fn advance(&mut self) {
let wait_time = self.wait_time;
match &mut self.deadline() {
match self.deadline().to_owned() {
DeadlineSetting::Immediate => {}
DeadlineSetting::Infinite => {}
DeadlineSetting::Finite(instant) => {
*instant = instant.add(unsafe { wait_time.unwrap_unchecked() });
DeadlineSetting::Finite(mut instant) => {
self.wait_time.advance(&mut instant);
self.deadline = Some(DeadlineSetting::Finite(instant));
}
}
}

#[inline]
fn deadline(&mut self) -> &mut DeadlineSetting {
self.deadline.get_or_insert_with(|| match self.wait_time {
Some(wait_time) => match wait_time.is_zero() {
true => DeadlineSetting::Immediate,
false => DeadlineSetting::Finite(Instant::now().add(wait_time)),
},
None => DeadlineSetting::Infinite,
})
self.deadline
.get_or_insert_with(|| match self.wait_time.wait_time() {
Duration::ZERO => DeadlineSetting::Immediate,
nonzero_wait_time => DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)),
})
}
}

Expand All @@ -175,17 +206,16 @@ struct Deadline {
}

impl Deadline {
fn new(wait_time: Option<Duration>) -> Self {
fn new(wait_time: Duration, max_wait_time: Option<Duration>) -> Self {
Self {
lazy_deadline: LazyDeadline::new(wait_time),
lazy_deadline: LazyDeadline::new(WaitTime::new(wait_time, max_wait_time)),
}
}

#[inline]
fn wait(&mut self, s_ref: &StageInRefill) -> bool {
match self.lazy_deadline.deadline() {
DeadlineSetting::Immediate => false,
DeadlineSetting::Infinite => s_ref.wait(),
DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
}
}
Expand Down Expand Up @@ -577,7 +607,7 @@ impl StageOut {
pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: Duration,
pub(crate) wait_before_drop: (Duration, Duration),
pub(crate) wait_before_close: Duration,
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
Expand Down Expand Up @@ -680,7 +710,7 @@ pub(crate) struct TransmissionPipelineProducer {
// Each priority queue has its own Mutex
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
wait_before_drop: Duration,
wait_before_drop: (Duration, Duration),
wait_before_close: Duration,
}

Expand All @@ -695,12 +725,12 @@ impl TransmissionPipelineProducer {
(0, Priority::DEFAULT)
};
// If message is droppable, compute a deadline after which the sample could be dropped
let wait_time = if msg.is_droppable() {
self.wait_before_drop
let (wait_time, max_wait_time) = if msg.is_droppable() {
(self.wait_before_drop.0, Some(self.wait_before_drop.1))
} else {
self.wait_before_close
(self.wait_before_close, None)
};
let mut deadline = Deadline::new(Some(wait_time));
let mut deadline = Deadline::new(wait_time, max_wait_time);
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
queue.push_network_message(&mut msg, priority, &mut deadline)
Expand Down Expand Up @@ -856,7 +886,7 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
};
Expand All @@ -870,7 +900,7 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
};
Expand Down
38 changes: 16 additions & 22 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub struct TransportManagerConfig {
pub resolution: Resolution,
pub batch_size: BatchSize,
pub batching: bool,
pub wait_before_drop: Duration,
pub wait_before_drop: (Duration, Duration),
pub wait_before_close: Duration,
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
Expand Down Expand Up @@ -141,7 +141,7 @@ pub struct TransportManagerBuilder {
batch_size: BatchSize,
batching_enabled: bool,
batching_time_limit: Duration,
wait_before_drop: Duration,
wait_before_drop: (Duration, Duration),
wait_before_close: Duration,
queue_size: QueueSizeConf,
defrag_buff_size: usize,
Expand Down Expand Up @@ -192,7 +192,7 @@ impl TransportManagerBuilder {
self
}

pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self {
pub fn wait_before_drop(mut self, wait_before_drop: (Duration, Duration)) -> Self {
self.wait_before_drop = wait_before_drop;
self
}
Expand Down Expand Up @@ -249,6 +249,8 @@ impl TransportManagerBuilder {
}

let link = config.transport().link();
let cc_drop = link.tx().queue().congestion_control().drop();
let cc_block = link.tx().queue().congestion_control().block();
let mut resolution = Resolution::default();
resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution());
self = self.resolution(resolution);
Expand All @@ -259,22 +261,11 @@ impl TransportManagerBuilder {
));
self = self.defrag_buff_size(*link.rx().max_message_size());
self = self.link_rx_buffer_size(*link.rx().buffer_size());
self = self.wait_before_drop(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.drop()
.wait_before_drop(),
));
self = self.wait_before_close(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.block()
.wait_before_close(),
self = self.wait_before_drop((
duration_from_i64us(*cc_drop.wait_before_drop()),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
));
self = self.wait_before_close(duration_from_i64us(*cc_block.wait_before_close()));
self = self.queue_size(link.tx().queue().size().clone());
self = self.tx_threads(*link.tx().threads());
self = self.protocols(link.protocols().clone());
Expand Down Expand Up @@ -372,17 +363,20 @@ impl Default for TransportManagerBuilder {
let link_rx = LinkRxConf::default();
let queue = QueueConf::default();
let backoff = *queue.batching().time_limit();
let wait_before_drop = *queue.congestion_control().drop().wait_before_drop();
let wait_before_close = *queue.congestion_control().block().wait_before_close();
let cc_drop = queue.congestion_control().drop();
let cc_block = queue.congestion_control().block();
Self {
version: VERSION,
zid: ZenohIdProto::rand(),
whatami: zenoh_config::defaults::mode,
resolution: Resolution::default(),
batch_size: BatchSize::MAX,
batching_enabled: true,
wait_before_drop: duration_from_i64us(wait_before_drop),
wait_before_close: duration_from_i64us(wait_before_close),
wait_before_drop: (
duration_from_i64us(*cc_drop.wait_before_drop()),
duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()),
),
wait_before_close: duration_from_i64us(*cc_block.wait_before_close()),
queue_size: queue.size,
batching_time_limit: Duration::from_millis(backoff),
defrag_buff_size: *link_rx.max_message_size(),
Expand Down