Skip to content

Commit

Permalink
feat(rust): improve ockam udp transport implementation robustness
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Dec 9, 2024
1 parent 675ceea commit 9cad546
Show file tree
Hide file tree
Showing 10 changed files with 445 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use core::fmt::{Debug, Formatter};
use minicbor::{CborLen, Decode, Encode};
use rand::random;
use std::cmp::Ordering;
use std::ops::{Add, AddAssign, Sub};

/// Number of the [`UdpRoutingMessage`]. Each [`UdpRoutingMessage`] is assigned a value, which
/// helps the receiver to assemble the message. Start with a random value and uses overflowing
Expand All @@ -22,7 +23,7 @@ impl RoutingNumber {
}

pub fn increment(&mut self) {
self.0 = self.0.overflowing_add(1).0;
*self += 1;
}
}

Expand Down Expand Up @@ -57,6 +58,30 @@ impl Ord for RoutingNumber {
}
}

impl AddAssign<u16> for RoutingNumber {
fn add_assign(&mut self, rhs: u16) {
self.0 = self.0.overflowing_add(rhs).0;
}
}

impl Add<u16> for RoutingNumber {
type Output = u16;

fn add(self, rhs: u16) -> Self::Output {
let mut s = self;
s += rhs;
s.0
}
}

impl Sub<Self> for RoutingNumber {
type Output = u16;

fn sub(self, rhs: Self) -> Self::Output {
self.0.overflowing_sub(rhs.0).0
}
}

