diff --git a/src/bin/laminar-tester.rs b/src/bin/laminar-tester.rs index 0498075f..ef71c51f 100644 --- a/src/bin/laminar-tester.rs +++ b/src/bin/laminar-tester.rs @@ -8,7 +8,7 @@ use std::{ use clap::{load_yaml, App, AppSettings}; use crossbeam_channel::Sender; -use laminar::{Config, DeliveryMethod, Packet, Result, Socket, SocketEvent, ThroughputMonitoring}; +use laminar::{Config, Packet, Result, Socket, SocketEvent, ThroughputMonitoring}; use log::{debug, error, info}; fn main() { @@ -167,11 +167,7 @@ fn run_client(config: ClientConfiguration) -> Result<()> { fn test_steady_stream(sender: &Sender, config: ClientConfiguration) { info!("Beginning steady-state test"); - let test_packet = Packet::new( - config.listen_host, - config.test_name.into_bytes().into_boxed_slice(), - DeliveryMethod::ReliableUnordered, - ); + let test_packet = Packet::reliable_unordered(config.listen_host, config.test_name.into_bytes()); let time_quantum = 1000 / config.packet_ps as u64; let start_time = Instant::now(); diff --git a/src/error/network_error.rs b/src/error/network_error.rs index cab86d12..e13a8d35 100644 --- a/src/error/network_error.rs +++ b/src/error/network_error.rs @@ -1,5 +1,7 @@ use super::{FragmentErrorKind, PacketErrorKind}; +use crate::SocketEvent; +use crossbeam_channel::SendError; use std::fmt::{self, Display, Formatter}; use std::io; @@ -26,8 +28,12 @@ pub enum ErrorKind { ConnectionPoolError(String), /// Error occurred when joining thread. JoiningThreadFailed, - /// There was an unexpected error caused by an poisoned lock. + /// There was an unexpected error caused by a poisoned lock. PoisonedLock(String), + /// Could not send on `SendChannel`. + SendError(SendError), + /// Expected header but could not be read from buffer. + CouldNotReadHeader(String), } impl Display for ErrorKind { @@ -44,6 +50,8 @@ impl Display for ErrorKind { ErrorKind::ConnectionPoolError(e) => { write!(fmt, "Something went wrong with connection timeout thread. Reason: {:?}", e) }, ErrorKind::JoiningThreadFailed => { write!(fmt, "Joining thread failed.") }, ErrorKind::PoisonedLock(e) => { write!(fmt, "There was an unexpected error caused by an poisoned lock. Reason: {:?}", e) }, + ErrorKind::SendError(e) => { write!(fmt, "Could not sent on channel because it was closed. Reason: {:?}", e) }, + ErrorKind::CouldNotReadHeader(header) => { write!(fmt, "Expected {} header but could not be read from buffer.", header) } } } } @@ -65,3 +73,9 @@ impl From for ErrorKind { ErrorKind::FragmentError(inner) } } + +impl From> for ErrorKind { + fn from(inner: SendError) -> Self { + ErrorKind::SendError(inner) + } +} diff --git a/src/infrastructure.rs b/src/infrastructure.rs index e65d6f4b..7f8bc76d 100644 --- a/src/infrastructure.rs +++ b/src/infrastructure.rs @@ -1,8 +1,9 @@ -mod channels; -mod delivery_method; +mod acknowlegement; +mod congestion; mod fragmenter; -pub use self::channels::Channel; -pub use self::channels::{ReliableChannel, SequencedChannel, UnreliableChannel}; -pub use self::delivery_method::DeliveryMethod; +pub mod arranging; + +pub use self::acknowlegement::AcknowledgementHandler; +pub use self::congestion::CongestionHandler; pub use self::fragmenter::Fragmentation; diff --git a/src/infrastructure/acknowlegement.rs b/src/infrastructure/acknowlegement.rs new file mode 100644 index 00000000..30cc94a6 --- /dev/null +++ b/src/infrastructure/acknowlegement.rs @@ -0,0 +1,50 @@ +use crate::net::{ExternalAcks, LocalAckRecord}; + +/// Type responsible for handling the acknowledgement of packets. +pub struct AcknowledgementHandler { + waiting_packets: LocalAckRecord, + their_acks: ExternalAcks, + pub seq_num: u16, + pub dropped_packets: Vec>, +} + +impl AcknowledgementHandler { + /// Constructs a new `AcknowledgementHandler` with which you can perform acknowledgement operations. + pub fn new() -> AcknowledgementHandler { + AcknowledgementHandler { + seq_num: 0, + waiting_packets: Default::default(), + their_acks: Default::default(), + dropped_packets: Vec::new(), + } + } +} + +impl AcknowledgementHandler { + /// Returns the bit mask that contains the packets who are acknowledged. + pub fn bit_mask(&self) -> u32 { + self.their_acks.field + } + + /// Returns the last acknowledged sequence number by the other endpoint. + pub fn last_seq(&self) -> u16 { + self.their_acks.last_seq + } + + /// Process the incoming sequence number. + /// + /// - Acknowledge the incoming sequence number + /// - Update dropped packets + pub fn process_incoming(&mut self, incoming_seq: u16) { + self.their_acks.ack(incoming_seq); + + let dropped_packets = self.waiting_packets.ack(incoming_seq, self.bit_mask()); + + self.dropped_packets = dropped_packets.into_iter().map(|(_, p)| p).collect(); + } + + /// Enqueue the outgoing packet for acknowledgement. + pub fn process_outgoing(&mut self, payload: &[u8]) { + self.waiting_packets.enqueue(self.seq_num, &payload); + } +} diff --git a/src/infrastructure/arranging/mod.rs b/src/infrastructure/arranging.rs similarity index 94% rename from src/infrastructure/arranging/mod.rs rename to src/infrastructure/arranging.rs index c522b616..806844bd 100644 --- a/src/infrastructure/arranging/mod.rs +++ b/src/infrastructure/arranging.rs @@ -39,10 +39,13 @@ //! The game developer can indicate on which stream he can order his packets and how he wants to arrange them. //! For example, the game developer can say: "Let me set all chat messages to 'stream 1' and all motion packets to 'stream 2'. -pub mod ordering; -pub mod sequencing; +mod ordering; +mod sequencing; -/// A trait which could be implemented for arranging operations. +pub use self::ordering::{IterMut, OrderingStream, OrderingSystem}; +pub use self::sequencing::{SequencingStream, SequencingSystem}; + +/// A trait which can be implemented for arranging operations. pub trait Arranging { type ArrangingItem; diff --git a/src/infrastructure/arranging/ordering.rs b/src/infrastructure/arranging/ordering.rs index ff89f420..f5232a08 100644 --- a/src/infrastructure/arranging/ordering.rs +++ b/src/infrastructure/arranging/ordering.rs @@ -58,7 +58,7 @@ //! //! This could be done with an iterator which returns packets as long there are packets in our storage matching the `expected_index`. //! -//! ```rust +//! ```no-run //! let stream = OrderingStream::new(); //! //! let iter = stream.iter_mut(); @@ -72,6 +72,7 @@ //! - See [super-module](../index.html) description for more details. use super::{Arranging, ArrangingSystem}; +use crate::packet::SequenceNumber; use std::collections::HashMap; /// An ordering system that can arrange items in order on different streams. @@ -128,12 +129,14 @@ impl<'a, T> ArrangingSystem for OrderingSystem { /// - See [super-module](../index.html) for more information about streams. pub struct OrderingStream { // the id of this stream. - stream_id: u8, + _stream_id: u8, // the storage for items that are waiting for older items to arrive. - // the items will be stored by key and value where the key is de incoming index and value the item value. + // the items will be stored by key and value where the key is the incoming index and the value is the item value. storage: HashMap, // the next expected item index. expected_index: usize, + // unique identifier which should be used for ordering on a different stream e.g. the remote endpoint. + unique_item_identifier: u16, } impl OrderingStream { @@ -157,20 +160,29 @@ impl OrderingStream { OrderingStream { storage: HashMap::with_capacity(size), expected_index: 1, - stream_id, + _stream_id: stream_id, + unique_item_identifier: 0, } } /// Returns the identifier of this stream. - fn stream_id(&self) -> u8 { - self.stream_id + #[cfg(test)] + pub fn stream_id(&self) -> u8 { + self._stream_id } /// Returns the next expected index. + #[cfg(test)] pub fn expected_index(&self) -> usize { self.expected_index } + /// Returns the unique identifier which should be used for ordering on the other stream e.g. the remote endpoint. + pub fn new_item_identifier(&mut self) -> SequenceNumber { + self.unique_item_identifier = self.unique_item_identifier.wrapping_add(1); + self.unique_item_identifier + } + /// Returns an iterator of stored items. /// /// # Algorithm for returning items from an Iterator. @@ -183,7 +195,7 @@ impl OrderingStream { /// /// # Example /// - /// ```rust + /// ```ignore /// let stream = OrderingStream::new(); /// /// let iter = stream.iter_mut(); @@ -232,9 +244,9 @@ impl Arranging for OrderingStream { ) -> Option { if incoming_offset == self.expected_index { self.expected_index += 1; - Some(index) + Some(item) } else if incoming_offset > self.expected_index { - self.storage.insert(incoming_offset, index); + self.storage.insert(incoming_offset, item); None } else { // only occurs when we get a duplicated incoming_offset. diff --git a/src/infrastructure/arranging/sequencing.rs b/src/infrastructure/arranging/sequencing.rs index 985c02f6..79b464eb 100644 --- a/src/infrastructure/arranging/sequencing.rs +++ b/src/infrastructure/arranging/sequencing.rs @@ -10,12 +10,10 @@ //! - See [super-module](../index.html) description for more details. use super::{Arranging, ArrangingSystem}; -use std::{ - collections::HashMap, - marker::PhantomData -}; +use crate::packet::SequenceNumber; +use std::{collections::HashMap, marker::PhantomData}; -/// An sequencing system that can arrange items in sequence on different streams. +/// A sequencing system that can arrange items in sequence across different streams. /// /// Checkout [`SequencingStream`](./struct.SequencingStream.html), or module description for more details. /// @@ -70,11 +68,13 @@ impl ArrangingSystem for SequencingSystem { /// - See [super-module](../index.html) for more information about streams. pub struct SequencingStream { // the id of this stream. - stream_id: u8, + _stream_id: u8, // the highest seen item index. top_index: usize, // I need `PhantomData`, otherwise, I can't use a generic in the `Arranging` implementation because `T` is not constrained. - phantom: PhantomData + phantom: PhantomData, + // unique identifier which should be used for ordering on an other stream e.g. the remote endpoint. + unique_item_identifier: u16, } impl SequencingStream { @@ -83,15 +83,23 @@ impl SequencingStream { /// The default stream will have a capacity of 32 items. pub fn new(stream_id: u8) -> SequencingStream { SequencingStream { - stream_id, + _stream_id: stream_id, top_index: 0, - phantom: PhantomData + phantom: PhantomData, + unique_item_identifier: 0, } } /// Returns the identifier of this stream. - fn stream_id(&self) -> u8 { - self.stream_id + #[cfg(test)] + pub fn stream_id(&self) -> u8 { + self._stream_id + } + + /// Returns the unique identifier which should be used for ordering on an other stream e.g. the remote endpoint. + pub fn new_item_identifier(&mut self) -> SequenceNumber { + self.unique_item_identifier = self.unique_item_identifier.wrapping_add(1); + self.unique_item_identifier } } diff --git a/src/infrastructure/channels.rs b/src/infrastructure/channels.rs deleted file mode 100644 index 8ad7e888..00000000 --- a/src/infrastructure/channels.rs +++ /dev/null @@ -1,26 +0,0 @@ -//! This module provides channels for processing packets of different reliabilities. - -mod reliable_channel; -mod sequenced_channel; -mod unreliable_channel; - -use crate::error::Result; -use crate::infrastructure::DeliveryMethod; -use crate::packet::PacketData; - -pub use self::reliable_channel::ReliableChannel; -pub use self::sequenced_channel::SequencedChannel; -pub use self::unreliable_channel::UnreliableChannel; - -/// This provides an abstraction for processing packets to their given reliability. -pub trait Channel { - /// Process a packet before sending it and return a packet instance with the given raw data. - fn process_outgoing( - &mut self, - payload: &[u8], - delivery_method: DeliveryMethod, - ) -> Result; - - /// Progress an packet on receive and receive the processed data. - fn process_incoming<'d>(&mut self, buffer: &'d [u8]) -> Result<&'d [u8]>; -} diff --git a/src/infrastructure/channels/reliable_channel.rs b/src/infrastructure/channels/reliable_channel.rs deleted file mode 100644 index 3ceb90b1..00000000 --- a/src/infrastructure/channels/reliable_channel.rs +++ /dev/null @@ -1,186 +0,0 @@ -use super::Channel; - -use crate::config::Config; -use crate::error::{PacketErrorKind, Result}; -use crate::infrastructure::{DeliveryMethod, Fragmentation}; -use crate::net::{ExternalAcks, LocalAckRecord, NetworkQuality, RttMeasurer}; -use crate::packet::header::{AckedPacketHeader, HeaderReader, HeaderWriter, StandardHeader}; -use crate::packet::{PacketData, PacketTypeId}; -use crate::sequence_buffer::{CongestionData, SequenceBuffer}; - -use log::error; -use std::io::Cursor; -use std::time::Instant; - -/// This channel should be used for processing packets reliable. All packets will be sent and received, ordering depends on given 'ordering' parameter. -/// -/// *Details* -/// -/// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | -/// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | -/// | No | No | Optional | Yes | Yes | -/// -/// Basically this is almost has all features TCP has. -/// Receive every packet and if specified (file downloading for example) in order (any missing keeps the later ones buffered until they are received). -pub struct ReliableChannel { - // settings - ordered: bool, - config: Config, - - // reliability control - seq_num: u16, - waiting_packets: LocalAckRecord, - their_acks: ExternalAcks, - dropped_packets: Vec>, - - // congestion control - rtt_measurer: RttMeasurer, - congestion_data: SequenceBuffer, - _quality: NetworkQuality, - rtt: f32, -} - -impl ReliableChannel { - /// Creates a new instance of the reliable channel by specifying if channel needs to order incoming packets. - pub fn new(ordered: bool, config: &Config) -> ReliableChannel { - ReliableChannel { - // settings - ordered, - config: config.clone(), - - // reliability control - seq_num: 0, - waiting_packets: Default::default(), - their_acks: Default::default(), - dropped_packets: Vec::new(), - - // congestion control - rtt_measurer: RttMeasurer::new(config), - congestion_data: SequenceBuffer::with_capacity(::max_value() as usize), - _quality: NetworkQuality::Good, - rtt: 0.0, - } - } - - /// Checks if channel is ordered or not - #[allow(dead_code)] - pub fn is_ordered(&self) -> bool { - self.ordered - } - - /// Check if this channel has dropped packets. - /// - /// You could directly call `ReliableChannel::drain_dropped_packets()` and if it returns an empty vector you know there are no packets. - /// But draining a vector will have its extra check logic even if it's empty. - /// So that's why this function exists just a little shortcut to check if there are dropped packets which will be faster at the end. - pub fn has_dropped_packets(&self) -> bool { - !self.dropped_packets.is_empty() - } - - /// Creates a draining iterator that removes dropped packets and yield the ones that are removed. - /// - /// So why drain? - /// You have to think about the packet flow first. - /// 1. Once we send a packet we place it in a queue until acknowledged. - /// 2. If the packet doesn't get acknowledged in some time it will be dropped. - /// 3. When we notice the packet drop we directly want to resend the dropped packet. - /// 4. Once we notice that we start at '1' again. - /// - /// So keeping track of old dropped packets does not make sense, at least for now. - /// We except when dropped packets are retrieved they will be sent out so we don't need to keep track of them internally the caller of this function will have ownership over them after the call. - pub fn drain_dropped_packets(&mut self) -> Vec> { - self.dropped_packets.drain(..).collect() - } -} - -impl Channel for ReliableChannel { - /// This will pre-process a reliable packet - /// - /// 1. Add congestion data entry so that it can be monitored. - /// 2. Queue new packet in acknowledgement system. - /// 3. Fragmentation of the payload. - fn process_outgoing( - &mut self, - payload: &[u8], - delivery_method: DeliveryMethod, - ) -> Result { - if payload.len() > self.config.max_packet_size { - error!( - "Packet too large: Attempting to send {}, max={}", - payload.len(), - self.config.max_packet_size - ); - Err(PacketErrorKind::ExceededMaxPacketSize)?; - } - - // queue congestion data. - self.congestion_data.insert( - CongestionData::new(self.seq_num, Instant::now()), - self.seq_num, - ); - - // queue packet for awaiting acknowledgement. - self.waiting_packets.enqueue(self.seq_num, &payload); - - // calculate size for our packet data. - // safe cast because max packet size is u16 - let payload_length = payload.len() as u16; - let packet_data_size = - Fragmentation::total_fragments_needed(payload_length, self.config.fragment_size); - let mut packet_data = PacketData::with_capacity(packet_data_size as usize); - - let packet_type = if packet_data_size > 1 { - PacketTypeId::Fragment - } else { - PacketTypeId::Packet - }; - - // create our reliable header and write it to an buffer. - let header = AckedPacketHeader::new( - StandardHeader::new(delivery_method, packet_type), - self.seq_num, - self.their_acks.last_seq, - self.their_acks.field, - ); - let mut buffer = Vec::with_capacity(header.size() as usize); - header.parse(&mut buffer)?; - - // spit the packet if the payload length is greater than the allowed fragment size. - if payload_length <= self.config.fragment_size { - packet_data.add_fragment(&buffer, payload)?; - } else { - Fragmentation::spit_into_fragments(payload, header, &mut packet_data, &self.config)?; - } - - // increase local sequence number. - self.seq_num = self.seq_num.wrapping_add(1); - - Ok(packet_data) - } - - /// Process a packet on receive. - /// - /// 1. Read reliable header. - /// 2. Update acknowledgement data. - /// 3. Calculate RTT time. - /// 4. Update dropped packets. - fn process_incoming<'d>(&mut self, buffer: &'d [u8]) -> Result<&'d [u8]> { - let mut cursor = Cursor::new(buffer); - let acked_header = AckedPacketHeader::read(&mut cursor)?; - - self.their_acks.ack(acked_header.seq); - - // update congestion information. - let congestion_data = self.congestion_data.get_mut(acked_header.ack_seq()); - self.rtt = self.rtt_measurer.get_rtt(congestion_data); - - // Update dropped packets if there are any. - let dropped_packets = self - .waiting_packets - .ack(acked_header.ack_seq(), acked_header.ack_field()); - - self.dropped_packets = dropped_packets.into_iter().map(|(_, p)| p).collect(); - - Ok(&buffer[acked_header.size() as usize..buffer.len()]) - } -} diff --git a/src/infrastructure/channels/sequenced_channel.rs b/src/infrastructure/channels/sequenced_channel.rs deleted file mode 100644 index a569cdf4..00000000 --- a/src/infrastructure/channels/sequenced_channel.rs +++ /dev/null @@ -1,40 +0,0 @@ -use super::Channel; - -use crate::error::Result; -use crate::infrastructure::DeliveryMethod; -use crate::packet::PacketData; - -/// This channel should be used for processing packets sequenced. -/// -/// *Details* -/// -/// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | -/// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | -/// | Yes | No | Yes | Yes | No | -/// -/// Toss away any packets that are older than the most recent (like a position update, you don't care about older ones), -/// packets may be dropped, just the application may not receive older ones if a newer one came in first. - -#[derive(Default)] -pub struct SequencedChannel; - -impl SequencedChannel { - /// Creates a new instance of the sequenced channel by specifying if channel needs to handle packets reliable. - pub fn new() -> SequencedChannel { - SequencedChannel - } -} - -impl Channel for SequencedChannel { - fn process_outgoing( - &mut self, - _payload: &[u8], - _delivery_method: DeliveryMethod, - ) -> Result { - unimplemented!() - } - - fn process_incoming<'d>(&mut self, _buffer: &'d [u8]) -> Result<&'d [u8]> { - unimplemented!() - } -} diff --git a/src/infrastructure/channels/unreliable_channel.rs b/src/infrastructure/channels/unreliable_channel.rs deleted file mode 100644 index f1850679..00000000 --- a/src/infrastructure/channels/unreliable_channel.rs +++ /dev/null @@ -1,62 +0,0 @@ -use super::Channel; - -use crate::error::Result; -use crate::infrastructure::DeliveryMethod; -use crate::net::constants::STANDARD_HEADER_SIZE; -use crate::packet::header::{HeaderReader, HeaderWriter, StandardHeader}; -use crate::packet::{PacketData, PacketTypeId}; - -/// This channel should be used for unreliable processing of packets. -/// -/// **Details** -/// -/// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | -/// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | -/// | Yes | Yes | No | No | No | -/// -/// Basically just bare UDP, free to be dropped, used for very unnecessary data, great for 'general' position updates. -/// Ordering depends on given 'ordering' parameter. -pub struct UnreliableChannel { - ordered: bool, -} - -impl UnreliableChannel { - /// Create a new instance of the unreliable channel. - pub fn new(ordered: bool) -> UnreliableChannel { - UnreliableChannel { ordered } - } - - /// Returns if a channel is ordered or not - #[allow(dead_code)] - pub fn is_ordered(&self) -> bool { - self.ordered - } -} - -impl Channel for UnreliableChannel { - /// This will prepossess an unreliable packet. - /// - /// 1. Generate default header. - /// 2. Append payload. - /// 3. Return the final data. - fn process_outgoing( - &mut self, - payload: &[u8], - delivery_method: DeliveryMethod, - ) -> Result { - let header = StandardHeader::new(delivery_method, PacketTypeId::Packet); - let mut buffer = Vec::with_capacity(header.size() as usize); - header.parse(&mut buffer)?; - - let mut packet_data = PacketData::with_capacity(payload.len()); - packet_data.add_fragment(&buffer, payload)?; - Ok(packet_data) - } - - /// Process a packet on receive. - /// - /// This will not do anything it will only return the bytes as they are received. - fn process_incoming<'d>(&mut self, buffer: &'d [u8]) -> Result<&'d [u8]> { - Ok(&buffer[STANDARD_HEADER_SIZE as usize..buffer.len()]) - } -} diff --git a/src/infrastructure/congestion.rs b/src/infrastructure/congestion.rs new file mode 100644 index 00000000..59b7539e --- /dev/null +++ b/src/infrastructure/congestion.rs @@ -0,0 +1,42 @@ +use crate::{ + net::{NetworkQuality, RttMeasurer}, + sequence_buffer::{CongestionData, SequenceBuffer}, + Config, +}; + +use std::time::Instant; + +/// Type that is responsible for keeping track of congestion information. +pub struct CongestionHandler { + rtt_measurer: RttMeasurer, + congestion_data: SequenceBuffer, + _quality: NetworkQuality, +} + +impl CongestionHandler { + /// Constructs a new `CongestionHandler` which you can use for keeping track of congestion information. + pub fn new(config: &Config) -> CongestionHandler { + CongestionHandler { + rtt_measurer: RttMeasurer::new(config), + congestion_data: SequenceBuffer::with_capacity(::max_value() as usize), + _quality: NetworkQuality::Good, + } + } + + /// Process incoming sequence number. + /// + /// This will calculate the RTT-time and smooth down the RTT-value to prevent uge RTT-spikes. + pub fn process_incoming(&mut self, incoming_seq: u16) { + let congestion_data = self.congestion_data.get_mut(incoming_seq); + self.rtt_measurer.calculate_rrt(congestion_data); + } + + /// Process outgoing sequence number. + /// + /// This will insert an entry which is used for keeping track of the sending time. + /// Once we process incoming sequence numbers we can calculate the `RTT` time. + pub fn process_outgoing(&mut self, seq: u16) { + self.congestion_data + .insert(CongestionData::new(seq, Instant::now()), seq); + } +} diff --git a/src/infrastructure/delivery_method.rs b/src/infrastructure/delivery_method.rs deleted file mode 100644 index 0d476416..00000000 --- a/src/infrastructure/delivery_method.rs +++ /dev/null @@ -1,103 +0,0 @@ -/// This enum defines different ways in which packets can be delivered. -/// -/// This is a very important concept which could at first be difficult to grasp, but which will be very handy later on. -/// -/// When dealing with networking for games, the two protocols that see the most use are TCP and UDP. -/// UDP is considered to be more unreliable than TCP because it lacks certain features TCP has, as shown below. -/// -/// _TCP_ -/// - Guarantee of delivery. -/// - Guarantee for order. -/// - Packets will not be dropped. -/// - Duplication not possible. -/// - Automatic fragmentation -/// -/// _UDP_ -/// - No guarantee for delivery. -/// - No guarantee for order. -/// - No way of getting dropped packet. -/// - Duplication possible. -/// - No fragmentation -// -/// TCP's features can be very useful, but they also come with some overhead. -/// This can be problematic if you only care about some of them. -/// -/// That is why it would be quite handy if you could somehow specify which features you want on top of UDP. -/// You could say, for example, "I want the guarantee for my packets to arrive, however they don't need to be in order". -/// -/// Laminar provides different kind of reliabilities contained within this enum. -#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, Eq)] -pub enum DeliveryMethod { - /// Unreliable. Packets can be dropped, duplicated or arrive without order. - /// - /// **Details** - /// - /// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | - /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | - /// | Yes | Yes | No | No | No | - /// - /// Basically just bare UDP, free to be dropped, used for very unnecessary data, great for 'general' position updates. - UnreliableUnordered, - /// Unreliable. Packets can be dropped, duplicated or arrive with order. - /// - /// **Details** - /// - /// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | - /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | - /// | Yes | Yes | Yes | No | No | - /// - /// Basically just bare UDP, free to be dropped, used for very unnecessary data, great for 'general' position updates but packets will be ordered. - UnreliableOrdered, - /// Reliable. All packets will be sent and received, but without order. - /// - /// *Details* - /// - /// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | - /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | - /// | No | No | No | Yes | Yes | - /// - /// Basically this is almost TCP like without ordering of packets. - /// Receive every packet and immediately give to application, order does not matter. - ReliableUnordered, - /// Reliable. All packets will be sent and received, with order. - /// - /// *Details* - /// - /// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | - /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | - /// | No | No | Yes | Yes | Yes | - /// - /// Basically this is almost has all features TCP has. - /// Receive every packet (file downloading for example) in order (any missing keeps the later ones buffered until they are received). - ReliableOrdered, - /// Unreliable. Packets can be dropped, but never duplicated and arrive in order. - /// - /// *Details* - /// - /// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | - /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | - /// | Yes | No | Yes | Yes | No | - /// - /// Toss away any packets that are older than the most recent (like a position update, you don't care about older ones), - /// packets may be dropped, just the application may not receive older ones if a newer one came in first. - Sequenced, -} - -impl DeliveryMethod { - /// Get integer value from `DeliveryMethod` enum. - pub fn get_delivery_method_id(delivery_method: DeliveryMethod) -> u8 { - delivery_method as u8 - } - - /// Get `DeliveryMethod` enum instance from integer value. - pub fn get_delivery_method_from_id(delivery_method_id: u8) -> DeliveryMethod { - match delivery_method_id { - 0 => DeliveryMethod::UnreliableUnordered, - 1 => DeliveryMethod::UnreliableOrdered, - 2 => DeliveryMethod::ReliableUnordered, - 3 => DeliveryMethod::ReliableOrdered, - 4 => DeliveryMethod::Sequenced, - _ => DeliveryMethod::UnreliableUnordered, - } - } -} diff --git a/src/infrastructure/fragmenter.rs b/src/infrastructure/fragmenter.rs index 846bd9f1..2d57c8d4 100644 --- a/src/infrastructure/fragmenter.rs +++ b/src/infrastructure/fragmenter.rs @@ -1,10 +1,12 @@ -use crate::config::Config; -use crate::error::{FragmentErrorKind, Result}; -use crate::packet::header::{AckedPacketHeader, FragmentHeader, HeaderReader, HeaderWriter}; -use crate::packet::PacketData; -use crate::sequence_buffer::{ReassemblyData, SequenceBuffer}; +use crate::{ + config::Config, + error::{FragmentErrorKind, Result}, + net::constants::FRAGMENT_HEADER_SIZE, + packet::header::FragmentHeader, + sequence_buffer::{ReassemblyData, SequenceBuffer}, +}; -use std::io::{Cursor, Read, Write}; +use std::io::Write; /// Type that will manage fragmentation of packets. pub struct Fragmentation { @@ -50,7 +52,7 @@ impl Fragmentation { /// /// So for 4000 bytes we need 4 fragments /// [fragment: 1024] [fragment: 1024] [fragment: 1024] [fragment: 928] - pub fn total_fragments_needed(payload_length: u16, fragment_size: u16) -> u16 { + pub fn fragments_needed(payload_length: u16, fragment_size: u16) -> u16 { let remainder = if payload_length % fragment_size > 0 { 1 } else { @@ -60,31 +62,20 @@ impl Fragmentation { } /// Split the given payload into fragments and write those fragments to the passed packet data. - pub fn spit_into_fragments( - payload: &[u8], - acked_header: AckedPacketHeader, - packet_data: &mut PacketData, - config: &Config, - ) -> Result<()> { + pub fn spit_into_fragments<'a>(payload: &'a [u8], config: &Config) -> Result> { + let mut fragments = Vec::new(); + let payload_length = payload.len() as u16; let num_fragments = - Fragmentation::total_fragments_needed(payload_length, config.fragment_size) as u8; /* safe cast max fragments is u8 */ + // Safe cast max fragments is u8 + Fragmentation::fragments_needed(payload_length, config.fragment_size) as u8; if num_fragments > config.max_fragments { Err(FragmentErrorKind::ExceededMaxFragments)?; } for fragment_id in 0..num_fragments { - let fragment = FragmentHeader::new( - acked_header.standard_header, - fragment_id, - num_fragments, - acked_header, - ); - let mut buffer = Vec::with_capacity(fragment.size() as usize); - fragment.parse(&mut buffer)?; - - // get start end pos in buffer + // get start and end position of buffer let start_fragment_pos = u16::from(fragment_id) * config.fragment_size; let mut end_fragment_pos = (u16::from(fragment_id) + 1) * config.fragment_size; @@ -96,18 +87,21 @@ impl Fragmentation { // get specific slice of data for fragment let fragment_data = &payload[start_fragment_pos as usize..end_fragment_pos as usize]; - packet_data.add_fragment(&buffer, fragment_data)?; + fragments.push(fragment_data); } - Ok(()) + Ok(fragments) } - /// This will read fragment data and returns the complete packet data when all fragments are received. - pub fn handle_fragment(&mut self, cursor: &mut Cursor<&[u8]>) -> Result>> { + /// This will read fragment data and return the complete packet when all fragments are received. + pub fn handle_fragment( + &mut self, + fragment_header: FragmentHeader, + fragment_payload: &[u8], + ) -> Result>> { // read fragment packet - let fragment_header = FragmentHeader::read(cursor)?; - self.create_fragment_if_not_exists(&fragment_header)?; + self.create_fragment_if_not_exists(fragment_header); let num_fragments_received; let num_fragments_total; @@ -134,12 +128,8 @@ impl Fragmentation { reassembly_data.num_fragments_received += 1; reassembly_data.fragments_received[usize::from(fragment_header.id())] = true; - // read payload after fragment header - let mut payload = Vec::new(); - cursor.read_to_end(&mut payload)?; - // add the payload from the fragment to the buffer whe have in cache - reassembly_data.buffer.write_all(payload.as_slice())?; + reassembly_data.buffer.write_all(&*fragment_payload)?; num_fragments_received = reassembly_data.num_fragments_received; num_fragments_total = reassembly_data.num_fragments_total; @@ -159,28 +149,17 @@ impl Fragmentation { } /// If fragment does not exist we need to insert a new entry. - fn create_fragment_if_not_exists(&mut self, fragment_header: &FragmentHeader) -> Result<()> { + fn create_fragment_if_not_exists(&mut self, fragment_header: FragmentHeader) { if !self.fragments.exists(fragment_header.sequence()) { - if fragment_header.id() == 0 { - match fragment_header.packet_header() { - Some(_header) => { - let reassembly_data = ReassemblyData::new( - fragment_header.sequence(), - fragment_header.fragment_count(), - (9 + self.config.fragment_size) as usize, - ); - - self.fragments - .insert(reassembly_data.clone(), fragment_header.sequence()); - } - None => Err(FragmentErrorKind::PacketHeaderNotFound)?, - } - } else { - Err(FragmentErrorKind::AlreadyProcessedFragment)? - } - } + let reassembly_data = ReassemblyData::new( + fragment_header.sequence(), + fragment_header.fragment_count(), + (u16::from(FRAGMENT_HEADER_SIZE) + self.config.fragment_size) as usize, + ); - Ok(()) + self.fragments + .insert(reassembly_data, fragment_header.sequence()); + } } } @@ -190,8 +169,8 @@ mod test { #[test] pub fn total_fragments_needed_test() { - let fragment_number = Fragmentation::total_fragments_needed(4000, 1024); - let fragment_number1 = Fragmentation::total_fragments_needed(500, 1024); + let fragment_number = Fragmentation::fragments_needed(4000, 1024); + let fragment_number1 = Fragmentation::fragments_needed(500, 1024); assert_eq!(fragment_number, 4); assert_eq!(fragment_number1, 1); diff --git a/src/lib.rs b/src/lib.rs index 2fa93451..433ce7b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,7 @@ //! #![warn(missing_docs)] - +#![allow(clippy::trivially_copy_pass_by_ref)] mod infrastructure; mod packet; mod protocol_version; @@ -37,12 +37,13 @@ mod error; /// Networking modules mod net; +#[cfg(feature = "tester")] +pub use self::throughput::ThroughputMonitoring; + pub use self::config::Config; pub use self::error::{ErrorKind, Result}; -pub use self::infrastructure::DeliveryMethod; -pub use self::net::{Socket, SocketEvent}; -pub use self::packet::Packet; +pub use self::net::Socket; +pub use self::net::SocketEvent; +pub use self::net::VirtualConnection; +pub use self::packet::{DeliveryGuarantee, OrderingGuarantee, Packet}; pub use self::protocol_version::ProtocolVersion; - -#[cfg(feature = "tester")] -pub use self::throughput::ThroughputMonitoring; diff --git a/src/net/connection/quality.rs b/src/net/connection/quality.rs index a7f76256..b5dddc64 100644 --- a/src/net/connection/quality.rs +++ b/src/net/connection/quality.rs @@ -16,6 +16,7 @@ pub enum NetworkQuality { /// It is able to smooth out the network jitter if there is any. pub struct RttMeasurer { config: Config, + rtt: f32, } impl RttMeasurer { @@ -23,13 +24,14 @@ impl RttMeasurer { pub fn new(config: &Config) -> RttMeasurer { RttMeasurer { config: config.clone(), + rtt: 0., } } /// This will calculate the round trip time (rtt) from the given acknowledgement. /// Where after it updates the rtt from the given connection. - pub fn get_rtt(&self, congestion_data: Option<&mut CongestionData>) -> f32 { - self.get_smoothed_rtt(congestion_data) + pub fn calculate_rrt(&mut self, congestion_data: Option<&mut CongestionData>) { + self.rtt = self.get_smoothed_rtt(congestion_data); } /// This will get the smoothed round trip time (rtt) from the time we last heard from an packet. diff --git a/src/net/connection/virtual_connection.rs b/src/net/connection/virtual_connection.rs index 1e9e7bc1..686e50ea 100644 --- a/src/net/connection/virtual_connection.rs +++ b/src/net/connection/virtual_connection.rs @@ -1,34 +1,40 @@ -use crate::config::Config; -use crate::error::{ErrorKind, Result}; -use crate::infrastructure::{ - Channel, DeliveryMethod, Fragmentation, ReliableChannel, SequencedChannel, UnreliableChannel, +use crate::{ + config::Config, + error::{ErrorKind, Result}, + infrastructure::{ + arranging::{Arranging, ArrangingSystem, OrderingSystem, SequencingSystem}, + AcknowledgementHandler, CongestionHandler, Fragmentation, + }, + net::constants::{ + ACKED_PACKET_HEADER, DEFAULT_ORDERING_STREAM, DEFAULT_SEQUENCING_STREAM, + STANDARD_HEADER_SIZE, + }, + packet::{ + DeliveryGuarantee, OrderingGuarantee, Outgoing, OutgoingPacketBuilder, Packet, + PacketReader, PacketType, + }, + SocketEvent, }; -use crate::packet::header::HeaderReader; -use crate::packet::header::StandardHeader; -use crate::packet::{Packet, PacketData, PacketTypeId}; -use crate::protocol_version::ProtocolVersion; -use log::error; +use crossbeam_channel::{self, Sender}; use std::fmt; -use std::io::Cursor; use std::net::SocketAddr; use std::time::{Duration, Instant}; /// Contains the information about a certain 'virtual connection' over udp. /// This connections also keeps track of network quality, processing packets, buffering data related to connection etc. pub struct VirtualConnection { - // client information /// Last time we received a packet from this client pub last_heard: Instant, /// The address of the remote endpoint pub remote_address: SocketAddr, - // reliability channels for processing packets. - unreliable_unordered_channel: UnreliableChannel, - reliable_unordered_channel: ReliableChannel, - sequenced_channel: SequencedChannel, + ordering_system: OrderingSystem>, + sequencing_system: SequencingSystem>, + acknowledge_handler: AcknowledgementHandler, + congestion_handler: CongestionHandler, - // fragmentation + config: Config, fragmentation: Fragmentation, } @@ -36,16 +42,14 @@ impl VirtualConnection { /// Creates and returns a new Connection that wraps the provided socket address pub fn new(addr: SocketAddr, config: &Config) -> VirtualConnection { VirtualConnection { - // client information last_heard: Instant::now(), remote_address: addr, - - // reliability channels for processing packets. - unreliable_unordered_channel: UnreliableChannel::new(true), - reliable_unordered_channel: ReliableChannel::new(false, config), - sequenced_channel: SequencedChannel::new(), - + ordering_system: OrderingSystem::new(), + sequencing_system: SequencingSystem::new(), + acknowledge_handler: AcknowledgementHandler::new(), + congestion_handler: CongestionHandler::new(config), fragmentation: Fragmentation::new(config), + config: config.to_owned(), } } @@ -56,87 +60,306 @@ impl VirtualConnection { } /// This pre-process the given buffer to be send over the network. - /// 1. It will append the right header. - /// 2. It will perform some actions related to how the packet should be delivered. - pub fn process_outgoing( + pub fn process_outgoing<'a>( &mut self, - payload: &[u8], - delivery_method: DeliveryMethod, - ) -> Result { - let channel: &mut Channel = match delivery_method { - DeliveryMethod::UnreliableUnordered => &mut self.unreliable_unordered_channel, - DeliveryMethod::ReliableUnordered => &mut self.reliable_unordered_channel, - DeliveryMethod::Sequenced => &mut self.sequenced_channel, - _ => { - error!("Tried using channel type which is not supported yet. Switched to unreliable unordered packet handling."); - &mut self.unreliable_unordered_channel + payload: &'a [u8], + delivery_guarantee: DeliveryGuarantee, + ordering_guarantee: OrderingGuarantee, + ) -> Result> { + match delivery_guarantee { + DeliveryGuarantee::Unreliable => { + let mut builder = OutgoingPacketBuilder::new(payload).with_default_header( + PacketType::Packet, + delivery_guarantee, + ordering_guarantee, + ); + + if let OrderingGuarantee::Sequenced(stream_id) = ordering_guarantee { + let item_identifier = self + .sequencing_system + .get_or_create_stream(stream_id.unwrap_or(DEFAULT_SEQUENCING_STREAM)) + .new_item_identifier(); + + builder = builder.with_sequencing_header(item_identifier as u16, stream_id); + }; + + Ok(Outgoing::Packet(builder.build())) } - }; + DeliveryGuarantee::Reliable => { + let payload_length = payload.len() as u16; + + let outgoing = { + // spit the packet if the payload length is greater than the allowed fragment size. + if payload_length <= self.config.fragment_size { + let mut builder = OutgoingPacketBuilder::new(payload).with_default_header( + PacketType::Packet, + delivery_guarantee, + ordering_guarantee, + ); + + builder = builder.with_acknowledgement_header( + self.acknowledge_handler.seq_num, + self.acknowledge_handler.last_seq(), + self.acknowledge_handler.bit_mask(), + ); + + if let OrderingGuarantee::Ordered(stream_id) = ordering_guarantee { + let item_identifier = self + .ordering_system + .get_or_create_stream(stream_id.unwrap_or(DEFAULT_ORDERING_STREAM)) + .new_item_identifier(); + + builder = + builder.with_ordering_header(item_identifier as u16, stream_id); + }; + + if let OrderingGuarantee::Sequenced(stream_id) = ordering_guarantee { + let item_identifier = self + .sequencing_system + .get_or_create_stream( + stream_id.unwrap_or(DEFAULT_SEQUENCING_STREAM), + ) + .new_item_identifier(); + + builder = + builder.with_sequencing_header(item_identifier as u16, stream_id); + }; + + Outgoing::Packet(builder.build()) + } else { + Outgoing::Fragments( + Fragmentation::spit_into_fragments(payload, &self.config)? + .into_iter() + .enumerate() + .map(|(fragment_id, fragment)| { + let fragments_needed = Fragmentation::fragments_needed( + payload_length, + self.config.fragment_size, + ) + as u8; + + let mut builder = OutgoingPacketBuilder::new(fragment) + .with_default_header( + PacketType::Fragment, + delivery_guarantee, + ordering_guarantee, + ); + + builder = builder.with_fragment_header( + self.acknowledge_handler.seq_num, + fragment_id as u8, + fragments_needed, + ); + + if fragment_id == 0 { + builder = builder.with_acknowledgement_header( + self.acknowledge_handler.seq_num, + self.acknowledge_handler.last_seq(), + self.acknowledge_handler.bit_mask(), + ); + } + + builder.build() + }) + .collect(), + ) + } + }; + + self.congestion_handler + .process_outgoing(self.acknowledge_handler.seq_num); + self.acknowledge_handler.process_outgoing(payload); - let packet_data: PacketData = channel.process_outgoing(payload, delivery_method)?; + self.acknowledge_handler.seq_num = self.acknowledge_handler.seq_num.wrapping_add(1); - Ok(packet_data) + Ok(outgoing) + } + } } - /// This process the incoming data and returns an packet if the data is complete. - /// - /// Returns `Ok(None)`: - /// 1. In the case of fragmentation and not all fragments are received - /// 2. In the case of the packet being queued for ordering and we are waiting on older packets first. - pub fn process_incoming(&mut self, received_data: &[u8]) -> Result> { + /// This processes the incoming data and returns an packet if the data is complete. + pub fn process_incoming( + &mut self, + received_data: &[u8], + sender: &Sender, + ) -> crate::Result<()> { self.last_heard = Instant::now(); - let mut cursor = Cursor::new(received_data); - let header = StandardHeader::read(&mut cursor)?; + let mut packet_reader = PacketReader::new(received_data); - if !ProtocolVersion::valid_version(header.protocol_version) { + let header = packet_reader.read_standard_header()?; + + if !header.is_current_protocol() { return Err(ErrorKind::ProtocolVersionMismatch); } - if header.packet_type_id == PacketTypeId::Fragment { - cursor.set_position(0); - match self.fragmentation.handle_fragment(&mut cursor) { - Ok(Some(payload)) => { - return Ok(Some(Packet::new( - self.remote_address, - payload.into_boxed_slice(), - header.delivery_method, - ))); + match header.delivery_guarantee() { + DeliveryGuarantee::Unreliable => { + if let OrderingGuarantee::Sequenced(_id) = header.ordering_guarantee() { + let arranging_header = + packet_reader.read_arranging_header(u16::from(STANDARD_HEADER_SIZE))?; + + let payload = packet_reader.read_payload(); + + let stream = self + .sequencing_system + .get_or_create_stream(arranging_header.stream_id()); + + if let Some(packet) = + stream.arrange(arranging_header.arranging_id() as usize, payload) + { + Self::queue_packet( + sender, + packet, + self.remote_address, + header.delivery_guarantee(), + OrderingGuarantee::Sequenced(Some(arranging_header.stream_id())), + )?; + } + + return Ok(()); } - Ok(None) => return Ok(None), - Err(e) => return Err(e), + + Self::queue_packet( + sender, + packet_reader.read_payload(), + self.remote_address, + header.delivery_guarantee(), + header.ordering_guarantee(), + )?; } - } + DeliveryGuarantee::Reliable => { + if header.is_fragment() { + if let Ok((fragment_header, acked_header)) = packet_reader.read_fragment() { + let payload = packet_reader.read_payload(); + + match self + .fragmentation + .handle_fragment(fragment_header, &payload) + { + Ok(Some(payload)) => { + Self::queue_packet( + sender, + payload.into_boxed_slice(), + self.remote_address, + header.delivery_guarantee(), + OrderingGuarantee::None, + )?; + } + Ok(None) => return Ok(()), + Err(e) => return Err(e), + }; + + if let Some(acked_header) = acked_header { + self.congestion_handler + .process_incoming(acked_header.sequence()); + self.acknowledge_handler + .process_incoming(acked_header.sequence()); + } + } + } else { + let acked_header = packet_reader.read_acknowledge_header()?; + + if let OrderingGuarantee::Sequenced(_) = header.ordering_guarantee() { + let arranging_header = packet_reader.read_arranging_header(u16::from( + STANDARD_HEADER_SIZE + ACKED_PACKET_HEADER, + ))?; + + let payload = packet_reader.read_payload(); + + let stream = self + .sequencing_system + .get_or_create_stream(arranging_header.stream_id()); + + if let Some(packet) = + stream.arrange(arranging_header.arranging_id() as usize, payload) + { + Self::queue_packet( + sender, + packet, + self.remote_address, + header.delivery_guarantee(), + OrderingGuarantee::Sequenced(Some(arranging_header.stream_id())), + )?; + } + } - // get the right channel to process the packet. - let channel: &mut Channel = match header.delivery_method { - DeliveryMethod::UnreliableUnordered => &mut self.unreliable_unordered_channel, - DeliveryMethod::ReliableUnordered => &mut self.reliable_unordered_channel, - DeliveryMethod::Sequenced => &mut self.sequenced_channel, - _ => { - error!("Tried using channel type which is not supported yet. Swished to unreliable unordered packet handling."); - &mut self.unreliable_unordered_channel + if let OrderingGuarantee::Ordered(_id) = header.ordering_guarantee() { + let arranging_header = packet_reader.read_arranging_header(u16::from( + STANDARD_HEADER_SIZE + ACKED_PACKET_HEADER, + ))?; + + let payload = packet_reader.read_payload(); + + let stream = self + .ordering_system + .get_or_create_stream(arranging_header.stream_id()); + + if let Some(packet) = + stream.arrange(arranging_header.arranging_id() as usize, payload) + { + Self::queue_packet( + sender, + packet, + self.remote_address, + header.delivery_guarantee(), + OrderingGuarantee::Ordered(Some(arranging_header.stream_id())), + )?; + + while let Some(packet) = stream.iter_mut().next() { + Self::queue_packet( + sender, + packet, + self.remote_address, + header.delivery_guarantee(), + OrderingGuarantee::Ordered(Some(arranging_header.stream_id())), + )?; + } + } + } else { + let payload = packet_reader.read_payload(); + + Self::queue_packet( + sender, + payload, + self.remote_address, + header.delivery_guarantee(), + header.ordering_guarantee(), + )?; + } + + self.congestion_handler + .process_incoming(acked_header.sequence()); + self.acknowledge_handler + .process_incoming(acked_header.sequence()); + } } - }; + } - let payload = channel.process_incoming(received_data)?; + Ok(()) + } - Ok(Some(Packet::new( - self.remote_address, - Box::from(payload), - header.delivery_method, - ))) + fn queue_packet( + tx: &Sender, + payload: Box<[u8]>, + remote_addr: SocketAddr, + delivery: DeliveryGuarantee, + ordering: OrderingGuarantee, + ) -> Result<()> { + tx.send(SocketEvent::Packet(Packet::new( + remote_addr, + payload, + delivery, + ordering, + )))?; + Ok(()) } /// This will gather dropped packets from the reliable channels. /// /// Note that after requesting dropped packets the dropped packets will be removed from this client. pub fn gather_dropped_packets(&mut self) -> Vec> { - if self.reliable_unordered_channel.has_dropped_packets() { - return self.reliable_unordered_channel.drain_dropped_packets(); - } - - Vec::new() + self.acknowledge_handler.dropped_packets.drain(..).collect() } } @@ -153,87 +376,444 @@ impl fmt::Debug for VirtualConnection { #[cfg(test)] mod tests { + use super::VirtualConnection; use crate::config::Config; - use crate::infrastructure::DeliveryMethod; - use crate::net::connection::VirtualConnection; + use crate::net::constants; + use crate::packet::header::{AckedPacketHeader, ArrangingHeader, HeaderWriter, StandardHeader}; + use crate::packet::{DeliveryGuarantee, OrderingGuarantee, Outgoing, Packet, PacketType}; + use crate::protocol_version::ProtocolVersion; + use crate::SocketEvent; + use byteorder::{BigEndian, WriteBytesExt}; + use crossbeam_channel::{unbounded, TryRecvError}; + use std::io::Write; - const SERVER_ADDR: &str = "127.0.0.1:12345"; + const PAYLOAD: [u8; 4] = [1, 2, 3, 4]; - fn create_virtual_connection() -> VirtualConnection { - VirtualConnection::new(SERVER_ADDR.parse().unwrap(), &Config::default()) - } + #[test] + fn assure_right_fragmentation() { + let mut protocol_version = Vec::new(); + protocol_version + .write_u16::(ProtocolVersion::get_crc16()) + .unwrap(); - fn assert_packet_payload( - buffer: &[u8], - parts: &Vec>, - connection: &mut VirtualConnection, - ) { - for part in parts { - let packet = connection.process_incoming(&part).unwrap().unwrap(); - assert_eq!(buffer, packet.payload()); + let standard_header = [protocol_version, vec![1, 1, 2]].concat(); + + let acked_header = vec![1, 0, 0, 2, 0, 0, 0, 3]; + let first_fragment = vec![0, 1, 1, 3]; + let second_fragment = vec![0, 1, 2, 3]; + let third_fragment = vec![0, 1, 3, 3]; + + let (tx, rx) = unbounded::(); + + let mut connection = create_virtual_connection(); + connection + .process_incoming( + [standard_header.as_slice(), acked_header.as_slice()] + .concat() + .as_slice(), + &tx, + ) + .unwrap(); + assert!(rx.try_recv().is_err()); + connection + .process_incoming( + [ + standard_header.as_slice(), + first_fragment.as_slice(), + &PAYLOAD, + ] + .concat() + .as_slice(), + &tx, + ) + .unwrap(); + assert!(rx.try_recv().is_err()); + connection + .process_incoming( + [ + standard_header.as_slice(), + second_fragment.as_slice(), + &PAYLOAD, + ] + .concat() + .as_slice(), + &tx, + ) + .unwrap(); + assert!(rx.try_recv().is_err()); + connection + .process_incoming( + [ + standard_header.as_slice(), + third_fragment.as_slice(), + &PAYLOAD, + ] + .concat() + .as_slice(), + &tx, + ) + .unwrap(); + + let complete_fragment = rx.try_recv().unwrap(); + + match complete_fragment { + SocketEvent::Packet(fragment) => assert_eq!( + fragment.payload(), + &*[PAYLOAD, PAYLOAD, PAYLOAD].concat().into_boxed_slice() + ), + _ => { + panic!("Expected fragment other result."); + } } } #[test] - fn process_unreliable_packet() { + fn expect_fragmentation() { let mut connection = create_virtual_connection(); - let buffer = vec![1; 500]; + let buffer = vec![1; 4000]; - let mut packet_data = connection - .process_outgoing(&buffer, DeliveryMethod::UnreliableUnordered) + let outgoing = connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(None), + ) .unwrap(); - assert_eq!(packet_data.fragment_count(), 1); - assert_packet_payload(&buffer, packet_data.parts(), &mut connection); + match outgoing { + Outgoing::Packet(_) => panic!("Expected fragment got packet"), + Outgoing::Fragments(fragments) => { + assert_eq!(fragments.len(), 4); + } + } } #[test] - fn process_reliable_unordered_packet() { + fn assure_outgoing_processing_goes_right() { let mut connection = create_virtual_connection(); - let buffer = vec![1; 500]; + let buffer = vec![1; 4000]; + + connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Unreliable, + OrderingGuarantee::None, + ) + .unwrap(); - let mut packet_data = connection - .process_outgoing(&buffer, DeliveryMethod::ReliableUnordered) + connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(None), + ) .unwrap(); - assert_eq!(packet_data.fragment_count(), 1); - assert_packet_payload(&buffer, packet_data.parts(), &mut connection); + connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(None), + ) + .unwrap(); + + connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Reliable, + OrderingGuarantee::Sequenced(None), + ) + .unwrap(); } #[test] - fn process_fragmented_packet() { + fn assure_right_sequencing() { let mut connection = create_virtual_connection(); - let buffer = vec![1; 4000]; + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::unreliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 1, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::unreliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 3, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Err(TryRecvError::Empty), + 2, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::unreliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 4, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::reliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 5, + ); + } - let mut packet_data = connection - .process_outgoing(&buffer, DeliveryMethod::ReliableUnordered) - .unwrap(); + #[test] + fn assure_right_ordering() { + let mut connection = create_virtual_connection(); - // there should be 4 fragments - assert_eq!(packet_data.fragment_count(), 4); + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::reliable_ordered( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 1, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Err(TryRecvError::Empty), + 3, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Err(TryRecvError::Empty), + 4, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::reliable_ordered( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 2, + ); + } - for (index, part) in packet_data.parts().into_iter().enumerate() { - let option = connection.process_incoming(&part).unwrap(); + #[test] + fn assure_right_processing_of_incoming_packets() { + let mut connection = create_virtual_connection(); - // take note index 3 will contain the fragment data because the bytes of the fragmented packet will be returned when all fragments are processed. - // that is why the last packet (index 3) can be asserted on. - match option { - None => { - if index < 3 { - assert!(true) - } else { - assert!(false) - } - } - Some(packet) => { - if index == 3 { - assert_eq!(buffer, packet.payload()); - } - } + assert_incoming_without_order( + DeliveryGuarantee::Unreliable, + &mut connection, + SocketEvent::Packet(Packet::unreliable(get_fake_addr(), PAYLOAD.to_vec())), + ); + + assert_incoming_without_order( + DeliveryGuarantee::Reliable, + &mut connection, + SocketEvent::Packet(Packet::reliable_unordered( + get_fake_addr(), + PAYLOAD.to_vec(), + )), + ); + + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::unreliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 1, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::reliable_ordered( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 1, + ); + } + + #[test] + fn assure_right_header_size() { + assert_right_header_size( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::None, + (constants::STANDARD_HEADER_SIZE) as usize, + ); + assert_right_header_size( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(None), + (constants::STANDARD_HEADER_SIZE + constants::ARRANGING_PACKET_HEADER) as usize, + ); + assert_right_header_size( + DeliveryGuarantee::Reliable, + OrderingGuarantee::None, + (constants::STANDARD_HEADER_SIZE + constants::ACKED_PACKET_HEADER) as usize, + ); + assert_right_header_size( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(None), + (constants::STANDARD_HEADER_SIZE + + constants::ACKED_PACKET_HEADER + + constants::ARRANGING_PACKET_HEADER) as usize, + ); + } + + /// ======= helper functions ========= + fn create_virtual_connection() -> VirtualConnection { + VirtualConnection::new(get_fake_addr(), &Config::default()) + } + + fn get_fake_addr() -> std::net::SocketAddr { + "127.0.0.1:0".parse().unwrap() + } + + // assert that the processing of the given `DeliveryGuarantee` and `OrderingGuarantee` results into the given `result_event` + fn assert_incoming_with_order( + delivery: DeliveryGuarantee, + ordering: OrderingGuarantee, + connection: &mut VirtualConnection, + result_event: Result, + order_id: u16, + ) { + let mut packet = Vec::new(); + + // configure the right header based on specified guarantees. + let header = StandardHeader::new(delivery, ordering, PacketType::Packet); + header.parse(&mut packet).unwrap(); + + if let OrderingGuarantee::Sequenced(val) = ordering { + if delivery == DeliveryGuarantee::Reliable { + let ack_header = AckedPacketHeader::new(1, 2, 3); + ack_header.parse(&mut packet).unwrap(); + } + + let order_header = ArrangingHeader::new(order_id, val.unwrap()); + order_header.parse(&mut packet).unwrap(); + } + + if let OrderingGuarantee::Ordered(val) = ordering { + if delivery == DeliveryGuarantee::Reliable { + let ack_header = AckedPacketHeader::new(1, 2, 3); + let order_header = ArrangingHeader::new(order_id, val.unwrap()); + ack_header.parse(&mut packet).unwrap(); + order_header.parse(&mut packet).unwrap(); + } + } + + if let OrderingGuarantee::None = ordering { + if delivery == DeliveryGuarantee::Reliable { + let ack_header = AckedPacketHeader::new(1, 2, 3); + ack_header.parse(&mut packet).unwrap(); + } + } + + packet.write_all(&PAYLOAD).unwrap(); + + let (tx, rx) = unbounded::(); + + connection.process_incoming(packet.as_slice(), &tx).unwrap(); + + let event = rx.try_recv(); + + match event { + Ok(val) => assert_eq!(val, result_event.unwrap()), + Err(e) => assert_eq!(e, result_event.err().unwrap()), + } + } + + // assert that the given `DeliveryGuarantee` results into the given `SocketEvent` after processing. + fn assert_incoming_without_order( + delivery: DeliveryGuarantee, + connection: &mut VirtualConnection, + result_event: SocketEvent, + ) { + let mut packet = Vec::new(); + + // configure the right header based on specified guarantees. + let header = StandardHeader::new(delivery, OrderingGuarantee::None, PacketType::Packet); + header.parse(&mut packet).unwrap(); + + if delivery == DeliveryGuarantee::Reliable { + let ack_header = AckedPacketHeader::new(1, 2, 3); + ack_header.parse(&mut packet).unwrap(); + } + + packet.write_all(&PAYLOAD).unwrap(); + + let (tx, rx) = unbounded::(); + + connection.process_incoming(packet.as_slice(), &tx).unwrap(); + + let event = rx.try_recv(); + + assert_eq!(event, Ok(result_event)); + } + + // assert that the size of the processed header is the same as the given one. + fn assert_right_header_size( + delivery: DeliveryGuarantee, + ordering: OrderingGuarantee, + expected_header_size: usize, + ) { + let mut connection = create_virtual_connection(); + + let buffer = vec![1; 500]; + + let outgoing = connection + .process_outgoing(&buffer, delivery, ordering) + .unwrap(); + + match outgoing { + Outgoing::Packet(packet) => { + assert_eq!(packet.contents().len() - buffer.len(), expected_header_size); } + Outgoing::Fragments(_) => panic!("Expected packet got fragment"), } } } diff --git a/src/net/constants.rs b/src/net/constants.rs index aedc38f4..0ea3b03c 100644 --- a/src/net/constants.rs +++ b/src/net/constants.rs @@ -1,11 +1,13 @@ /// Fragment header size -pub const FRAGMENT_HEADER_SIZE: u8 = 4 + STANDARD_HEADER_SIZE; +pub const FRAGMENT_HEADER_SIZE: u8 = 4; /// Acked packet header size -pub const ACKED_PACKET_HEADER: u8 = 8 + STANDARD_HEADER_SIZE; +pub const ACKED_PACKET_HEADER: u8 = 8; +/// Arranging packet header size +pub const ARRANGING_PACKET_HEADER: u8 = 3; /// Standard header size -pub const STANDARD_HEADER_SIZE: u8 = 4; -/// Heartbeat header size -pub const HEART_BEAT_HEADER_SIZE: u8 = 5; +pub const STANDARD_HEADER_SIZE: u8 = 5; +pub const DEFAULT_ORDERING_STREAM: u8 = 255; +pub const DEFAULT_SEQUENCING_STREAM: u8 = 255; /// Default max number of fragments to size pub const MAX_FRAGMENTS_DEFAULT: u16 = 16; /// Default max size of each fragment diff --git a/src/net/events.rs b/src/net/events.rs index f00bdf1b..0e6bf668 100644 --- a/src/net/events.rs +++ b/src/net/events.rs @@ -2,7 +2,7 @@ use crate::packet::Packet; use std::net::SocketAddr; /// Events which will be pushed through the event_receiver returned by `Socket::bind`. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum SocketEvent { /// A packet has been received from a client. Packet(Packet), diff --git a/src/net/socket.rs b/src/net/socket.rs index 1815c30c..7c91851a 100644 --- a/src/net/socket.rs +++ b/src/net/socket.rs @@ -2,10 +2,10 @@ use crate::{ config::Config, error::{ErrorKind, Result}, net::{connection::ActiveConnections, events::SocketEvent, link_conditioner::LinkConditioner}, - packet::Packet, + packet::{Outgoing, Packet}, }; use crossbeam_channel::{self, unbounded, Receiver, Sender}; -use log::{debug, error}; +use log::error; use std::{ self, io, net::{SocketAddr, ToSocketAddrs, UdpSocket}, @@ -55,23 +55,8 @@ impl Socket { // Nothing should break out of this loop! loop { // First we pull any newly arrived packets and handle them - match self.recv_from() { - Ok(result) => match result { - Some(packet) => { - match self.event_sender.send(SocketEvent::Packet(packet)) { - Ok(_) => {} - Err(e) => { - error!("Error sending SocketEvent: {:?}", e); - } - }; - } - None => { - debug!("Empty packet received"); - } - }, - Err(e) => { - error!("Error receiving packet: {:?}", e); - } + if let Err(e) = self.recv_from() { + error!("Error receiving packet: {:?}", e); }; // Now grab all the packets waiting to be sent and send them @@ -100,34 +85,52 @@ impl Socket { // Serializes and sends a `Packet` on the socket. On success, returns the number of bytes written. fn send_to(&mut self, packet: Packet) -> Result { - let connection = self - .connections - .get_or_insert_connection(packet.addr(), &self.config); - let mut packet_data = - connection.process_outgoing(packet.payload(), packet.delivery_method())?; + let (dropped_packets, processed_packet) = { + let connection = self + .connections + .get_or_insert_connection(packet.addr(), &self.config); + + let processed_packet = connection.process_outgoing( + packet.payload(), + packet.delivery_guarantee(), + packet.order_guarantee(), + )?; + + (connection.gather_dropped_packets(), processed_packet) + }; + let mut bytes_sent = 0; - if let Some(link_conditioner) = &self.link_conditioner { - if link_conditioner.should_send() { - for payload in packet_data.parts() { - bytes_sent += self.send_packet(&packet.addr(), &payload)?; - } - } + let should_send = if let Some(link_conditioner) = &self.link_conditioner { + link_conditioner.should_send() } else { - for payload in connection.gather_dropped_packets() { - bytes_sent += self.send_packet(&packet.addr(), &payload)?; + true + }; + + if should_send { + match processed_packet { + Outgoing::Packet(outgoing) => { + bytes_sent += self.send_packet(&packet.addr(), &outgoing.contents())?; + } + Outgoing::Fragments(packets) => { + for outgoing in packets { + bytes_sent += self.send_packet(&packet.addr(), &outgoing.contents())?; + } + } } - for payload in packet_data.parts() { + for payload in dropped_packets { bytes_sent += self.send_packet(&packet.addr(), &payload)?; } + + return Ok(bytes_sent); } - Ok(bytes_sent) + Ok(0) } - // Receives a single message from the socket. On success, returns the packet containing origin and data. - fn recv_from(&mut self) -> Result> { + // On success the packet will be send on the `event_sender` + fn recv_from(&mut self) -> Result<()> { match self.socket.recv_from(&mut self.recv_buffer) { Ok((recv_len, address)) => { if recv_len == 0 { @@ -137,7 +140,7 @@ impl Socket { let connection = self .connections .get_or_insert_connection(address, &self.config); - connection.process_incoming(received_payload) + connection.process_incoming(received_payload, &self.event_sender)?; } Err(e) => { if e.kind() == io::ErrorKind::WouldBlock { @@ -145,9 +148,10 @@ impl Socket { } else { error!("Encountered an error receiving data: {:?}", e); } - Err(e.into()) + return Err(e.into()); } } + Ok(()) } // Send a single packet over the UDP socket. diff --git a/src/packet.rs b/src/packet.rs index c4503e7a..a04fcbe0 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -1,10 +1,21 @@ /// Contains code dealing with Packet Headers pub mod header; -mod packet_data; +mod enums; +mod outgoing; +mod packet_reader; mod packet_structure; -mod packet_type; -pub use self::packet_data::PacketData; +pub use self::enums::{DeliveryGuarantee, OrderingGuarantee, PacketType}; +pub use self::outgoing::{Outgoing, OutgoingPacket, OutgoingPacketBuilder}; +pub use self::packet_reader::PacketReader; pub use self::packet_structure::Packet; -pub use self::packet_type::{PacketType, PacketTypeId}; + +pub type SequenceNumber = u16; + +pub trait EnumConverter { + type Enum; + + fn to_u8(&self) -> u8; + fn from_u8(input: u8) -> Self::Enum; +} diff --git a/src/packet/enums.rs b/src/packet/enums.rs new file mode 100644 index 00000000..577d536d --- /dev/null +++ b/src/packet/enums.rs @@ -0,0 +1,137 @@ +use crate::packet::EnumConverter; + +/// Enum to specify how a packet should be delivered. +#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, Eq)] +pub enum DeliveryGuarantee { + /// Packet may or may not be delivered + Unreliable, + /// Packet will be delivered + Reliable, +} + +impl EnumConverter for DeliveryGuarantee { + type Enum = DeliveryGuarantee; + + /// Get integer value from `DeliveryGuarantee` enum. + fn to_u8(&self) -> u8 { + *self as u8 + } + + /// Get `DeliveryGuarantee` enum instance from integer value. + fn from_u8(input: u8) -> Self::Enum { + match input { + 0 => DeliveryGuarantee::Unreliable, + 1 => DeliveryGuarantee::Reliable, + _ => unimplemented!("Delivery Guarantee {} does not exist yet.", input), + } + } +} + +/// Enum to specify how a packet should be arranged. +#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, Eq)] +pub enum OrderingGuarantee { + /// No arranging will be done. + None, + /// Packets will be arranged in sequence. + Sequenced(Option), + /// Packets will be arranged in order. + Ordered(Option), +} + +impl EnumConverter for OrderingGuarantee { + type Enum = OrderingGuarantee; + + /// Get integer value from `OrderingGuarantee` enum. + fn to_u8(&self) -> u8 { + match self { + OrderingGuarantee::None => 0, + OrderingGuarantee::Sequenced(_) => 1, + OrderingGuarantee::Ordered(_) => 2, + } + } + + /// Get `OrderingGuarantee` enum instance from integer value. + fn from_u8(input: u8) -> Self::Enum { + match input { + 0 => OrderingGuarantee::None, + 1 => OrderingGuarantee::Sequenced(None), + 2 => OrderingGuarantee::Ordered(None), + _ => unimplemented!("Ordering Guarantee {} does not exist yet.", input), + } + } +} + +#[derive(Copy, Clone, Debug, PartialOrd, PartialEq)] +/// Id to identify a certain packet type. +pub enum PacketType { + /// Full packet that is not fragmented + Packet = 0, + /// Fragment of a full packet + Fragment = 1, +} + +impl EnumConverter for PacketType { + type Enum = PacketType; + + fn to_u8(&self) -> u8 { + *self as u8 + } + + fn from_u8(input: u8) -> Self::Enum { + match input { + 0 => PacketType::Packet, + 1 => PacketType::Fragment, + _ => unimplemented!("Packet ID {} does not exist yet.", input), + } + } +} + +#[cfg(test)] +mod tests { + use crate::packet::{ + enums::{DeliveryGuarantee, OrderingGuarantee, PacketType}, + EnumConverter, + }; + + #[test] + fn assure_parsing_ordering_guarantee() { + let none = OrderingGuarantee::None; + let ordered = OrderingGuarantee::Ordered(None); + let sequenced = OrderingGuarantee::Sequenced(None); + + assert_eq!( + OrderingGuarantee::None, + OrderingGuarantee::from_u8(none.to_u8()) + ); + assert_eq!( + OrderingGuarantee::Ordered(None), + OrderingGuarantee::from_u8(ordered.to_u8()) + ); + assert_eq!( + OrderingGuarantee::Sequenced(None), + OrderingGuarantee::from_u8(sequenced.to_u8()) + ) + } + + #[test] + fn assure_parsing_delivery_guarantee() { + let unreliable = DeliveryGuarantee::Unreliable; + let reliable = DeliveryGuarantee::Reliable; + assert_eq!( + DeliveryGuarantee::Unreliable, + DeliveryGuarantee::from_u8(unreliable.to_u8()) + ); + assert_eq!( + DeliveryGuarantee::Reliable, + DeliveryGuarantee::from_u8(reliable.to_u8()) + ) + } + + #[test] + fn assure_parsing_packet_id() { + let packet = PacketType::Packet; + let fragment = PacketType::Fragment; + assert_eq!(PacketType::Packet, PacketType::from_u8(packet.to_u8())); + assert_eq!(PacketType::Fragment, PacketType::from_u8(fragment.to_u8())) + } +} diff --git a/src/packet/header.rs b/src/packet/header.rs index 94ba2eb2..cc25b5d3 100644 --- a/src/packet/header.rs +++ b/src/packet/header.rs @@ -1,13 +1,13 @@ mod acked_packet_header; +mod arranging_header; mod fragment_header; mod header_reader; mod header_writer; -mod heart_beat_header; mod standard_header; pub use self::acked_packet_header::AckedPacketHeader; +pub use self::arranging_header::ArrangingHeader; pub use self::fragment_header::FragmentHeader; pub use self::header_reader::HeaderReader; pub use self::header_writer::HeaderWriter; -pub use self::heart_beat_header::HeartBeatHeader; pub use self::standard_header::StandardHeader; diff --git a/src/packet/header/acked_packet_header.rs b/src/packet/header/acked_packet_header.rs index c6f1b3c7..dacc5af4 100644 --- a/src/packet/header/acked_packet_header.rs +++ b/src/packet/header/acked_packet_header.rs @@ -1,15 +1,12 @@ use super::{HeaderReader, HeaderWriter}; use crate::error::Result; use crate::net::constants::ACKED_PACKET_HEADER; -use crate::packet::header::StandardHeader; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::Cursor; #[derive(Copy, Clone, Debug)] /// This header providing reliability information. pub struct AckedPacketHeader { - /// StandardHeader for the Acked Packet - pub standard_header: StandardHeader, /// this is the sequence number so that we can know where in the sequence of packages this packet belongs. pub seq: u16, // this is the last acknowledged sequence number. @@ -22,14 +19,8 @@ impl AckedPacketHeader { /// When we compose packet headers, the local sequence becomes the sequence number of the packet, and the remote sequence becomes the ack. /// The ack bitfield is calculated by looking into a queue of up to 33 packets, containing sequence numbers in the range [remote sequence - 32, remote sequence]. /// We set bit n (in [1,32]) in ack bits to 1 if the sequence number remote sequence - n is in the received queue. - pub fn new( - standard_header: StandardHeader, - seq_num: u16, - last_seq: u16, - bit_field: u32, - ) -> AckedPacketHeader { + pub fn new(seq_num: u16, last_seq: u16, bit_field: u32) -> AckedPacketHeader { AckedPacketHeader { - standard_header, seq: seq_num, ack_seq: last_seq, ack_field: bit_field, @@ -43,11 +34,13 @@ impl AckedPacketHeader { } /// Get bit field of all last 32 acknowledged packages + #[cfg(test)] pub fn ack_field(&self) -> u32 { self.ack_field } /// Get last acknowledged sequence number. + #[cfg(test)] pub fn ack_seq(&self) -> u16 { self.ack_seq } @@ -57,7 +50,6 @@ impl HeaderWriter for AckedPacketHeader { type Output = Result<()>; fn parse(&self, buffer: &mut Vec) -> Self::Output { - self.standard_header.parse(buffer)?; buffer.write_u16::(self.seq)?; buffer.write_u16::(self.ack_seq)?; buffer.write_u32::(self.ack_field)?; @@ -69,45 +61,54 @@ impl HeaderReader for AckedPacketHeader { type Header = Result; fn read(rdr: &mut Cursor<&[u8]>) -> Self::Header { - let standard_header = StandardHeader::read(rdr)?; let seq = rdr.read_u16::()?; let ack_seq = rdr.read_u16::()?; let ack_field = rdr.read_u32::()?; Ok(AckedPacketHeader { - standard_header, seq, ack_seq, ack_field, }) } - fn size(&self) -> u8 { + fn size() -> u8 { ACKED_PACKET_HEADER } } #[cfg(test)] mod tests { - use crate::packet::header::{AckedPacketHeader, HeaderReader, HeaderWriter, StandardHeader}; + use crate::net::constants::ACKED_PACKET_HEADER; + use crate::packet::header::{AckedPacketHeader, HeaderReader, HeaderWriter}; use std::io::Cursor; #[test] - pub fn serializes_deserialize_acked_header_test() { - let packet_header = AckedPacketHeader::new(StandardHeader::default(), 1, 1, 5421); - let mut buffer = Vec::with_capacity((packet_header.size() + 1) as usize); + fn serialize() { + let mut buffer = Vec::new(); + let header = AckedPacketHeader::new(1, 2, 3); + header.parse(&mut buffer).is_ok(); + + assert_eq!(buffer[1], 1); + assert_eq!(buffer[3], 2); + assert_eq!(buffer[7], 3); + } - let _ = packet_header.parse(&mut buffer); + #[test] + fn deserialize() { + let buffer = vec![0, 1, 0, 2, 0, 0, 0, 3]; let mut cursor = Cursor::new(buffer.as_slice()); - match AckedPacketHeader::read(&mut cursor) { - Ok(packet_deserialized) => { - assert_eq!(packet_deserialized.seq, 1); - assert_eq!(packet_deserialized.ack_seq, 1); - assert_eq!(packet_deserialized.ack_field, 5421); - } - Err(e) => println!("{:?}", e), - } + let header = AckedPacketHeader::read(&mut cursor).unwrap(); + + assert_eq!(header.sequence(), 1); + assert_eq!(header.ack_seq(), 2); + assert_eq!(header.ack_field(), 3); + } + + #[test] + fn size() { + assert_eq!(AckedPacketHeader::size(), ACKED_PACKET_HEADER); } } diff --git a/src/packet/header/arranging_header.rs b/src/packet/header/arranging_header.rs new file mode 100644 index 00000000..287cd911 --- /dev/null +++ b/src/packet/header/arranging_header.rs @@ -0,0 +1,98 @@ +use super::{HeaderReader, HeaderWriter}; +use crate::error::Result; +use crate::net::constants::ARRANGING_PACKET_HEADER; +use crate::packet::SequenceNumber; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::io::Cursor; + +#[derive(Copy, Clone, Debug)] +/// This header represents a fragmented packet header. +pub struct ArrangingHeader { + arranging_id: SequenceNumber, + stream_id: u8, +} + +impl ArrangingHeader { + /// Create new fragment with the given packet header + pub fn new(arranging_id: SequenceNumber, stream_id: u8) -> Self { + ArrangingHeader { + arranging_id, + stream_id, + } + } + + /// Get the sequence number from this packet. + pub fn arranging_id(&self) -> SequenceNumber { + self.arranging_id + } + + /// Get the sequence number from this packet. + pub fn stream_id(&self) -> u8 { + self.stream_id + } +} + +impl HeaderWriter for ArrangingHeader { + type Output = Result<()>; + + fn parse(&self, buffer: &mut Vec) -> Self::Output { + buffer.write_u16::(self.arranging_id)?; + buffer.write_u8(self.stream_id)?; + + Ok(()) + } +} + +impl HeaderReader for ArrangingHeader { + type Header = Result; + + fn read(rdr: &mut Cursor<&[u8]>) -> Self::Header { + let arranging_id = rdr.read_u16::()?; + let stream_id = rdr.read_u8()?; + + let header = ArrangingHeader { + arranging_id, + stream_id, + }; + + Ok(header) + } + + /// Get the size of this header. + fn size() -> u8 { + ARRANGING_PACKET_HEADER + } +} + +#[cfg(test)] +mod tests { + use crate::net::constants::ARRANGING_PACKET_HEADER; + use crate::packet::header::{ArrangingHeader, HeaderReader, HeaderWriter}; + use std::io::Cursor; + + #[test] + fn serialize() { + let mut buffer = Vec::new(); + let header = ArrangingHeader::new(1, 2); + header.parse(&mut buffer).is_ok(); + + assert_eq!(buffer[1], 1); + assert_eq!(buffer[2], 2); + } + + #[test] + fn deserialize() { + let buffer = vec![0, 1, 2]; + let mut cursor = Cursor::new(buffer.as_slice()); + + let header = ArrangingHeader::read(&mut cursor).unwrap(); + + assert_eq!(header.arranging_id(), 1); + assert_eq!(header.stream_id(), 2); + } + + #[test] + fn size() { + assert_eq!(ArrangingHeader::size(), ARRANGING_PACKET_HEADER); + } +} diff --git a/src/packet/header/fragment_header.rs b/src/packet/header/fragment_header.rs index 3a7a137a..233f402c 100644 --- a/src/packet/header/fragment_header.rs +++ b/src/packet/header/fragment_header.rs @@ -1,34 +1,24 @@ -use super::{AckedPacketHeader, HeaderReader, HeaderWriter, StandardHeader}; -use crate::error::{FragmentErrorKind, Result}; +use super::{HeaderReader, HeaderWriter}; +use crate::error::Result; use crate::net::constants::FRAGMENT_HEADER_SIZE; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use log::error; use std::io::Cursor; #[derive(Copy, Clone, Debug)] /// This header represents a fragmented packet header. pub struct FragmentHeader { - standard_header: StandardHeader, sequence: u16, id: u8, num_fragments: u8, - packet_header: Option, } impl FragmentHeader { /// Create new fragment with the given packet header - pub fn new( - standard_header: StandardHeader, - id: u8, - num_fragments: u8, - packet_header: AckedPacketHeader, - ) -> Self { + pub fn new(seq: u16, id: u8, num_fragments: u8) -> Self { FragmentHeader { - standard_header, id, num_fragments, - packet_header: Some(packet_header), - sequence: packet_header.seq, + sequence: seq, } } @@ -46,32 +36,16 @@ impl FragmentHeader { pub fn fragment_count(&self) -> u8 { self.num_fragments } - - /// Get the packet header if attached to fragment. - pub fn packet_header(&self) -> Option { - self.packet_header - } } impl HeaderWriter for FragmentHeader { type Output = Result<()>; fn parse(&self, buffer: &mut Vec) -> Self::Output { - self.standard_header.parse(buffer)?; buffer.write_u16::(self.sequence)?; buffer.write_u8(self.id)?; buffer.write_u8(self.num_fragments)?; - // append acked header only first time - if self.id == 0 { - match self.packet_header { - Some(header) => { - header.parse(buffer)?; - } - None => return Err(FragmentErrorKind::PacketHeaderNotFound.into()), - } - } - Ok(()) } } @@ -80,77 +54,57 @@ impl HeaderReader for FragmentHeader { type Header = Result; fn read(rdr: &mut Cursor<&[u8]>) -> Self::Header { - let standard_header = StandardHeader::read(rdr)?; let sequence = rdr.read_u16::()?; let id = rdr.read_u8()?; let num_fragments = rdr.read_u8()?; - let mut header = FragmentHeader { - standard_header, + let header = FragmentHeader { sequence, id, num_fragments, - packet_header: None, }; - // append acked header is only appended to first packet. - if id == 0 { - header.packet_header = Some(AckedPacketHeader::read(rdr)?); - } - Ok(header) } /// Get the size of this header. - fn size(&self) -> u8 { - if self.id == 0 { - match self.packet_header { - Some(header) => header.size() + FRAGMENT_HEADER_SIZE, - None => { - error!("Attempting to retrieve size on a 0 ID packet with no packet header"); - 0 - } - } - } else { - FRAGMENT_HEADER_SIZE - } + fn size() -> u8 { + FRAGMENT_HEADER_SIZE } } #[cfg(test)] mod tests { - use crate::infrastructure::DeliveryMethod; - use crate::packet::header::{ - AckedPacketHeader, FragmentHeader, HeaderReader, HeaderWriter, StandardHeader, - }; - use crate::packet::PacketTypeId; + use crate::net::constants::FRAGMENT_HEADER_SIZE; + use crate::packet::header::{FragmentHeader, HeaderReader, HeaderWriter}; use std::io::Cursor; #[test] - pub fn serializes_deserialize_fragment_header_test() { - // create default header - let standard_header = - StandardHeader::new(DeliveryMethod::UnreliableUnordered, PacketTypeId::Fragment); - - let packet_header = AckedPacketHeader::new(standard_header.clone(), 1, 1, 5421); + fn serialize() { + let mut buffer = Vec::new(); + let header = FragmentHeader::new(1, 2, 3); + header.parse(&mut buffer).is_ok(); + + assert_eq!(buffer[1], 1); + assert_eq!(buffer[2], 2); + assert_eq!(buffer[3], 3); + } - // create fragment header with the default header and acked header. - let fragment = FragmentHeader::new(standard_header.clone(), 0, 1, packet_header.clone()); - let mut fragment_buffer = Vec::with_capacity((fragment.size() + 1) as usize); - fragment.parse(&mut fragment_buffer).unwrap(); + #[test] + fn deserialize() { + let buffer = vec![0, 1, 2, 3]; - let mut cursor: Cursor<&[u8]> = Cursor::new(fragment_buffer.as_slice()); - let fragment_deserialized = FragmentHeader::read(&mut cursor).unwrap(); + let mut cursor = Cursor::new(buffer.as_slice()); - assert_eq!(fragment_deserialized.id, 0); - assert_eq!(fragment_deserialized.num_fragments, 1); - assert_eq!(fragment_deserialized.sequence, 1); + let header = FragmentHeader::read(&mut cursor).unwrap(); - assert!(fragment_deserialized.packet_header.is_some()); + assert_eq!(header.sequence(), 1); + assert_eq!(header.id(), 2); + assert_eq!(header.fragment_count(), 3); + } - let fragment_packet_header = fragment_deserialized.packet_header.unwrap(); - assert_eq!(fragment_packet_header.seq, 1); - assert_eq!(fragment_packet_header.ack_seq(), 1); - assert_eq!(fragment_packet_header.ack_field(), 5421); + #[test] + fn size() { + assert_eq!(FragmentHeader::size(), FRAGMENT_HEADER_SIZE); } } diff --git a/src/packet/header/header_reader.rs b/src/packet/header/header_reader.rs index 3f7f7be3..f9ff60a1 100644 --- a/src/packet/header/header_reader.rs +++ b/src/packet/header/header_reader.rs @@ -8,5 +8,5 @@ pub trait HeaderReader { fn read(rdr: &mut Cursor<&[u8]>) -> Self::Header; /// This will get the size of the header. - fn size(&self) -> u8; + fn size() -> u8; } diff --git a/src/packet/header/heart_beat_header.rs b/src/packet/header/heart_beat_header.rs deleted file mode 100644 index 4518b106..00000000 --- a/src/packet/header/heart_beat_header.rs +++ /dev/null @@ -1,59 +0,0 @@ -use super::{HeaderReader, HeaderWriter}; -use crate::error::Result; -use crate::net::constants::HEART_BEAT_HEADER_SIZE; -use crate::packet::PacketTypeId; -use crate::protocol_version::ProtocolVersion; -use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use std::io::Cursor; - -#[derive(Copy, Clone, Debug)] -/// This header represents an heartbeat packet header. -/// An heart beat just keeps the client awake. -pub struct HeartBeatHeader { - packet_type_id: PacketTypeId, -} - -impl HeartBeatHeader { - /// Create new heartbeat header. - pub fn new() -> Self { - HeartBeatHeader { - packet_type_id: PacketTypeId::HeartBeat, - } - } -} - -impl Default for HeartBeatHeader { - fn default() -> Self { - HeartBeatHeader::new() - } -} - -impl HeaderWriter for HeartBeatHeader { - type Output = Result<()>; - - fn parse(&self, buffer: &mut Vec) -> Self::Output { - buffer.write_u16::(ProtocolVersion::get_crc16())?; - buffer.write_u8(PacketTypeId::get_id(self.packet_type_id))?; - - Ok(()) - } -} - -impl HeaderReader for HeartBeatHeader { - type Header = Result; - - fn read(rdr: &mut Cursor<&[u8]>) -> Self::Header { - let _ = rdr.read_u32::()?; - let _ = rdr.read_u8(); - let header = HeartBeatHeader { - packet_type_id: PacketTypeId::HeartBeat, - }; - - Ok(header) - } - - /// Get the size of this header. - fn size(&self) -> u8 { - HEART_BEAT_HEADER_SIZE - } -} diff --git a/src/packet/header/standard_header.rs b/src/packet/header/standard_header.rs index b373edb4..8d32baf3 100644 --- a/src/packet/header/standard_header.rs +++ b/src/packet/header/standard_header.rs @@ -1,8 +1,7 @@ use super::{HeaderReader, HeaderWriter}; use crate::error::Result; -use crate::infrastructure::DeliveryMethod; use crate::net::constants::STANDARD_HEADER_SIZE; -use crate::packet::PacketTypeId; +use crate::packet::{DeliveryGuarantee, EnumConverter, OrderingGuarantee, PacketType}; use crate::protocol_version::ProtocolVersion; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::Cursor; @@ -10,28 +9,67 @@ use std::io::Cursor; #[derive(Copy, Clone, Debug)] /// This header will be included in each packet, and contains some basic information. pub struct StandardHeader { - /// crc16 of the protocol version. - pub protocol_version: u16, - /// specifies the packet type. - pub packet_type_id: PacketTypeId, - /// specifies how this packet should be processed. - pub delivery_method: DeliveryMethod, + protocol_version: u16, + packet_type: PacketType, + delivery_guarantee: DeliveryGuarantee, + ordering_guarantee: OrderingGuarantee, } impl StandardHeader { /// Create new heartbeat header. - pub fn new(delivery_method: DeliveryMethod, packet_type_id: PacketTypeId) -> Self { + pub fn new( + delivery_guarantee: DeliveryGuarantee, + ordering_guarantee: OrderingGuarantee, + packet_type: PacketType, + ) -> Self { StandardHeader { protocol_version: ProtocolVersion::get_crc16(), - packet_type_id, - delivery_method, + delivery_guarantee, + ordering_guarantee, + packet_type, } } + + /// Returns the protocol version + #[cfg(test)] + pub fn protocol_version(&self) -> u16 { + self.protocol_version + } + + /// Returns the DeliveryGuarantee + pub fn delivery_guarantee(&self) -> DeliveryGuarantee { + self.delivery_guarantee + } + + /// Returns the OrderingGuarantee + pub fn ordering_guarantee(&self) -> OrderingGuarantee { + self.ordering_guarantee + } + + /// Returns the PacketType + #[cfg(test)] + pub fn packet_type(&self) -> PacketType { + self.packet_type + } + + /// Returns true if the packet is a fragment, false if not + pub fn is_fragment(&self) -> bool { + self.packet_type == PacketType::Fragment + } + + /// Checks if the protocol version in the packet is a valid version + pub fn is_current_protocol(&self) -> bool { + ProtocolVersion::valid_version(self.protocol_version) + } } impl Default for StandardHeader { fn default() -> Self { - StandardHeader::new(DeliveryMethod::UnreliableUnordered, PacketTypeId::Packet) + StandardHeader::new( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::None, + PacketType::Packet, + ) } } @@ -40,9 +78,9 @@ impl HeaderWriter for StandardHeader { fn parse(&self, buffer: &mut Vec) -> Self::Output { buffer.write_u16::(self.protocol_version)?; - buffer.write_u8(PacketTypeId::get_id(self.packet_type_id))?; - buffer.write_u8(DeliveryMethod::get_delivery_method_id(self.delivery_method))?; - + buffer.write_u8(self.packet_type.to_u8())?; + buffer.write_u8(self.delivery_guarantee.to_u8())?; + buffer.write_u8(self.ordering_guarantee.to_u8())?; Ok(()) } } @@ -53,47 +91,67 @@ impl HeaderReader for StandardHeader { fn read(rdr: &mut Cursor<&[u8]>) -> Self::Header { let protocol_version = rdr.read_u16::()?; /* protocol id */ let packet_id = rdr.read_u8()?; - let delivery_method_id = rdr.read_u8()?; + let delivery_guarantee_id = rdr.read_u8()?; + let order_guarantee_id = rdr.read_u8()?; let header = StandardHeader { protocol_version, - packet_type_id: PacketTypeId::get_packet_type(packet_id), - delivery_method: DeliveryMethod::get_delivery_method_from_id(delivery_method_id), + packet_type: PacketType::from_u8(packet_id), + delivery_guarantee: DeliveryGuarantee::from_u8(delivery_guarantee_id), + ordering_guarantee: OrderingGuarantee::from_u8(order_guarantee_id), }; Ok(header) } /// Get the size of this header. - fn size(&self) -> u8 { + fn size() -> u8 { STANDARD_HEADER_SIZE } } #[cfg(test)] mod tests { - use crate::infrastructure::DeliveryMethod; + use crate::net::constants::STANDARD_HEADER_SIZE; use crate::packet::header::{HeaderReader, HeaderWriter, StandardHeader}; - use crate::packet::PacketTypeId; - use crate::protocol_version::ProtocolVersion; + use crate::packet::{DeliveryGuarantee, EnumConverter, OrderingGuarantee, PacketType}; use std::io::Cursor; #[test] - pub fn serializes_deserialize_packet_header_test() { - let packet_header = StandardHeader::default(); - let mut buffer = Vec::with_capacity((packet_header.size() + 1) as usize); + fn serialize() { + let mut buffer = Vec::new(); + let header = StandardHeader::new( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(None), + PacketType::Packet, + ); + header.parse(&mut buffer).is_ok(); - let _ = packet_header.parse(&mut buffer); + // [0 .. 3] protocol version + assert_eq!(buffer[2], PacketType::Packet.to_u8()); + assert_eq!(buffer[3], DeliveryGuarantee::Unreliable.to_u8()); + assert_eq!(buffer[4], OrderingGuarantee::Sequenced(None).to_u8()); + } + + #[test] + fn deserialize() { + let buffer = vec![0, 1, 0, 1, 1]; let mut cursor = Cursor::new(buffer.as_slice()); - let packet_header = StandardHeader::read(&mut cursor).unwrap(); - assert!(ProtocolVersion::valid_version( - packet_header.protocol_version - )); - assert_eq!(packet_header.packet_type_id, PacketTypeId::Packet); + + let header = StandardHeader::read(&mut cursor).unwrap(); + + assert_eq!(header.protocol_version(), 1); + assert_eq!(header.packet_type(), PacketType::Packet); + assert_eq!(header.delivery_guarantee(), DeliveryGuarantee::Reliable); assert_eq!( - packet_header.delivery_method, - DeliveryMethod::UnreliableUnordered + header.ordering_guarantee(), + OrderingGuarantee::Sequenced(None) ); } + + #[test] + fn size() { + assert_eq!(StandardHeader::size(), STANDARD_HEADER_SIZE); + } } diff --git a/src/packet/outgoing.rs b/src/packet/outgoing.rs new file mode 100644 index 00000000..80ee5483 --- /dev/null +++ b/src/packet/outgoing.rs @@ -0,0 +1,202 @@ +use crate::{ + net::constants::{DEFAULT_ORDERING_STREAM, DEFAULT_SEQUENCING_STREAM}, + packet::{ + header::{ + AckedPacketHeader, ArrangingHeader, FragmentHeader, HeaderWriter, StandardHeader, + }, + DeliveryGuarantee, OrderingGuarantee, PacketType, + }, +}; + +/// Builder that could be used to construct an outgoing laminar packet. +pub struct OutgoingPacketBuilder<'p> { + header: Vec, + payload: &'p [u8], +} + +impl<'p> OutgoingPacketBuilder<'p> { + /// Construct a new builder from the given `payload`. + pub fn new(payload: &'p [u8]) -> OutgoingPacketBuilder<'p> { + OutgoingPacketBuilder { + header: Vec::new(), + payload, + } + } + + /// This will add the `FragmentHeader` to the header. + pub fn with_fragment_header(mut self, packet_seq: u16, id: u8, num_fragments: u8) -> Self { + let header = FragmentHeader::new(packet_seq, id, num_fragments); + + header + .parse(&mut self.header) + .expect("Could not write fragment header to buffer"); + + self + } + + /// This will add the [`StandardHeader`](./headers/standard_header) to the header. + pub fn with_default_header( + mut self, + packet_type: PacketType, + delivery_guarantee: DeliveryGuarantee, + ordering_guarantee: OrderingGuarantee, + ) -> Self { + let header = StandardHeader::new(delivery_guarantee, ordering_guarantee, packet_type); + header + .parse(&mut self.header) + .expect("Could not write default header to buffer"); + + self + } + + /// This will add the [`AckedPacketHeader`](./headers/acked_packet_header) to the header. + pub fn with_acknowledgement_header( + mut self, + seq_num: u16, + last_seq: u16, + bit_field: u32, + ) -> Self { + let header = AckedPacketHeader::new(seq_num, last_seq, bit_field); + header + .parse(&mut self.header) + .expect("Could not write acknowledgement header to buffer"); + + self + } + + /// This will add the [`ArrangingHeader`](./headers/arranging_header) if needed. + /// + /// - `arranging_id` = identifier for this packet that needs to be sequenced. + /// - `stream_id` = stream on which this packet will be sequenced. If `None` than the a default stream will be used. + pub fn with_sequencing_header(mut self, arranging_id: u16, stream_id: Option) -> Self { + let header = + ArrangingHeader::new(arranging_id, stream_id.unwrap_or(DEFAULT_SEQUENCING_STREAM)); + + header + .parse(&mut self.header) + .expect("Could not write arranging header to buffer"); + + self + } + + /// This will add the [`ArrangingHeader`](./headers/arranging_header) if needed. + /// + /// - `arranging_id` = identifier for this packet that needs to be ordered. + /// - `stream_id` = stream on which this packet will be ordered. If `None` than the a default stream will be used. + pub fn with_ordering_header(mut self, arranging_id: u16, stream_id: Option) -> Self { + let header = + ArrangingHeader::new(arranging_id, stream_id.unwrap_or(DEFAULT_ORDERING_STREAM)); + + header + .parse(&mut self.header) + .expect("Could not write arranging header to buffer"); + + self + } + + /// This will construct a `OutgoingPacket` from the contents constructed with this builder. + pub fn build(self) -> OutgoingPacket<'p> { + OutgoingPacket { + header: self.header, + payload: self.payload, + } + } +} + +/// Packet that that contains data which is ready to be sent to a remote endpoint. +pub struct OutgoingPacket<'p> { + header: Vec, + payload: &'p [u8], +} + +impl<'p> OutgoingPacket<'p> { + /// This will return the contents of this packet; the content includes the header and payload bytes. + /// + /// # Remark + /// - Until here we could use a reference to the outgoing data but here we need to do a hard copy. + /// Because the header could vary in size but should be in front of the payload provided by the user. + pub fn contents(&self) -> Box<[u8]> { + [self.header.as_slice(), &self.payload] + .concat() + .into_boxed_slice() + } +} + +/// Enum for storing different kinds of outgoing types with data. +pub enum Outgoing<'a> { + /// Represents a single packet. + Packet(OutgoingPacket<'a>), + /// Represents a packet that is fragmented and thus contains more than one `OutgoingPacket`. + Fragments(Vec>), +} + +#[cfg(test)] +mod tests { + use crate::packet::PacketType; + use crate::packet::{DeliveryGuarantee, OrderingGuarantee, OutgoingPacketBuilder}; + + fn test_payload() -> Vec { + return "test".as_bytes().to_vec(); + } + + #[test] + fn assure_fragment_header() { + let payload = test_payload(); + + let outgoing = OutgoingPacketBuilder::new(&payload) + .with_fragment_header(0, 0, 0) + .build(); + + let expected: Vec = [vec![0, 0, 0, 0], test_payload()].concat().to_vec(); + + assert_eq!(outgoing.contents().to_vec(), expected); + } + + #[test] + fn assure_arranging_header() { + let payload = test_payload(); + + let outgoing = OutgoingPacketBuilder::new(&payload) + .with_sequencing_header(1, Some(2)) + .build(); + + let expected: Vec = [vec![0, 1, 2], test_payload()].concat().to_vec(); + + assert_eq!(outgoing.contents().to_vec(), expected); + } + + #[test] + fn assure_acknowledgement_header() { + let payload = test_payload(); + + let outgoing = OutgoingPacketBuilder::new(&payload) + .with_acknowledgement_header(1, 2, 3) + .build(); + + let expected: Vec = [vec![0, 1, 0, 2, 0, 0, 0, 3], test_payload()] + .concat() + .to_vec(); + + assert_eq!(outgoing.contents().to_vec(), expected); + } + + #[test] + fn assure_default_header() { + let payload = test_payload(); + + let outgoing = OutgoingPacketBuilder::new(&payload) + .with_default_header( + PacketType::Packet, + DeliveryGuarantee::Reliable, + OrderingGuarantee::Sequenced(None), + ) + .build(); + + let expected: Vec = [vec![0, 1, 1], test_payload()].concat().to_vec(); + + assert_eq!( + outgoing.contents()[2..outgoing.contents().len()].to_vec(), + expected + ); + } +} diff --git a/src/packet/packet_data.rs b/src/packet/packet_data.rs deleted file mode 100644 index 3c467c9a..00000000 --- a/src/packet/packet_data.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::error::Result; -use std::io::Write; - -/// Contains the raw data this packet exists of. Note that a packet can be divided into separate fragments -#[derive(Debug, Default)] -pub struct PacketData { - parts: Vec>, -} - -impl PacketData { - /// Creates a new PacketData with a specified capacity - pub fn with_capacity(size: usize) -> PacketData { - PacketData { - parts: Vec::with_capacity(size), - } - } - - /// Add fragment to this packet - pub fn add_fragment(&mut self, fragment: &[u8], fragment_data: &[u8]) -> Result<()> { - let mut part = Vec::with_capacity(fragment.len() + fragment_data.len()); - part.write_all(fragment)?; - part.write_all(fragment_data)?; - self.parts.push(part); - Ok(()) - } - - /// Return the total fragments this packet is divided into. - #[cfg(test)] - pub fn fragment_count(&self) -> usize { - self.parts.len() - } - - /// Return the parts this packet exists of. - pub fn parts(&mut self) -> &Vec> { - &self.parts - } -} - -#[cfg(test)] -mod tests { - use super::PacketData; - use crate::packet::header::{AckedPacketHeader, HeaderReader, HeaderWriter, StandardHeader}; - - #[test] - fn add_ang_get_parts() { - let acked_header = AckedPacketHeader::new(StandardHeader::default(), 1, 1, 5421); - let mut buffer = Vec::new(); - let _ = acked_header.parse(&mut buffer); - - let mut packet_data = PacketData::with_capacity(acked_header.size() as usize); - let _ = packet_data.add_fragment(&buffer, &vec![1, 2, 3, 4, 5]); - let _ = packet_data.add_fragment(&buffer, &vec![1, 2, 3, 4, 5]); - let _ = packet_data.add_fragment(&buffer, &vec![1, 2, 3, 4, 5]); - - assert_eq!(packet_data.fragment_count(), 3); - - let _ = packet_data.parts().into_iter().map(|x| { - let _header = &x[0..acked_header.size() as usize]; - let body = &x[acked_header.size() as usize..buffer.len()]; - assert_eq!(body.to_vec(), vec![1, 2, 3, 4, 5]); - }); - } -} diff --git a/src/packet/packet_reader.rs b/src/packet/packet_reader.rs new file mode 100644 index 00000000..6dfee81c --- /dev/null +++ b/src/packet/packet_reader.rs @@ -0,0 +1,249 @@ +use crate::net::constants::STANDARD_HEADER_SIZE; +use crate::packet::header::{ + AckedPacketHeader, ArrangingHeader, FragmentHeader, HeaderReader, StandardHeader, +}; +use crate::{ErrorKind, Result}; + +use std::io::Cursor; + +/// Can be used to read the packet contents of laminar. +/// +/// # Remarks +/// - `PacketReader` is using an underlying `Cursor` to manage the reading of the bytes. +/// - `PacketReader` can interpret where some data is located in the buffer, that's why you don't have to worry about the position of the `Cursor`. +pub struct PacketReader<'s> { + buffer: &'s [u8], + cursor: Cursor<&'s [u8]>, +} + +impl<'s> PacketReader<'s> { + /// Construct a new instance of `PacketReader`, the given `buffer` will be used to read information from. + pub fn new(buffer: &'s [u8]) -> PacketReader<'s> { + PacketReader { + buffer, + cursor: Cursor::new(buffer), + } + } + + /// Read the `StandardHeader` from the underlying buffer. + /// + /// # Remark + /// - Will change the position to the location of `StandardHeader` + pub fn read_standard_header(&mut self) -> Result { + self.cursor.set_position(0); + + if self.can_read(StandardHeader::size()) { + StandardHeader::read(&mut self.cursor) + } else { + Err(ErrorKind::CouldNotReadHeader(String::from("standard"))) + } + } + + /// Read the `StandardHeader` from the underlying buffer. + /// + /// # Remark + /// - Will change the position to the location of `StandardHeader` + pub fn read_arranging_header(&mut self, start_offset: u16) -> Result { + self.cursor.set_position(u64::from(start_offset)); + + if self.can_read(ArrangingHeader::size()) { + ArrangingHeader::read(&mut self.cursor) + } else { + Err(ErrorKind::CouldNotReadHeader(String::from("arranging"))) + } + } + + /// Read the `AckedPacketHeader` from the underlying buffer. + /// + /// # Remark + /// - Will change the position to the location of `AckedPacketHeader` + pub fn read_acknowledge_header(&mut self) -> Result { + // acknowledge header comes after standard header. + self.cursor.set_position(u64::from(STANDARD_HEADER_SIZE)); + + if self.can_read(AckedPacketHeader::size()) { + AckedPacketHeader::read(&mut self.cursor) + } else { + Err(ErrorKind::CouldNotReadHeader(String::from( + "acknowledgement", + ))) + } + } + + /// Read the `FragmentHeader` and optionally the `AckedPacketHeader` from the underlying buffer. + /// + /// # Remark + /// - Notice that this will continue on the position of last read header; + /// e.g. when reading `StandardHeader` the position of the underlying `Cursor` will be at the end where it left of, + /// when calling this function afterward it will read the `FragmentHeader` from there on. + /// - Note that only the first fragment of a sequence contains acknowledgement information that's why `AckedPacketHeader` is optional. + pub fn read_fragment(&mut self) -> Result<(FragmentHeader, Option)> { + if self.can_read(FragmentHeader::size()) { + let fragment_header = FragmentHeader::read(&mut self.cursor)?; + + let acked_header = if fragment_header.id() == 0 { + Some(AckedPacketHeader::read(&mut self.cursor)?) + } else { + None + }; + + Ok((fragment_header, acked_header)) + } else { + Err(ErrorKind::CouldNotReadHeader(String::from("fragment"))) + } + } + + /// Read the payload` from the underlying buffer. + /// + /// # Remark + /// - Notice that this will continue on the position of last read header; + /// e.g. when reading `StandardHeader` the position of the underlying `Cursor` will be at the end where it left of, + /// when calling this function afterward it will read all the bytes from there on. + pub fn read_payload(&self) -> Box<[u8]> { + self.buffer[self.cursor.position() as usize..self.buffer.len()] + .to_vec() + .into_boxed_slice() + } + + // checks if a given length of bytes could be read with the buffer. + fn can_read(&self, length: u8) -> bool { + (self.buffer.len() - self.cursor.position() as usize) >= length as usize + } +} + +#[cfg(test)] +mod tests { + use crate::packet::header::{AckedPacketHeader, HeaderReader, StandardHeader}; + use crate::packet::{DeliveryGuarantee, OrderingGuarantee, PacketReader, PacketType}; + + #[test] + fn can_read_test() { + let buffer = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + + let reader = PacketReader::new(buffer.as_slice()); + assert_eq!(reader.can_read(buffer.len() as u8), true); + assert_eq!(reader.can_read((buffer.len() + 1) as u8), false); + } + + #[test] + fn read_standard_header() { + // standard header + let reliable_ordered_payload: Vec = vec![vec![0, 1, 0, 1, 2]].concat(); + + let mut reader = PacketReader::new(reliable_ordered_payload.as_slice()); + + let standard_header = reader.read_standard_header().unwrap(); + + assert_eq!(standard_header.protocol_version(), 1); + assert_eq!(standard_header.packet_type(), PacketType::Packet); + assert_eq!( + standard_header.delivery_guarantee(), + DeliveryGuarantee::Reliable + ); + assert_eq!( + standard_header.ordering_guarantee(), + OrderingGuarantee::Ordered(None) + ); + } + + #[test] + fn read_acknowledgement_header() { + // standard header, acked header + let reliable_ordered_payload: Vec = + vec![vec![0, 1, 0, 1, 2], vec![0, 1, 0, 2, 0, 0, 0, 3]].concat(); + + let mut reader = PacketReader::new(reliable_ordered_payload.as_slice()); + + let acked_header = reader.read_acknowledge_header().unwrap(); + + assert_eq!(acked_header.sequence(), 1); + assert_eq!(acked_header.ack_seq(), 2); + assert_eq!(acked_header.ack_field(), 3); + } + + #[test] + fn read_fragment_header() { + // standard header, acked header, arranging header + let reliable_ordered_payload: Vec = vec![ + vec![0, 1, 0, 1, 2], + vec![0, 1, 0, 3], + vec![0, 1, 0, 2, 0, 0, 0, 3], + ] + .concat(); + + let mut reader = PacketReader::new(reliable_ordered_payload.as_slice()); + + let standard_header = reader.read_standard_header().unwrap(); + let (fragment_header, acked_header) = reader.read_fragment().unwrap(); + + assert_eq!(standard_header.protocol_version(), 1); + assert_eq!(standard_header.packet_type(), PacketType::Packet); + assert_eq!( + standard_header.delivery_guarantee(), + DeliveryGuarantee::Reliable + ); + assert_eq!( + standard_header.ordering_guarantee(), + OrderingGuarantee::Ordered(None) + ); + + assert_eq!(acked_header.unwrap().sequence(), 1); + assert_eq!(acked_header.unwrap().ack_seq(), 2); + assert_eq!(acked_header.unwrap().ack_field(), 3); + + assert_eq!(fragment_header.sequence(), 1); + assert_eq!(fragment_header.id(), 0); + assert_eq!(fragment_header.fragment_count(), 3); + } + + #[test] + fn read_arranging_header_after_standard_header() { + // standard header, arranging header + let reliable_ordered_payload: Vec = vec![vec![0, 1, 0, 1, 2], vec![0, 1, 2]].concat(); + + let mut reader = PacketReader::new(reliable_ordered_payload.as_slice()); + + let arranging_header = reader + .read_arranging_header(StandardHeader::size() as u16) + .unwrap(); + + assert_eq!(arranging_header.arranging_id(), 1); + assert_eq!(arranging_header.stream_id(), 2); + } + + #[test] + fn read_reliable_ordered() { + // standard header, acked header, arranging header + let reliable_ordered_payload: Vec = vec![ + vec![0, 1, 0, 1, 2], + vec![0, 1, 0, 2, 0, 0, 0, 3], + vec![0, 1, 2], + ] + .concat(); + let mut reader = PacketReader::new(reliable_ordered_payload.as_slice()); + + let standard_header = reader.read_standard_header().unwrap(); + let acked_header = reader.read_acknowledge_header().unwrap(); + let arranging_header = reader + .read_arranging_header((StandardHeader::size() + AckedPacketHeader::size()) as u16) + .unwrap(); + + assert_eq!(standard_header.protocol_version(), 1); + assert_eq!(standard_header.packet_type(), PacketType::Packet); + assert_eq!( + standard_header.delivery_guarantee(), + DeliveryGuarantee::Reliable + ); + assert_eq!( + standard_header.ordering_guarantee(), + OrderingGuarantee::Ordered(None) + ); + + assert_eq!(acked_header.sequence(), 1); + assert_eq!(acked_header.ack_seq(), 2); + assert_eq!(acked_header.ack_field(), 3); + + assert_eq!(arranging_header.arranging_id(), 1); + assert_eq!(arranging_header.stream_id(), 2); + } +} diff --git a/src/packet/packet_structure.rs b/src/packet/packet_structure.rs index 448a9fa0..f016fb8c 100644 --- a/src/packet/packet_structure.rs +++ b/src/packet/packet_structure.rs @@ -1,28 +1,48 @@ -use crate::infrastructure::DeliveryMethod; +use crate::packet::{DeliveryGuarantee, OrderingGuarantee}; use std::net::SocketAddr; #[derive(Clone, PartialEq, Eq, Debug)] -/// This is a user friendly packet containing the payload from the packet and the endpoint from where it came. +/// This is a user friendly packet containing the payload, endpoint, and reliability guarantees. +/// A packet could have reliability guarantees to specify how it should be delivered and processed. +/// +/// | Reliability Type | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation |Packet Delivery| +/// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: +/// | **Unreliable Unordered** | Yes | Yes | No | No | No +/// | **Reliable Unordered** | No | No | No | Yes | Yes +/// | **Reliable Ordered** | No | No | Ordered | Yes | Yes +/// | **Sequenced** | Yes | No | Sequenced | No | No +/// +/// You are able to send packets with any the above guarantees. pub struct Packet { /// the endpoint from where it came addr: SocketAddr, /// the raw payload of the packet payload: Box<[u8]>, /// defines on how the packet will be delivered. - delivery_method: DeliveryMethod, + delivery: DeliveryGuarantee, + /// defines on how the packet will be ordered. + ordering: OrderingGuarantee, } impl Packet { - /// Create an new packet by passing the receiver, data and how this packet should be delivered. - pub fn new(addr: SocketAddr, payload: Box<[u8]>, delivery_method: DeliveryMethod) -> Self { + /// Create a new packet by passing the receiver, data, and guarantees on how this packet should be delivered. + pub(crate) fn new( + addr: SocketAddr, + payload: Box<[u8]>, + delivery: DeliveryGuarantee, + ordering: OrderingGuarantee, + ) -> Packet { Packet { addr, payload, - delivery_method, + delivery, + ordering, } } - /// Unreliable. Packets can be dropped, duplicated or arrive without order. + /// Create a new unreliable packet by passing the receiver, data. + /// + /// Unreliable: Packets can be dropped, duplicated or arrive without order. /// /// **Details** /// @@ -30,16 +50,42 @@ impl Packet { /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | /// | Yes | Yes | No | No | No | /// - /// Basically just bare UDP, free to be dropped, used for very unnecessary data, great for 'general' position updates. + /// Basically just bare UDP. The packet may or may not be delivered. pub fn unreliable(addr: SocketAddr, payload: Vec) -> Packet { - Packet::new( + Packet { addr, - payload.into_boxed_slice(), - DeliveryMethod::UnreliableUnordered, - ) + payload: payload.into_boxed_slice(), + delivery: DeliveryGuarantee::Unreliable, + ordering: OrderingGuarantee::None, + } } - /// Reliable. All packets will be sent and received, but without order. + /// Create a new unreliable sequenced packet by passing the receiver, data. + /// + /// Unreliable Sequenced; Packets can be dropped, but could not be duplicated and arrive in sequence. + /// + /// *Details* + /// + /// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | + /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | + /// | Yes | Yes | Sequenced | No | No | + /// + /// Basically just bare UDP, free to be dropped, but has some sequencing to it so that only the newest packets are kept. + pub fn unreliable_sequenced( + addr: SocketAddr, + payload: Vec, + stream_id: Option, + ) -> Packet { + Packet { + addr, + payload: payload.into_boxed_slice(), + delivery: DeliveryGuarantee::Unreliable, + ordering: OrderingGuarantee::Sequenced(stream_id), + } + } + + /// Create a new packet by passing the receiver, data. + /// Reliable; All packets will be sent and received, but without order. /// /// *Details* /// @@ -47,28 +93,157 @@ impl Packet { /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | /// | No | No | No | Yes | Yes | /// - /// Basically this is almost TCP like without ordering of packets. - /// Receive every packet and immediately give to application, order does not matter. + /// Basically this is almost TCP without ordering of packets. pub fn reliable_unordered(addr: SocketAddr, payload: Vec) -> Packet { - Packet::new( + Packet { + addr, + payload: payload.into_boxed_slice(), + delivery: DeliveryGuarantee::Reliable, + ordering: OrderingGuarantee::None, + } + } + + /// Create a new packet by passing the receiver, data and a optional stream on which the ordering will be done. + /// + /// Reliable; All packets will be sent and received, with order. + /// + /// *Details* + /// + /// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | + /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | + /// | No | No | Ordered | Yes | Yes | + /// + /// Basically this is almost TCP-like with ordering of packets. + /// + /// # Remark + /// - When `stream_id` is specified as `None` the default stream will be used; if you are not sure what this is you can leave it at `None`. + pub fn reliable_ordered(addr: SocketAddr, payload: Vec, stream_id: Option) -> Packet { + Packet { addr, - payload.into_boxed_slice(), - DeliveryMethod::ReliableUnordered, - ) + payload: payload.into_boxed_slice(), + delivery: DeliveryGuarantee::Reliable, + ordering: OrderingGuarantee::Ordered(stream_id), + } } - /// Get the payload (raw data) of this packet. + /// Create a new packet by passing the receiver, data and a optional stream on which the sequencing will be done. + /// + /// Reliable; All packets will be sent and received, but arranged in sequence. + /// Which means that only the newest packets will be let through, older packets will be received but they won't get to the user. + /// + /// *Details* + /// + /// | Packet Drop | Packet Duplication | Packet Order | Packet Fragmentation | Packet Delivery | + /// | :-------------: | :-------------: | :-------------: | :-------------: | :-------------: | + /// | Yes | No | Sequenced | Yes | Yes | + /// + /// Basically this is almost TCP-like but then sequencing instead of ordering. + /// + /// # Remark + /// - When `stream_id` is specified as `None` the default stream will be used; if you are not sure what this is you can leave it at `None`. + pub fn reliable_sequenced(addr: SocketAddr, payload: Vec, stream_id: Option) -> Packet { + Packet { + addr, + payload: payload.into_boxed_slice(), + delivery: DeliveryGuarantee::Reliable, + ordering: OrderingGuarantee::Sequenced(stream_id), + } + } + + /// Returns the payload of this packet. pub fn payload(&self) -> &[u8] { &self.payload } - /// Get the endpoint from this packet. + /// Returns the address of this packet. + /// + /// # Remark + /// Could be both the receiving endpoint or the one to send this packet to. + /// This depends whether it is a packet that has been received or one that needs to be send. pub fn addr(&self) -> SocketAddr { self.addr } - /// Get the type representing on how this packet will be delivered. - pub fn delivery_method(&self) -> DeliveryMethod { - self.delivery_method + /// Returns the [`DeliveryGuarantee`](./enum.DeliveryGuarantee.html) of this packet. + pub fn delivery_guarantee(&self) -> DeliveryGuarantee { + self.delivery + } + + /// Returns the [`OrderingGuarantee`](./enum.OrderingGuarantee.html) of this packet. + pub fn order_guarantee(&self) -> OrderingGuarantee { + self.ordering + } +} + +#[cfg(test)] +mod tests { + use crate::packet::{DeliveryGuarantee, OrderingGuarantee, Packet}; + use std::net::SocketAddr; + + #[test] + fn create_unreliable() { + let packet = Packet::unreliable(test_addr(), test_payload()); + + assert_eq!(packet.addr(), test_addr()); + assert_eq!(packet.payload(), test_payload().as_slice()); + assert_eq!(packet.delivery_guarantee(), DeliveryGuarantee::Unreliable); + assert_eq!(packet.order_guarantee(), OrderingGuarantee::None); + } + + #[test] + fn create_unreliable_sequenced() { + let packet = Packet::unreliable_sequenced(test_addr(), test_payload(), Some(1)); + + assert_eq!(packet.addr(), test_addr()); + assert_eq!(packet.payload(), test_payload().as_slice()); + assert_eq!(packet.delivery_guarantee(), DeliveryGuarantee::Unreliable); + assert_eq!( + packet.order_guarantee(), + OrderingGuarantee::Sequenced(Some(1)) + ); + } + + #[test] + fn create_reliable() { + let packet = Packet::reliable_unordered(test_addr(), test_payload()); + + assert_eq!(packet.addr(), test_addr()); + assert_eq!(packet.payload(), test_payload().as_slice()); + assert_eq!(packet.delivery_guarantee(), DeliveryGuarantee::Reliable); + assert_eq!(packet.order_guarantee(), OrderingGuarantee::None); + } + + #[test] + fn create_reliable_ordered() { + let packet = Packet::reliable_ordered(test_addr(), test_payload(), Some(1)); + + assert_eq!(packet.addr(), test_addr()); + assert_eq!(packet.payload(), test_payload().as_slice()); + assert_eq!(packet.delivery_guarantee(), DeliveryGuarantee::Reliable); + assert_eq!( + packet.order_guarantee(), + OrderingGuarantee::Ordered(Some(1)) + ); + } + + #[test] + fn create_reliable_sequence() { + let packet = Packet::reliable_sequenced(test_addr(), test_payload(), Some(1)); + + assert_eq!(packet.addr(), test_addr()); + assert_eq!(packet.payload(), test_payload().as_slice()); + assert_eq!(packet.delivery_guarantee(), DeliveryGuarantee::Reliable); + assert_eq!( + packet.order_guarantee(), + OrderingGuarantee::Sequenced(Some(1)) + ); + } + + fn test_payload() -> Vec { + return "test".as_bytes().to_vec(); + } + + fn test_addr() -> SocketAddr { + "127.0.0.1:12345".parse().unwrap() } } diff --git a/src/packet/packet_type.rs b/src/packet/packet_type.rs deleted file mode 100644 index 34167e13..00000000 --- a/src/packet/packet_type.rs +++ /dev/null @@ -1,47 +0,0 @@ -use crate::packet::header::{AckedPacketHeader, FragmentHeader}; - -/// These are the different packets that could be send by te user. -#[allow(dead_code)] -pub enum PacketType { - /// Packet header containing packet information. - Normal(AckedPacketHeader), - /// Part of an packet also called 'fragment' containing fragment info. - Fragment(FragmentHeader), - /// Packet to keep the connection alive. - HeartBeat {/* fields ... */}, - /// Disconnect request - Disconnect {/* fields ... */}, -} - -#[derive(Copy, Clone, Debug, PartialOrd, PartialEq)] -/// Id to identify an certain packet type. -pub enum PacketTypeId { - /// Full packet that is not fragmented - Packet = 0, - /// Fragment of a full packet - Fragment = 1, - /// Special packet that serves as a heartbeat - HeartBeat = 2, - /// Special packet that disconnects - Disconnect = 3, - /// Unknown packet type - Unknown = 255, -} - -impl PacketTypeId { - /// Get integer value from `PacketTypeId` enum. - pub fn get_id(packet_type: PacketTypeId) -> u8 { - packet_type as u8 - } - - /// Get `PacketTypeid` enum instance from integer value. - pub fn get_packet_type(packet_type_id: u8) -> PacketTypeId { - match packet_type_id { - 0 => PacketTypeId::Packet, - 1 => PacketTypeId::Fragment, - 2 => PacketTypeId::HeartBeat, - 3 => PacketTypeId::Disconnect, - _ => PacketTypeId::Unknown, - } - } -} diff --git a/src/sequence_buffer/congestion_data.rs b/src/sequence_buffer/congestion_data.rs index 3a94629f..7478ff58 100644 --- a/src/sequence_buffer/congestion_data.rs +++ b/src/sequence_buffer/congestion_data.rs @@ -1,14 +1,15 @@ +use crate::packet::SequenceNumber; use std::time::Instant; #[derive(Clone)] /// This contains the information required to reassemble fragments. pub struct CongestionData { - pub sequence: u16, + pub sequence: SequenceNumber, pub sending_time: Instant, } impl CongestionData { - pub fn new(sequence: u16, sending_time: Instant) -> Self { + pub fn new(sequence: SequenceNumber, sending_time: Instant) -> Self { CongestionData { sequence, sending_time, diff --git a/src/sequence_buffer/reassembly_data.rs b/src/sequence_buffer/reassembly_data.rs index f3d67114..16b06f4b 100644 --- a/src/sequence_buffer/reassembly_data.rs +++ b/src/sequence_buffer/reassembly_data.rs @@ -1,9 +1,10 @@ use crate::net::constants::MAX_FRAGMENTS_DEFAULT; +use crate::packet::SequenceNumber; #[derive(Clone)] /// This contains the information required to reassemble fragments. pub struct ReassemblyData { - pub sequence: u16, + pub sequence: SequenceNumber, pub num_fragments_received: u8, pub num_fragments_total: u8, pub buffer: Vec, @@ -11,7 +12,7 @@ pub struct ReassemblyData { } impl ReassemblyData { - pub fn new(sequence: u16, num_fragments_total: u8, prealloc: usize) -> Self { + pub fn new(sequence: SequenceNumber, num_fragments_total: u8, prealloc: usize) -> Self { Self { sequence, num_fragments_received: 0, diff --git a/src/sequence_buffer/sequence_buffer_structure.rs b/src/sequence_buffer/sequence_buffer_structure.rs index 883d08de..a5685c2d 100644 --- a/src/sequence_buffer/sequence_buffer_structure.rs +++ b/src/sequence_buffer/sequence_buffer_structure.rs @@ -1,3 +1,4 @@ +use crate::packet::SequenceNumber; use std::clone::Clone; /// Collection to store data of any kind. @@ -6,7 +7,7 @@ where T: Default + Clone + Send + Sync, { entries: Vec, - entry_sequences: Vec, + entry_sequences: Vec, } impl SequenceBuffer @@ -28,7 +29,7 @@ where } /// Get mutable entry from collection by sequence number. - pub fn get_mut(&mut self, sequence: u16) -> Option<&mut T> { + pub fn get_mut(&mut self, sequence: SequenceNumber) -> Option<&mut T> { let index = self.index(sequence); if self.entry_sequences[index] != sequence { @@ -39,7 +40,7 @@ where } /// Insert new entry into the collection. - pub fn insert(&mut self, data: T, sequence: u16) -> &mut T { + pub fn insert(&mut self, data: T, sequence: SequenceNumber) -> &mut T { let index = self.index(sequence); self.entries[index] = data; @@ -49,7 +50,7 @@ where } /// Remove entry from collection. - pub fn remove(&mut self, sequence: u16) { + pub fn remove(&mut self, sequence: SequenceNumber) { // TODO: validity check let index = self.index(sequence); self.entries[index] = T::default(); @@ -57,7 +58,7 @@ where } /// checks if an certain entry exists. - pub fn exists(&self, sequence: u16) -> bool { + pub fn exists(&self, sequence: SequenceNumber) -> bool { let index = self.index(sequence); if self.entry_sequences[index] != sequence { return false; @@ -73,7 +74,7 @@ where } /// converts an sequence number to an index that could be used for the inner storage. - fn index(&self, sequence: u16) -> usize { + fn index(&self, sequence: SequenceNumber) -> usize { sequence as usize % self.entries.len() } } diff --git a/tests/fragmentation_packets_test.rs b/tests/fragmentation_packets_test.rs index 6e4454af..4142a559 100644 --- a/tests/fragmentation_packets_test.rs +++ b/tests/fragmentation_packets_test.rs @@ -3,7 +3,7 @@ mod common; #[cfg(feature = "tester")] use common::{client_addr, Client, Server, ServerEvent}; -use laminar::{DeliveryMethod, Packet}; +use laminar::{DeliveryGuarantee, OrderingGuarantee, Packet}; use log::debug; use std::net::SocketAddr; use std::{thread, time::Duration}; @@ -19,7 +19,8 @@ fn send_receive_fragment_packets() { let client = Client::new(Duration::from_millis(1), 5000); let assert_function = move |packet: Packet| { - assert_eq!(packet.delivery_method(), DeliveryMethod::ReliableUnordered); + assert_eq!(packet.order_guarantee(), OrderingGuarantee::None); + assert_eq!(packet.delivery_guarantee(), DeliveryGuarantee::Reliable); assert_eq!(packet.payload(), payload().as_slice()); }; diff --git a/tests/unreliable_packets_test.rs b/tests/unreliable_packets_test.rs index 8b8505a5..932dde94 100644 --- a/tests/unreliable_packets_test.rs +++ b/tests/unreliable_packets_test.rs @@ -4,7 +4,7 @@ mod common; #[cfg(feature = "tester")] use common::{client_addr, Client, Server, ServerEvent}; -use laminar::{DeliveryMethod, Packet}; +use laminar::{DeliveryGuarantee, OrderingGuarantee, Packet}; use log::{debug, error, info}; use std::net::SocketAddr; use std::{thread, time::Duration}; @@ -19,11 +19,8 @@ fn send_receive_unreliable_packets() { let client = Client::new(Duration::from_millis(1), 5000); let assert_function = move |packet: Packet| { - // assert_eq!(packet.addr(), client_addr); - assert_eq!( - packet.delivery_method(), - DeliveryMethod::UnreliableUnordered - ); + assert_eq!(packet.order_guarantee(), OrderingGuarantee::None); + assert_eq!(packet.delivery_guarantee(), DeliveryGuarantee::Unreliable); assert_eq!(packet.payload(), payload().as_slice()); }; @@ -66,10 +63,8 @@ fn send_receive_unreliable_packets_muliple_clients() { let client = Client::new(Duration::from_millis(16), 500); let assert_function = move |packet: Packet| { - assert_eq!( - packet.delivery_method(), - DeliveryMethod::UnreliableUnordered - ); + assert_eq!(packet.order_guarantee(), OrderingGuarantee::None); + assert_eq!(packet.delivery_guarantee(), DeliveryGuarantee::Unreliable); assert_eq!(packet.payload(), payload().as_slice()); };