Skip to content

Commit

Permalink
Implemented the ordering module for laminar. (#150)
Browse files Browse the repository at this point in the history
Some changes were done to accomplish that:
- Refactoring Headers.
- Removing Channels.
- Introducing some types for processing of packets.
- Added unit tests.
- Refactoring Packet processing.
  • Loading branch information
TimonPost authored Mar 30, 2019
1 parent 28ef8ee commit 1363ffe
Show file tree
Hide file tree
Showing 39 changed files with 2,042 additions and 1,050 deletions.
8 changes: 2 additions & 6 deletions src/bin/laminar-tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use clap::{load_yaml, App, AppSettings};
use crossbeam_channel::Sender;
use laminar::{Config, DeliveryMethod, Packet, Result, Socket, SocketEvent, ThroughputMonitoring};
use laminar::{Config, Packet, Result, Socket, SocketEvent, ThroughputMonitoring};
use log::{debug, error, info};

fn main() {
Expand Down Expand Up @@ -167,11 +167,7 @@ fn run_client(config: ClientConfiguration) -> Result<()> {
fn test_steady_stream(sender: &Sender<Packet>, config: ClientConfiguration) {
info!("Beginning steady-state test");

let test_packet = Packet::new(
config.listen_host,
config.test_name.into_bytes().into_boxed_slice(),
DeliveryMethod::ReliableUnordered,
);
let test_packet = Packet::reliable_unordered(config.listen_host, config.test_name.into_bytes());

let time_quantum = 1000 / config.packet_ps as u64;
let start_time = Instant::now();
Expand Down
16 changes: 15 additions & 1 deletion src/error/network_error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::{FragmentErrorKind, PacketErrorKind};

use crate::SocketEvent;
use crossbeam_channel::SendError;
use std::fmt::{self, Display, Formatter};
use std::io;

Expand All @@ -26,8 +28,12 @@ pub enum ErrorKind {
ConnectionPoolError(String),
/// Error occurred when joining thread.
JoiningThreadFailed,
/// There was an unexpected error caused by an poisoned lock.
/// There was an unexpected error caused by a poisoned lock.
PoisonedLock(String),
/// Could not send on `SendChannel`.
SendError(SendError<SocketEvent>),
/// Expected header but could not be read from buffer.
CouldNotReadHeader(String),
}

impl Display for ErrorKind {
Expand All @@ -44,6 +50,8 @@ impl Display for ErrorKind {
ErrorKind::ConnectionPoolError(e) => { write!(fmt, "Something went wrong with connection timeout thread. Reason: {:?}", e) },
ErrorKind::JoiningThreadFailed => { write!(fmt, "Joining thread failed.") },
ErrorKind::PoisonedLock(e) => { write!(fmt, "There was an unexpected error caused by an poisoned lock. Reason: {:?}", e) },
ErrorKind::SendError(e) => { write!(fmt, "Could not sent on channel because it was closed. Reason: {:?}", e) },
ErrorKind::CouldNotReadHeader(header) => { write!(fmt, "Expected {} header but could not be read from buffer.", header) }
}
}
}
Expand All @@ -65,3 +73,9 @@ impl From<FragmentErrorKind> for ErrorKind {
ErrorKind::FragmentError(inner)
}
}

impl From<crossbeam_channel::SendError<SocketEvent>> for ErrorKind {
fn from(inner: SendError<SocketEvent>) -> Self {
ErrorKind::SendError(inner)
}
}
11 changes: 6 additions & 5 deletions src/infrastructure.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
mod channels;
mod delivery_method;
mod acknowlegement;
mod congestion;
mod fragmenter;

pub use self::channels::Channel;
pub use self::channels::{ReliableChannel, SequencedChannel, UnreliableChannel};
pub use self::delivery_method::DeliveryMethod;
pub mod arranging;

pub use self::acknowlegement::AcknowledgementHandler;
pub use self::congestion::CongestionHandler;
pub use self::fragmenter::Fragmentation;
50 changes: 50 additions & 0 deletions src/infrastructure/acknowlegement.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::net::{ExternalAcks, LocalAckRecord};

/// Type responsible for handling the acknowledgement of packets.
pub struct AcknowledgementHandler {
waiting_packets: LocalAckRecord,
their_acks: ExternalAcks,
pub seq_num: u16,
pub dropped_packets: Vec<Box<[u8]>>,
}

impl AcknowledgementHandler {
/// Constructs a new `AcknowledgementHandler` with which you can perform acknowledgement operations.
pub fn new() -> AcknowledgementHandler {
AcknowledgementHandler {
seq_num: 0,
waiting_packets: Default::default(),
their_acks: Default::default(),
dropped_packets: Vec::new(),
}
}
}

impl AcknowledgementHandler {
/// Returns the bit mask that contains the packets who are acknowledged.
pub fn bit_mask(&self) -> u32 {
self.their_acks.field
}

/// Returns the last acknowledged sequence number by the other endpoint.
pub fn last_seq(&self) -> u16 {
self.their_acks.last_seq
}

/// Process the incoming sequence number.
///
/// - Acknowledge the incoming sequence number
/// - Update dropped packets
pub fn process_incoming(&mut self, incoming_seq: u16) {
self.their_acks.ack(incoming_seq);

let dropped_packets = self.waiting_packets.ack(incoming_seq, self.bit_mask());

self.dropped_packets = dropped_packets.into_iter().map(|(_, p)| p).collect();
}

/// Enqueue the outgoing packet for acknowledgement.
pub fn process_outgoing(&mut self, payload: &[u8]) {
self.waiting_packets.enqueue(self.seq_num, &payload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@
//! The game developer can indicate on which stream he can order his packets and how he wants to arrange them.
//! For example, the game developer can say: "Let me set all chat messages to 'stream 1' and all motion packets to 'stream 2'.
pub mod ordering;
pub mod sequencing;
mod ordering;
mod sequencing;

/// A trait which could be implemented for arranging operations.
pub use self::ordering::{IterMut, OrderingStream, OrderingSystem};
pub use self::sequencing::{SequencingStream, SequencingSystem};

/// A trait which can be implemented for arranging operations.
pub trait Arranging {
type ArrangingItem;

Expand Down
30 changes: 21 additions & 9 deletions src/infrastructure/arranging/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
//!
//! This could be done with an iterator which returns packets as long there are packets in our storage matching the `expected_index`.
//!
//! ```rust
//! ```no-run
//! let stream = OrderingStream::new();
//!
//! let iter = stream.iter_mut();
Expand All @@ -72,6 +72,7 @@
//! - See [super-module](../index.html) description for more details.
use super::{Arranging, ArrangingSystem};
use crate::packet::SequenceNumber;
use std::collections::HashMap;

/// An ordering system that can arrange items in order on different streams.
Expand Down Expand Up @@ -128,12 +129,14 @@ impl<'a, T> ArrangingSystem for OrderingSystem<T> {
/// - See [super-module](../index.html) for more information about streams.
pub struct OrderingStream<T> {
// the id of this stream.
stream_id: u8,
_stream_id: u8,
// the storage for items that are waiting for older items to arrive.
// the items will be stored by key and value where the key is de incoming index and value the item value.
// the items will be stored by key and value where the key is the incoming index and the value is the item value.
storage: HashMap<usize, T>,
// the next expected item index.
expected_index: usize,
// unique identifier which should be used for ordering on a different stream e.g. the remote endpoint.
unique_item_identifier: u16,
}

impl<T> OrderingStream<T> {
Expand All @@ -157,20 +160,29 @@ impl<T> OrderingStream<T> {
OrderingStream {
storage: HashMap::with_capacity(size),
expected_index: 1,
stream_id,
_stream_id: stream_id,
unique_item_identifier: 0,
}
}

/// Returns the identifier of this stream.
fn stream_id(&self) -> u8 {
self.stream_id
#[cfg(test)]
pub fn stream_id(&self) -> u8 {
self._stream_id
}

/// Returns the next expected index.
#[cfg(test)]
pub fn expected_index(&self) -> usize {
self.expected_index
}

/// Returns the unique identifier which should be used for ordering on the other stream e.g. the remote endpoint.
pub fn new_item_identifier(&mut self) -> SequenceNumber {
self.unique_item_identifier = self.unique_item_identifier.wrapping_add(1);
self.unique_item_identifier
}

/// Returns an iterator of stored items.
///
/// # Algorithm for returning items from an Iterator.
Expand All @@ -183,7 +195,7 @@ impl<T> OrderingStream<T> {
///
/// # Example
///
/// ```rust
/// ```ignore
/// let stream = OrderingStream::new();
///
/// let iter = stream.iter_mut();
Expand Down Expand Up @@ -232,9 +244,9 @@ impl<T> Arranging for OrderingStream<T> {
) -> Option<Self::ArrangingItem> {
if incoming_offset == self.expected_index {
self.expected_index += 1;
Some(index)
Some(item)
} else if incoming_offset > self.expected_index {
self.storage.insert(incoming_offset, index);
self.storage.insert(incoming_offset, item);
None
} else {
// only occurs when we get a duplicated incoming_offset.
Expand Down
30 changes: 19 additions & 11 deletions src/infrastructure/arranging/sequencing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
//! - See [super-module](../index.html) description for more details.
use super::{Arranging, ArrangingSystem};
use std::{
collections::HashMap,
marker::PhantomData
};
use crate::packet::SequenceNumber;
use std::{collections::HashMap, marker::PhantomData};

/// An sequencing system that can arrange items in sequence on different streams.
/// A sequencing system that can arrange items in sequence across different streams.
///
/// Checkout [`SequencingStream`](./struct.SequencingStream.html), or module description for more details.
///
Expand Down Expand Up @@ -70,11 +68,13 @@ impl<T> ArrangingSystem for SequencingSystem<T> {
/// - See [super-module](../index.html) for more information about streams.
pub struct SequencingStream<T> {
// the id of this stream.
stream_id: u8,
_stream_id: u8,
// the highest seen item index.
top_index: usize,
// I need `PhantomData`, otherwise, I can't use a generic in the `Arranging` implementation because `T` is not constrained.
phantom: PhantomData<T>
phantom: PhantomData<T>,
// unique identifier which should be used for ordering on an other stream e.g. the remote endpoint.
unique_item_identifier: u16,
}

impl<T> SequencingStream<T> {
Expand All @@ -83,15 +83,23 @@ impl<T> SequencingStream<T> {
/// The default stream will have a capacity of 32 items.
pub fn new(stream_id: u8) -> SequencingStream<T> {
SequencingStream {
stream_id,
_stream_id: stream_id,
top_index: 0,
phantom: PhantomData
phantom: PhantomData,
unique_item_identifier: 0,
}
}

/// Returns the identifier of this stream.
fn stream_id(&self) -> u8 {
self.stream_id
#[cfg(test)]
pub fn stream_id(&self) -> u8 {
self._stream_id
}

/// Returns the unique identifier which should be used for ordering on an other stream e.g. the remote endpoint.
pub fn new_item_identifier(&mut self) -> SequenceNumber {
self.unique_item_identifier = self.unique_item_identifier.wrapping_add(1);
self.unique_item_identifier
}
}

Expand Down
26 changes: 0 additions & 26 deletions src/infrastructure/channels.rs

This file was deleted.

Loading

0 comments on commit 1363ffe

Please sign in to comment.