#[cfg(test)]
mod tests {
use crate::messages::RoutingNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ pub struct Version(#[n(0)] pub u8);
pub struct UdpTransportMessage<'a> {
#[n(0)] pub version: Version,
#[n(1)] pub routing_number: RoutingNumber,
#[n(2)] pub offset: u32,
#[n(3)] pub is_last: bool,
#[n(2)] pub offset: u16,
#[n(3)] pub total: u16,
#[b(4)] pub payload: CowBytes<'a>,
}

Expand All @@ -39,15 +39,15 @@ impl<'a> UdpTransportMessage<'a> {
pub fn new(
version: Version,
routing_number: RoutingNumber,
offset: u32,
is_last: bool,
offset: u16,
total: u16,
payload: impl Into<CowBytes<'a>>,
) -> Self {
Self {
version,
routing_number,
offset,
is_last,
total,
payload: payload.into(),
}
}
Expand All @@ -64,12 +64,12 @@ mod tests {
let msg = UdpTransportMessage::new(
Version(u8::MAX),
RoutingNumber(u16::MAX),
u32::MAX,
true,
u16::MAX,
u16::MAX,
vec![0u8; MAX_PAYLOAD_SIZE],
);

let len = minicbor::to_vec(msg).unwrap().len();
let len = ockam_core::cbor_encode_preallocate(msg).unwrap().len();

assert!(len <= MAX_ON_THE_WIRE_SIZE);
}
Expand All @@ -79,12 +79,12 @@ mod tests {
let msg = UdpTransportMessage::new(
Version(u8::MAX),
RoutingNumber(u16::MAX),
u32::MAX,
true,
u16::MAX,
u16::MAX,
vec![0u8; MAX_PAYLOAD_SIZE],
);

let len = minicbor::to_vec(msg).unwrap().len();
let len = ockam_core::cbor_encode_preallocate(msg).unwrap().len();

assert_eq!(len, MAX_ON_THE_WIRE_SIZE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub(crate) use addresses::*;
pub(crate) use receiver::*;
pub(crate) use sender::*;
pub(crate) use socket_split::*;

mod pending_messages;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
mod peer_pending_routing_message_storage;
mod pending_message;
mod pending_routing_message_storage;
mod transport_messages_iterator;

pub(crate) use peer_pending_routing_message_storage::*;
pub(crate) use pending_message::*;
pub(crate) use pending_routing_message_storage::*;
pub(crate) use transport_messages_iterator::*;
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use crate::messages::{RoutingNumber, UdpRoutingMessage, UdpTransportMessage};
use crate::workers::pending_messages::{PendingMessage, PendingMessageState};
use core::cmp::min;
use ockam_core::compat::collections::VecDeque;
use tracing::{error, trace};

const MAX_PENDING_MESSAGES_U16: u16 = 5;
const MAX_PENDING_MESSAGES_USIZE: usize = MAX_PENDING_MESSAGES_U16 as usize;

/// Pending routing messages for a certain peer
/// Currently, we only support messages with the right order, which means:
/// 1. If a newer routing message is received - the old one is dropped if it wasn't fully assembled
/// 2. If a part of a routing message has arrived out of order - the message is fully dropped
pub(crate) struct PeerPendingRoutingMessageStorage {
// Reusable buffers to avoid excess allocations
buffer_queue: VecDeque<Vec<u8>>,
// Oldest routing number we can accept
oldest_routing_number: RoutingNumber,
// Messages with following routing numbers:
// [self.oldest_routing_number, ..., self.oldest_routing_number + MAX_PENDING_MESSAGES_USIZE - 1]
pending_messages: [PendingMessageState; MAX_PENDING_MESSAGES_USIZE],
}

impl PeerPendingRoutingMessageStorage {
// Create given the first received message
pub(crate) fn new(routing_number: RoutingNumber) -> Self {
Self {
buffer_queue: Default::default(),
oldest_routing_number: routing_number,
pending_messages: Default::default(),
}
}

pub(crate) fn add_transport_message_and_try_assemble(
&mut self,
transport_message: UdpTransportMessage<'_>,
) -> Option<UdpRoutingMessage<'static>> {
trace!(
"Received routing message {}, offset {}",
transport_message.routing_number,
transport_message.offset
);

// self.oldest_routing_number is the oldest message we can accept,
// older than that are ignored
if transport_message.routing_number < self.oldest_routing_number {
trace!(
"Dropping routing message: {} because it arrived late. Offset {}",
transport_message.routing_number,
transport_message.offset
);

return None;
}

// We received a newer message
let diff = transport_message.routing_number - self.oldest_routing_number;

// Move self.pending_messages if needed and update the diff
let diff = if diff >= MAX_PENDING_MESSAGES_U16 {
// We received a much newer message, we need to drop one or few older messages so that
// this message fits into our self.pending_messages

// Length of the shift we need to perform on our self.pending_messages array
let shift = diff - MAX_PENDING_MESSAGES_U16 + 1;

// Drop the messages that don't fit anymore
let number_of_messages_to_drop = min(shift, MAX_PENDING_MESSAGES_U16) as usize;

for i in 0..number_of_messages_to_drop {
match self.pending_messages[i].take() {
PendingMessageState::NotReceived => {
trace!(
"Discarding old not received routing message {} because a new routing message has arrived: {}",
self.oldest_routing_number + (i as u16),
transport_message.routing_number
);
}
PendingMessageState::InProgress(pending_message) => {
trace!(
"Discarding old partially received routing message {} because a new routing message has arrived: {}",
self.oldest_routing_number + (i as u16),
transport_message.routing_number
);

// Put the buffer back to reuse in the future
let buffer = pending_message.drop_message();
self.buffer_queue.push_back(buffer);
}
PendingMessageState::FullyHandled => {}
}
}

// If we didn't drop all the messages, move the rest to the left
if shift < MAX_PENDING_MESSAGES_U16 {
let number_of_messages_to_shift = (MAX_PENDING_MESSAGES_U16 - shift) as usize;
for i in 0..number_of_messages_to_shift {
self.pending_messages[i] = self.pending_messages[i + shift as usize].take();
}
}

self.oldest_routing_number += shift;

(diff - shift) as usize
} else {
diff as usize
};

let pending_message_state = self.pending_messages[diff].take();

let mut pending_message = match pending_message_state {
PendingMessageState::NotReceived => {
let buffer = self.buffer_queue.pop_front().unwrap_or_default();

PendingMessage::new(
transport_message.routing_number,
transport_message.total,
buffer,
)
}
PendingMessageState::InProgress(m) => m,
PendingMessageState::FullyHandled => {
// Already send out.
return None;
}
};

match pending_message.add_transport_message_and_try_assemble(transport_message) {
Some(routing_message_binary) => {
let res = match minicbor::decode::<UdpRoutingMessage>(&routing_message_binary) {
Ok(routing_message) => Some(routing_message.into_owned()),
Err(err) => {
error!("Error while decoding UDP message {}", err);
None
}
};

self.pending_messages[diff] = PendingMessageState::FullyHandled;

res
}
None => {
self.pending_messages[diff] = PendingMessageState::InProgress(pending_message);
None
}
}
}
}
Loading

0 comments on commit 9cad546

Please sign in to comment.