From fac7e2be8352411d8887a1e374cee887071c3740 Mon Sep 17 00:00:00 2001 From: Timon Date: Wed, 27 Feb 2019 20:29:45 +0100 Subject: [PATCH] Implemented Arranging-Streams | Don't MERGE (#145) * Let's define two concepts here: _"Sequencing: this is the process of only caring about the newest items."_ [1](https://dictionary.cambridge.org/dictionary/english/sequencing) _"Ordering: this is the process of putting something in a particular order."_ [2](https://dictionary.cambridge.org/dictionary/english/ordering) - Sequencing: Only the newest items will be passed trough e.g. `1,3,2,5,4` which results in `1,3,5`. - Ordering: All items are returned in order `1,3,2,5,4` which results in `1,2,3,4,5`. What are these 'arranging streams'? You can see 'arranging streams' as something to arrange items that have no relationship at all with one another. Think of a highway where you have several lanes where cars are driving. Because there are these lanes, cars can move on faster. For example, the cargo drivers drive on the right and the high-speed cars on the left. The cargo drivers have no influence on fast cars and vice versa. If a game developer wants to send data to a client, it may be that he wants to send data ordered, unordered or sequenced. Data might be the following: 1. Player movement, we want to order player movements because we don't care about old positions. 2. Bullet movement, we want to order bullet movement because we don't care about old positions of bullets. 3. Chat messages, we want to order chat messages because it is nice to see the text in the right order. Player movement and chat messages are totally unrelated to each other and you absolutely do not want that movement packets are interrupted when a chat message is not sent. With ordering, we can only return items when all packets up to the current package are received. So if a chat package is missing, the other packages will suffer from it. It would be nice if we could order player movements and chat messages separately. This is exactly what `ordering streams` are meant for. The game developer can indicate which stream he can order his packets and how he wants to arrange them. For example, the game developer can say: "Let me set all chat messages to 'stream 1' and all motion packets to 'stream 2'. --- ci/coverage.sh | 0 src/infrastructure.rs | 1 - src/infrastructure/arranging/mod.rs | 68 ++++ src/infrastructure/arranging/ordering.rs | 441 +++++++++++++++++++++ src/infrastructure/arranging/sequencing.rs | 236 +++++++++++ 5 files changed, 745 insertions(+), 1 deletion(-) mode change 100755 => 100644 ci/coverage.sh create mode 100644 src/infrastructure/arranging/mod.rs create mode 100644 src/infrastructure/arranging/ordering.rs create mode 100644 src/infrastructure/arranging/sequencing.rs diff --git a/ci/coverage.sh b/ci/coverage.sh old mode 100755 new mode 100644 diff --git a/src/infrastructure.rs b/src/infrastructure.rs index d026845b..e65d6f4b 100644 --- a/src/infrastructure.rs +++ b/src/infrastructure.rs @@ -1,5 +1,4 @@ mod channels; -///! This module provides infrastructure logic. With infrastructure is meant, everything that's responsible for the packet flow and processing. mod delivery_method; mod fragmenter; diff --git a/src/infrastructure/arranging/mod.rs b/src/infrastructure/arranging/mod.rs new file mode 100644 index 00000000..c522b616 --- /dev/null +++ b/src/infrastructure/arranging/mod.rs @@ -0,0 +1,68 @@ +//! This module is about arranging items, over different streams, based on an certain algorithm. +//! +//! The above sentence contains a lot of important information, lets zoom in at the above sentence. +//! +//! ## Items +//! +//! By items, you can understand 'packets' and 'arranging' can be done based either with sequencing or ordering. +//! +//! ## Ordering VS Sequencing +//! Let's define two concepts here: +//! _"Sequencing: this is the process of only caring about the newest items."_ [1](https://dictionary.cambridge.org/dictionary/english/sequencing) +//! _"Ordering: this is the process of putting something in a particular order."_ [2](https://dictionary.cambridge.org/dictionary/english/ordering) +//! +//! - Sequencing: Only the newest items will be passed trough e.g. `1,3,2,5,4` which results in `1,3,5`. +//! - Ordering: All items are returned in order `1,3,2,5,4` which results in `1,2,3,4,5`. +//! +//! ## Arranging Streams +//! What are these 'arranging streams'? +//! You can see 'arranging streams' as something to arrange items that have no relationship at all with one another. +//! +//! ## Simple Example +//! Think of a highway where you have several lanes where cars are driving. +//! Because there are these lanes, cars can move on faster. +//! For example, the cargo drivers drive on the right and the high-speed cars on the left. +//! The cargo drivers have no influence on fast cars and vice versa. +//! +//! ## Real Example +//! If a game developer wants to send data to a client, it may be that he wants to send data ordered, unordered or sequenced. +//! Data might be the following: +//! 1. Player movement, we want to order player movements because we don't care about old positions. +//! 2. Bullet movement, we want to order bullet movement because we don't care about old positions of bullets. +//! 3. Chat messages, we want to order chat messages because it is nice to see the text in the right order. +//! +//! Player movement and chat messages are totally unrelated to each other and you absolutely do not want that movement packets are interrupted when a chat message is not sent. +//! With ordering, we can only return items when all packets up to the current package are received. +//! +//! So if a chat package is missing, the other packages will suffer from it. +//! It would be nice if we could order player movements and chat messages separately. This is exactly what `ordering streams` are meant for. +//! The game developer can indicate on which stream he can order his packets and how he wants to arrange them. +//! For example, the game developer can say: "Let me set all chat messages to 'stream 1' and all motion packets to 'stream 2'. + +pub mod ordering; +pub mod sequencing; + +/// A trait which could be implemented for arranging operations. +pub trait Arranging { + type ArrangingItem; + + /// Arrange the given item based on the given index. + /// If the `incoming_offset` somehow does not satisfies the arranging algorithm it returns `None`. + /// If the `incoming_offset` satisfies the arranging algorithm it returns `Some` with the passed item. + fn arrange( + &mut self, + incoming_index: usize, + item: Self::ArrangingItem, + ) -> Option; +} + +/// An arranging system that has multiple streams on which you can arrange items. +pub trait ArrangingSystem { + /// The type of stream that is used for arranging items. + type Stream; + + /// Returns the number of streams currently created. + fn stream_count(&self) -> usize; + /// Try to get a `Stream` by `stream_id`. When the stream does not exist, it will be inserted by the given `stream_id` and returned. + fn get_or_create_stream(&mut self, stream_id: u8) -> &mut Self::Stream; +} diff --git a/src/infrastructure/arranging/ordering.rs b/src/infrastructure/arranging/ordering.rs new file mode 100644 index 00000000..ff89f420 --- /dev/null +++ b/src/infrastructure/arranging/ordering.rs @@ -0,0 +1,441 @@ +//! Module with logic for arranging items in-order on multiple streams. +//! +//! _"Order is the process of putting something in a particular order."_ +//! +//! # How ordering works. +//! Imagine we have this sequence: `1,5,4,2,3` and we want the user to eventually see: `1,2,3,4,5`. +//! +//! Let's define some variables: +//! +//! ## Variable Setup +//! **hashmap** +//! +//! | Key | Value | +//! | :-------------: | :-------------: | +//! | _ | _ | +//! +//! `expected_index = 1;` +//! +//! ## Ordering +//! **Receive '1'** +//! +//! - Packet 1 is equals to our expected index we can return it immediately. +//! - Increase `expected_index` to '2' +//! +//! **Receive '5'** +//! +//! Packet '5' is not equal to our expected index so we need to store it until we received **all** packets up to 5 before returning. +//! +//! | Key | Value | +//! | :-------------: | :-------------: | +//! | 5 | packet | +//! +//! **Receive '4'** +//! +//! Packet '4' is not equal to our expected index so we need to store it until we received **all** packets up to 4 before returning. +//! +//! | Key | Value | +//! | :-------------: | :-------------: | +//! | 5 | packet | +//! | 4 | packet | +//! +//! **Receive '3'** +//! +//! Packet '3' is not equal to our expected index so we need to store it until we received **all** packets up to 3 before returning. +//! +//! | Key | Value | +//! | :-------------: | :-------------: | +//! | 5 | packet | +//! | 4 | packet | +//! | 4 | packet | +//! +//! **Receive '2'** +//! +//! - Packet 2 is equals to our expected index we can return it immediately. +//! - Increase `expected_index` to '3' +//! +//! Now we received our `expected_index` we can check if we have the next `expected_index` in storage. +//! +//! This could be done with an iterator which returns packets as long there are packets in our storage matching the `expected_index`. +//! +//! ```rust +//! let stream = OrderingStream::new(); +//! +//! let iter = stream.iter_mut(); +//! +//! while let Some(packet) = iter.next() { +//! // packets from iterator are in order. +//! } +//! ``` +//! +//! # Remarks +//! - See [super-module](../index.html) description for more details. + +use super::{Arranging, ArrangingSystem}; +use std::collections::HashMap; + +/// An ordering system that can arrange items in order on different streams. +/// +/// Checkout [`OrderingStream`](./struct.OrderingStream.html), or module description for more details. +/// +/// # Remarks +/// - See [super-module](../index.html) for more information about streams. +pub struct OrderingSystem { + // '[HashMap]' with streams on which items can be ordered. + streams: HashMap>, +} + +impl OrderingSystem { + /// Constructs a new [`OrderingSystem`](./struct.OrderingSystem.html). + pub fn new() -> OrderingSystem { + OrderingSystem { + streams: HashMap::with_capacity(32), + } + } +} + +impl<'a, T> ArrangingSystem for OrderingSystem { + type Stream = OrderingStream; + + /// Returns the number of ordering streams currently active. + fn stream_count(&self) -> usize { + self.streams.len() + } + + /// Try to get an [`OrderingStream`](./struct.OrderingStream.html) by `stream_id`. + /// When the stream does not exist, it will be inserted by the given `stream_id` and returned. + fn get_or_create_stream(&mut self, stream_id: u8) -> &mut Self::Stream { + self.streams + .entry(stream_id) + .or_insert_with(|| OrderingStream::new(stream_id)) + } +} + +/// A stream on which items will be arranged in-order. +/// +/// # Algorithm +/// +/// With every ordering operation an `incoming_index` is given. We also keep a local record of the `expected_index`. +/// +/// There are three scenarios that are important to us. +/// 1. `incoming_index` == `expected_index`. +/// This package meets the expected order, so we can return it immediately. +/// 2. `incoming_index` > `expected_index`. +/// This package is newer than we expect, so we have to hold it temporarily until we have received all previous packages. +/// 3. `incoming_index`< `expected_index` +/// This can only happen in cases where we have a duplicated package. Again we don't give anything back. +/// # Remarks +/// - See [super-module](../index.html) for more information about streams. +pub struct OrderingStream { + // the id of this stream. + stream_id: u8, + // the storage for items that are waiting for older items to arrive. + // the items will be stored by key and value where the key is de incoming index and value the item value. + storage: HashMap, + // the next expected item index. + expected_index: usize, +} + +impl OrderingStream { + /// Constructs a new, empty [`OrderingStream`](./struct.OrderingStream.html). + /// + /// The default stream will have a capacity of 32 items. + pub fn new(stream_id: u8) -> OrderingStream { + OrderingStream::with_capacity(1024, stream_id) + } + + /// Constructs a new, empty [`OrderingStream`] with the specified capacity. + /// + /// The stream will be able to hold exactly capacity elements without + /// reallocating. If capacity is 0, the vector will not allocate. + /// + /// It is important to note that although the returned stream has the capacity specified, + /// the stream will have a zero length. + /// + /// [`OrderingStream`]: ./struct.OrderingStream.html + pub fn with_capacity(size: usize, stream_id: u8) -> OrderingStream { + OrderingStream { + storage: HashMap::with_capacity(size), + expected_index: 1, + stream_id, + } + } + + /// Returns the identifier of this stream. + fn stream_id(&self) -> u8 { + self.stream_id + } + + /// Returns the next expected index. + pub fn expected_index(&self) -> usize { + self.expected_index + } + + /// Returns an iterator of stored items. + /// + /// # Algorithm for returning items from an Iterator. + /// + /// 1. See if there is an item matching our `expected_index` + /// 2. If there is return the `Some(item)` + /// - Increase the `expected_index` + /// - Start at '1' + /// 3. If there isn't return `None` + /// + /// # Example + /// + /// ```rust + /// let stream = OrderingStream::new(); + /// + /// let iter = stream.iter_mut(); + /// + /// while let Some(item) = iter.next() { + /// // Items from iterator are in order. + /// } + /// ``` + /// + /// # Remarks + /// - Iterator mutates the `expected_index`. + /// - You can't use this iterator for iterating trough all cached values. + pub fn iter_mut(&mut self) -> IterMut { + IterMut { + items: &mut self.storage, + expected_index: &mut self.expected_index, + } + } +} + +impl Arranging for OrderingStream { + type ArrangingItem = T; + + /// Will order the given item based on the ordering algorithm. + /// + /// With every ordering operation an `incoming_index` is given. We also keep a local record of the `expected_index`. + /// + /// # Algorithm + /// + /// There are three scenarios that are important to us. + /// 1. `incoming_index` == `expected_index`. + /// This package meets the expected order, so we can return it immediately. + /// 2. `incoming_index` > `expected_index`. + /// This package is newer than we expect, so we have to hold it temporarily until we have received all previous packages. + /// 3. `incoming_index` < `expected_index` + /// This can only happen in cases where we have a duplicated package. Again we don't give anything back. + /// + /// # Remark + /// - When we receive an item there is a possibility that a gab is filled and one or more items will could be returned. + /// You should use the `iter_mut` instead for reading the items in order. + /// However the item given to `arrange` will be returned directly when it matches the `expected_index`. + fn arrange( + &mut self, + incoming_offset: usize, + item: Self::ArrangingItem, + ) -> Option { + if incoming_offset == self.expected_index { + self.expected_index += 1; + Some(index) + } else if incoming_offset > self.expected_index { + self.storage.insert(incoming_offset, index); + None + } else { + // only occurs when we get a duplicated incoming_offset. + None + } + } +} + +/// Mutable Iterator for [`OrderingStream`](./struct.OrderingStream.html). +/// +/// # Algorithm for returning items from Iterator. +/// +/// 1. See if there is an item matching our `expected_index` +/// 2. If there is return the `Some(item)` +/// - Increase the `expected_index` +/// - Start at '1' +/// 3. If there isn't return `None` +/// +/// # Remarks +/// +/// - Iterator mutates the `expected_index`. +/// - You can't use this iterator for iterating trough all cached values. +pub struct IterMut<'a, T> { + items: &'a mut HashMap, + expected_index: &'a mut usize, +} + +impl<'a, T> Iterator for IterMut<'a, T> { + type Item = T; + + /// Returns `Some` when there is an item in our cache matching the `expected_index`. + /// Returns `None` if there are no times matching our `expected` index. + fn next(&mut self) -> Option<::Item> { + match self.items.remove(&self.expected_index) { + None => None, + Some(e) => { + *self.expected_index += 1; + Some(e) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::{Arranging, ArrangingSystem, OrderingSystem}; + + #[derive(Debug, PartialEq, Clone)] + struct Packet { + pub sequence: usize, + pub ordering_stream: u8, + } + + impl Packet { + fn new(sequence: usize, ordering_stream: u8) -> Packet { + Packet { + sequence, + ordering_stream, + } + } + } + + #[test] + fn create_stream() { + let mut system: OrderingSystem = OrderingSystem::new(); + let stream = system.get_or_create_stream(1); + + assert_eq!(stream.expected_index(), 1); + assert_eq!(stream.stream_id(), 1); + } + + #[test] + fn create_existing_stream() { + let mut system: OrderingSystem = OrderingSystem::new(); + + system.get_or_create_stream(1); + let stream = system.get_or_create_stream(1); + + assert_eq!(stream.stream_id(), 1); + } + + #[test] + fn iter_mut() { + let mut system: OrderingSystem = OrderingSystem::new(); + + system.get_or_create_stream(1); + let stream = system.get_or_create_stream(1); + + let stub_packet1 = Packet::new(1, 1); + let stub_packet2 = Packet::new(2, 1); + let stub_packet3 = Packet::new(3, 1); + let stub_packet4 = Packet::new(4, 1); + let stub_packet5 = Packet::new(5, 1); + + { + assert_eq!( + stream.arrange(1, stub_packet1.clone()).unwrap(), + stub_packet1 + ); + + stream.arrange(4, stub_packet4.clone()).is_none(); + stream.arrange(5, stub_packet5.clone()).is_none(); + stream.arrange(3, stub_packet3.clone()).is_none(); + } + { + let mut iterator = stream.iter_mut(); + + // since we are awaiting for packet '2' our iterator should not return yet. + assert_eq!(iterator.next(), None); + } + { + assert_eq!( + stream.arrange(2, stub_packet2.clone()).unwrap(), + stub_packet2 + ); + } + { + // since we processed packet 2 by now we should be able to iterate and get back: 3,4,5; + let mut iterator = stream.iter_mut(); + + assert_eq!(iterator.next().unwrap(), stub_packet3); + assert_eq!(iterator.next().unwrap(), stub_packet4); + assert_eq!(iterator.next().unwrap(), stub_packet5); + } + } + + /// Asserts that the given collection, on the left, should result - after it is ordered - into the given collection, on the right. + macro_rules! assert_order { + ( [$( $x:expr ),*] , [$( $y:expr),*] , $stream_id:expr) => { + { + // initialize vector of given range on the left. + let mut before: Vec = Vec::new(); + $( + before.push($x); + )* + + // initialize vector of given range on the right. + let mut after: Vec = Vec::new(); + $( + after.push($y); + )* + + // generate test packets + let mut packets = Vec::new(); + for (_, v) in before.iter().enumerate() { + packets.push(Packet::new(*v, $stream_id)); + } + + // create system to handle the ordering of our packets. + let mut ordering_system = OrderingSystem::::new(); + + // get stream '1' to order the packets on. + let stream = ordering_system.get_or_create_stream(1); + + // order packets + let mut ordered_packets = Vec::new(); + + for packet in packets.into_iter() { + match stream.arrange(packet.sequence, packet.clone()) { + Some(packet) => { + ordered_packets.push(packet.sequence); + // empty the remaining ordered packets into an vector so that we can check if they are ordered. + let mut iter = stream.iter_mut(); + + while let Some(packet) = iter.next() { + ordered_packets.push(packet.sequence); + } + } + None => {} + }; + } + + // assert if the expected range of the given numbers equals to the processed range which is in sequence. + assert_eq!(after, ordered_packets); + } + }; + } + + #[test] + fn ordering_test() { + // we order on stream 1 + assert_order!([1, 3, 5, 4, 2], [1, 2, 3, 4, 5], 1); + assert_order!([1, 5, 4, 3, 2], [1, 2, 3, 4, 5], 1); + assert_order!([5, 3, 4, 2, 1], [1, 2, 3, 4, 5], 1); + assert_order!([4, 3, 2, 1, 5], [1, 2, 3, 4, 5], 1); + assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 1); + assert_order!([5, 2, 1, 4, 3], [1, 2, 3, 4, 5], 1); + assert_order!([3, 2, 4, 1, 5], [1, 2, 3, 4, 5], 1); + assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 1); + } + + #[test] + fn multiple_stream_ordering_test() { + // we order on streams [1...8] + assert_order!([1, 3, 5, 4, 2], [1, 2, 3, 4, 5], 1); + assert_order!([1, 5, 4, 3, 2], [1, 2, 3, 4, 5], 2); + assert_order!([5, 3, 4, 2, 1], [1, 2, 3, 4, 5], 3); + assert_order!([4, 3, 2, 1, 5], [1, 2, 3, 4, 5], 4); + assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 5); + assert_order!([5, 2, 1, 4, 3], [1, 2, 3, 4, 5], 6); + assert_order!([3, 2, 4, 1, 5], [1, 2, 3, 4, 5], 7); + assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 8); + } +} diff --git a/src/infrastructure/arranging/sequencing.rs b/src/infrastructure/arranging/sequencing.rs new file mode 100644 index 00000000..985c02f6 --- /dev/null +++ b/src/infrastructure/arranging/sequencing.rs @@ -0,0 +1,236 @@ +//! Module with logic for arranging items in-sequence on multiple streams. +//! +//! "_Sequencing is the process of only caring about the newest items._" +//! +//! With sequencing, we only care about the newest items. When old items arrive we just toss them away. +//! +//! Example: sequence `1,3,2,5,4` will result into `1,3,5`. +//! +//! # Remarks +//! - See [super-module](../index.html) description for more details. + +use super::{Arranging, ArrangingSystem}; +use std::{ + collections::HashMap, + marker::PhantomData +}; + +/// An sequencing system that can arrange items in sequence on different streams. +/// +/// Checkout [`SequencingStream`](./struct.SequencingStream.html), or module description for more details. +/// +/// # Remarks +/// - See [super-module](../index.html) for more information about streams. +pub struct SequencingSystem { + // '[HashMap]' with streams on which items can be arranged in-sequence. + streams: HashMap>, +} + +impl SequencingSystem { + /// Constructs a new [`SequencingSystem`](./struct.SequencingSystem.html). + pub fn new() -> SequencingSystem { + SequencingSystem { + streams: HashMap::with_capacity(32), + } + } +} + +impl ArrangingSystem for SequencingSystem { + type Stream = SequencingStream; + + /// Returns the number of sequencing streams currently created. + fn stream_count(&self) -> usize { + self.streams.len() + } + + /// Try to get an [`SequencingStream`](./struct.SequencingStream.html) by `stream_id`. + /// When the stream does not exist, it will be inserted by the given `stream_id` and returned. + fn get_or_create_stream(&mut self, stream_id: u8) -> &mut Self::Stream { + self.streams + .entry(stream_id) + .or_insert_with(|| SequencingStream::new(stream_id)) + } +} + +/// A stream on which items will be arranged in-sequence. +/// +/// # Algorithm +/// +/// With every sequencing operation an `top_index` is given. +/// +/// There are two scenarios that are important to us. +/// 1. `incoming_index` >= `top_index`. +/// This item is the newest or newer than the last one we have seen. +/// Because of that we should return it back to the user. +/// 2. `incoming_index` < `top_index`. +/// This item is older than the newest item we have seen so far. +/// Since we don't care about old items we can toss it a way. +/// +/// # Remarks +/// - See [super-module](../index.html) for more information about streams. +pub struct SequencingStream { + // the id of this stream. + stream_id: u8, + // the highest seen item index. + top_index: usize, + // I need `PhantomData`, otherwise, I can't use a generic in the `Arranging` implementation because `T` is not constrained. + phantom: PhantomData +} + +impl SequencingStream { + /// Constructs a new, empty '[SequencingStream](./struct.SequencingStream.html)'. + /// + /// The default stream will have a capacity of 32 items. + pub fn new(stream_id: u8) -> SequencingStream { + SequencingStream { + stream_id, + top_index: 0, + phantom: PhantomData + } + } + + /// Returns the identifier of this stream. + fn stream_id(&self) -> u8 { + self.stream_id + } +} + +impl Arranging for SequencingStream { + type ArrangingItem = T; + + /// Will arrange the given item based on a sequencing algorithm. + /// + /// With every sequencing operation an `top_index` is given. + /// + /// # Algorithm + /// + /// There are two scenarios that are important to us. + /// 1. `incoming_index` >= `top_index`. + /// This item is the newest or newer than the last one we have seen. + /// Because of that we should return it back to the user. + /// 2. `incoming_index` < `top_index`. + /// This item is older than we the newest packet we have seen so far. + /// Since we don't care about old items we can toss it a way. + /// + /// # Remark + /// - All old packets will be tossed away. + /// - None is returned when an old packet is received. + fn arrange( + &mut self, + incoming_index: usize, + item: Self::ArrangingItem, + ) -> Option { + if incoming_index > self.top_index { + self.top_index = incoming_index; + return Some(item); + } + None + } +} + +#[cfg(test)] +mod tests { + use super::{Arranging, ArrangingSystem, SequencingSystem}; + + #[derive(Debug, PartialEq, Clone)] + struct Packet { + pub sequence: usize, + pub ordering_stream: u8, + } + + impl Packet { + fn new(sequence: usize, ordering_stream: u8) -> Packet { + Packet { + sequence, + ordering_stream, + } + } + } + + #[test] + fn create_stream() { + let mut system: SequencingSystem = SequencingSystem::new(); + let stream = system.get_or_create_stream(1); + + assert_eq!(stream.stream_id(), 1); + } + + #[test] + fn create_existing_stream() { + let mut system: SequencingSystem = SequencingSystem::new(); + + system.get_or_create_stream(1); + let stream = system.get_or_create_stream(1); + + assert_eq!(stream.stream_id(), 1); + } + + /// asserts that the given collection, on the left, should result - after it is sequenced - into the given collection, on the right. + macro_rules! assert_sequence { + ( [$( $x:expr ),*], [$( $y:expr),*], $stream_id:expr) => { + { + // initialize vector of given range on the left. + let mut before: Vec = Vec::new(); + $( + before.push($x); + )* + + // initialize vector of given range on the right. + let mut after: Vec = Vec::new(); + $( + after.push($y); + )* + + // generate test packets + let mut packets = Vec::new(); + + for (_, v) in before.iter().enumerate() { + packets.push(Packet::new(*v, $stream_id)); + } + + // create system to handle sequenced packets. + let mut sequence_system = SequencingSystem::::new(); + + // get stream '1' to process the sequenced packets on. + let stream = sequence_system.get_or_create_stream(1); + + // get packets arranged in sequence. + let mut sequenced_packets = Vec::new(); + + for packet in packets.into_iter() { + match stream.arrange(packet.sequence, packet.clone()) { + Some(packet) => { sequenced_packets.push(packet.sequence);}, + None => {} + }; + } + + // assert if the expected range of the given numbers equals to the processed range which is in sequence. + assert_eq!(after, sequenced_packets); + } + }; + } + + // This will assert a bunch of ranges to a correct sequenced range. + #[test] + fn sequencing_test() { + assert_sequence!([1, 3, 5, 4, 2], [1, 3, 5], 1); + assert_sequence!([1, 5, 4, 3, 2], [1, 5], 1); + assert_sequence!([5, 3, 4, 2, 1], [5], 1); + assert_sequence!([4, 3, 2, 1, 5], [4, 5], 1); + assert_sequence!([2, 1, 4, 3, 5], [2, 4, 5], 1); + assert_sequence!([5, 2, 1, 4, 3], [5], 1); + assert_sequence!([3, 2, 4, 1, 5], [3, 4, 5], 1); + } + + // This will assert a bunch of ranges to a correct sequenced range. + #[test] + fn multiple_stream_sequencing_test() { + assert_sequence!([1, 3, 5, 4, 2], [1, 3, 5], 1); + assert_sequence!([1, 5, 4, 3, 2], [1, 5], 2); + assert_sequence!([5, 3, 4, 2, 1], [5], 3); + assert_sequence!([4, 3, 2, 1, 5], [4, 5], 4); + assert_sequence!([2, 1, 4, 3, 5], [2, 4, 5], 5); + assert_sequence!([5, 2, 1, 4, 3], [5], 6); + assert_sequence!([3, 2, 4, 1, 5], [3, 4, 5], 7); + } +}