diff --git a/benches/packet_processing.rs b/benches/packet_processing.rs index e39e1498..51eca391 100644 --- a/benches/packet_processing.rs +++ b/benches/packet_processing.rs @@ -5,8 +5,10 @@ extern crate criterion; use std::sync::Arc; use self::laminar::net::{NetworkConfig, VirtualConnection}; -use self::laminar::packet::header::{HeaderParser, HeaderReader, AckedPacketHeader, StandardHeader}; -use self::laminar::packet::{PacketTypeId}; +use self::laminar::packet::header::{ + AckedPacketHeader, HeaderParser, HeaderReader, StandardHeader, +}; +use self::laminar::packet::PacketTypeId; use laminar::infrastructure::DeliveryMethod; use self::criterion::Criterion; @@ -14,45 +16,63 @@ use self::criterion::Criterion; const SERVER_ADDR: &str = "127.0.0.1:12345"; const CLIENT_ADDR: &str = "127.0.0.1:12346"; -fn process_packet_before_send(connection: &mut VirtualConnection, config: &NetworkConfig, delivery_method: DeliveryMethod) { +fn process_packet_before_send( + connection: &mut VirtualConnection, + config: &NetworkConfig, + delivery_method: DeliveryMethod, +) { let payload = vec![1, 2, 3, 4, 5]; - let packet_data = connection.process_outgoing(&payload, delivery_method).unwrap(); + let packet_data = connection + .process_outgoing(&payload, delivery_method) + .unwrap(); } fn send_unreliable_benchmark(c: &mut Criterion) { let config = NetworkConfig::default(); - let mut connection = VirtualConnection::new(SERVER_ADDR.parse().unwrap(), &Arc::new(NetworkConfig::default())); + let mut connection = VirtualConnection::new( + SERVER_ADDR.parse().unwrap(), + &Arc::new(NetworkConfig::default()), + ); c.bench_function("process unreliable before send", move |b| { - b.iter(|| process_packet_before_send(&mut connection, &config, DeliveryMethod::UnreliableUnordered)) + b.iter(|| { + process_packet_before_send( + &mut connection, + &config, + DeliveryMethod::UnreliableUnordered, + ) + }) }); } fn send_reliable_benchmark(c: &mut Criterion) { let config = NetworkConfig::default(); - let mut connection = VirtualConnection::new(SERVER_ADDR.parse().unwrap(), &Arc::new(NetworkConfig::default())); + let mut connection = VirtualConnection::new( + SERVER_ADDR.parse().unwrap(), + &Arc::new(NetworkConfig::default()), + ); c.bench_function("process reliable before send", move |b| { - b.iter(|| process_packet_before_send(&mut connection, &config, DeliveryMethod::ReliableUnordered)) + b.iter(|| { + process_packet_before_send(&mut connection, &config, DeliveryMethod::ReliableUnordered) + }) }); } -fn process_packet_when_received( - connection: &mut VirtualConnection, - data: &Vec, -) { - let packet = connection - .process_incoming(&data) - .unwrap() - .unwrap(); +fn process_packet_when_received(connection: &mut VirtualConnection, data: &Vec) { + let packet = connection.process_incoming(&data).unwrap().unwrap(); } fn receive_unreliable_benchmark(c: &mut Criterion) { - let mut connection = VirtualConnection::new(SERVER_ADDR.parse().unwrap(), &Arc::new(NetworkConfig::default())); + let mut connection = VirtualConnection::new( + SERVER_ADDR.parse().unwrap(), + &Arc::new(NetworkConfig::default()), + ); // setup fake received bytes. - let packet_header = StandardHeader::new(DeliveryMethod::UnreliableUnordered, PacketTypeId::Packet); + let packet_header = + StandardHeader::new(DeliveryMethod::UnreliableUnordered, PacketTypeId::Packet); let mut buffer = Vec::with_capacity(packet_header.size() as usize); packet_header.parse(&mut buffer).unwrap(); @@ -64,10 +84,18 @@ fn receive_unreliable_benchmark(c: &mut Criterion) { } fn receive_reliable_benchmark(c: &mut Criterion) { - let mut connection = VirtualConnection::new(SERVER_ADDR.parse().unwrap(), &Arc::new(NetworkConfig::default())); + let mut connection = VirtualConnection::new( + SERVER_ADDR.parse().unwrap(), + &Arc::new(NetworkConfig::default()), + ); // setup fake received bytes. - let packet_header = AckedPacketHeader::new(StandardHeader::new(DeliveryMethod::ReliableUnordered, PacketTypeId::Packet),0, 1, 2); + let packet_header = AckedPacketHeader::new( + StandardHeader::new(DeliveryMethod::ReliableUnordered, PacketTypeId::Packet), + 0, + 1, + 2, + ); let mut buffer = Vec::with_capacity(packet_header.size() as usize); packet_header.parse(&mut buffer).unwrap(); @@ -78,5 +106,11 @@ fn receive_reliable_benchmark(c: &mut Criterion) { }); } -criterion_group!(benches, send_reliable_benchmark, send_unreliable_benchmark, receive_reliable_benchmark, receive_unreliable_benchmark); +criterion_group!( + benches, + send_reliable_benchmark, + send_unreliable_benchmark, + receive_reliable_benchmark, + receive_unreliable_benchmark +); criterion_main!(benches); diff --git a/rustfmt.toml b/rustfmt.toml index ebf95cf9..44148a2d 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,2 +1 @@ reorder_imports = true -wrap_comments = false \ No newline at end of file diff --git a/src/error/mod.rs b/src/error/mod.rs index 017bfcdc..592b96c3 100644 --- a/src/error/mod.rs +++ b/src/error/mod.rs @@ -2,11 +2,11 @@ mod error_kinds; mod network_error; -pub use self::network_error::{NetworkErrorKind, NetworkError}; -pub use self::error_kinds::{PacketErrorKind, FragmentErrorKind}; +pub use self::error_kinds::{FragmentErrorKind, PacketErrorKind}; +pub use self::network_error::{NetworkError, NetworkErrorKind}; -use std::result; use failure::Error; +use std::result; /// Convenience alias for a standard result pub type Result = result::Result; @@ -35,5 +35,3 @@ pub type Result = result::Result; /// } /// ``` pub type NetworkResult = result::Result; - - diff --git a/src/error/network_error.rs b/src/error/network_error.rs index a37fd7c9..65381e31 100644 --- a/src/error/network_error.rs +++ b/src/error/network_error.rs @@ -1,23 +1,31 @@ use super::{FragmentErrorKind, PacketErrorKind}; -use std::sync::PoisonError; -use std::fmt::{self, Display,Formatter}; -use std::io; use std::error::Error; +use std::fmt::{self, Display, Formatter}; +use std::io; +use std::sync::PoisonError; -use failure::{Fail, Backtrace, Context}; +use failure::{Backtrace, Context, Fail}; #[derive(Fail, Debug)] /// Enum with all possible network errors that could occur. -pub enum NetworkErrorKind -{ - #[fail(display = "Something went wrong with receiving/parsing fragments. Reason: {:?}.", _0)] +pub enum NetworkErrorKind { + #[fail( + display = "Something went wrong with receiving/parsing fragments. Reason: {:?}.", + _0 + )] /// Error relating to receiving or parsing a fragment FragmentError(FragmentErrorKind), - #[fail(display = "Something went wrong with receiving/parsing packets. Reason: {:?}.", _0)] + #[fail( + display = "Something went wrong with receiving/parsing packets. Reason: {:?}.", + _0 + )] /// Error relating to receiving or parsing a packet PacketError(PacketErrorKind), - #[fail(display = "Could not add a connection to the connection pool, because the connection lock is poisoned. Reason: {:?}.", _0)] + #[fail( + display = "Could not add a connection to the connection pool, because the connection lock is poisoned. Reason: {:?}.", + _0 + )] /// Failed to add a connection FailedToAddConnection(String), #[fail(display = "An IO Error occurred. Reason: {:?}.", _0)] @@ -39,8 +47,7 @@ pub enum NetworkErrorKind #[derive(Debug)] /// An error that could occur during network operations. -pub struct NetworkError -{ +pub struct NetworkError { inner: Context, } @@ -75,7 +82,6 @@ impl Display for NetworkError { } } - impl NetworkError { /// Get the error kind from the error. This is useful when you want to match on the error kind. pub fn kind(&self) -> &NetworkErrorKind { @@ -90,7 +96,9 @@ impl NetworkError { impl From for NetworkError { fn from(kind: NetworkErrorKind) -> NetworkError { - NetworkError { inner: Context::new(kind) } + NetworkError { + inner: Context::new(kind), + } } } @@ -108,13 +116,13 @@ impl From for NetworkError { impl From for NetworkError { fn from(inner: FragmentErrorKind) -> Self { - NetworkErrorKind::FragmentError (inner).into() + NetworkErrorKind::FragmentError(inner).into() } } impl From for NetworkError { fn from(inner: PacketErrorKind) -> Self { - NetworkErrorKind::PacketError (inner).into() + NetworkErrorKind::PacketError(inner).into() } } @@ -122,4 +130,4 @@ impl From> for NetworkError { fn from(inner: PoisonError) -> Self { NetworkErrorKind::FailedToAddConnection(inner.description().to_owned()).into() } -} \ No newline at end of file +} diff --git a/src/events.rs b/src/events.rs index 352390ff..e2dda7e3 100644 --- a/src/events.rs +++ b/src/events.rs @@ -25,7 +25,7 @@ pub enum Event { #[cfg(test)] mod test { use super::Event; - use net::{VirtualConnection, NetworkConfig}; + use net::{NetworkConfig, VirtualConnection}; use std::net::ToSocketAddrs; use std::sync::{Arc, RwLock}; @@ -37,7 +37,10 @@ mod test { let addr = format!("{}:{}", TEST_HOST_IP, TEST_PORT).to_socket_addrs(); let mut addr = addr.unwrap(); - let test_conn = Arc::new(RwLock::new(VirtualConnection::new(addr.next().unwrap(), &Arc::new(NetworkConfig::default())))); + let test_conn = Arc::new(RwLock::new(VirtualConnection::new( + addr.next().unwrap(), + &Arc::new(NetworkConfig::default()), + ))); let _ = Event::Connected(test_conn); } } diff --git a/src/infrastructure/channels/mod.rs b/src/infrastructure/channels/mod.rs index 65706b6b..19dfeff1 100644 --- a/src/infrastructure/channels/mod.rs +++ b/src/infrastructure/channels/mod.rs @@ -1,23 +1,26 @@ //! This module provides channels for processing packets of different reliabilities. -mod unreliable_channel; -mod sequenced_channel; mod reliable_channel; +mod sequenced_channel; +mod unreliable_channel; -use infrastructure::DeliveryMethod; use error::NetworkResult; +use infrastructure::DeliveryMethod; use packet::PacketData; -pub use self::unreliable_channel::UnreliableChannel; -pub use self::sequenced_channel::SequencedChannel; pub use self::reliable_channel::ReliableChannel; +pub use self::sequenced_channel::SequencedChannel; +pub use self::unreliable_channel::UnreliableChannel; /// This provides an abstraction for processing packets to their given reliability. pub trait Channel { /// Process a packet before sending it and return a packet instance with the given raw data. - fn process_outgoing(&mut self, payload: &[u8], delivery_method: DeliveryMethod) -> NetworkResult; + fn process_outgoing( + &mut self, + payload: &[u8], + delivery_method: DeliveryMethod, + ) -> NetworkResult; /// Progress an packet on receive and receive the processed data. - fn process_incoming<'d>(&mut self, buffer: &'d[u8]) -> NetworkResult<&'d[u8]>; + fn process_incoming<'d>(&mut self, buffer: &'d [u8]) -> NetworkResult<&'d [u8]>; } - diff --git a/src/infrastructure/channels/sequenced_channel.rs b/src/infrastructure/channels/sequenced_channel.rs index 17e9b51d..5e9e7818 100644 --- a/src/infrastructure/channels/sequenced_channel.rs +++ b/src/infrastructure/channels/sequenced_channel.rs @@ -1,8 +1,8 @@ use super::Channel; -use packet::PacketData; -use infrastructure::DeliveryMethod; use error::NetworkResult; +use infrastructure::DeliveryMethod; +use packet::PacketData; /// This channel should be used for processing packets sequenced. /// @@ -26,11 +26,15 @@ impl SequencedChannel { } impl Channel for SequencedChannel { - fn process_outgoing(&mut self, _payload: &[u8], _delivery_method: DeliveryMethod) -> NetworkResult { + fn process_outgoing( + &mut self, + _payload: &[u8], + _delivery_method: DeliveryMethod, + ) -> NetworkResult { unimplemented!() } - fn process_incoming<'d>(&mut self, _buffer: &'d[u8]) -> NetworkResult<&'d[u8]> { + fn process_incoming<'d>(&mut self, _buffer: &'d [u8]) -> NetworkResult<&'d [u8]> { unimplemented!() } -} \ No newline at end of file +} diff --git a/src/infrastructure/delivery_method.rs b/src/infrastructure/delivery_method.rs index 0287880f..1a18e603 100644 --- a/src/infrastructure/delivery_method.rs +++ b/src/infrastructure/delivery_method.rs @@ -24,9 +24,8 @@ /// You could say for example I want the guarantee for my packets to arrive, however they don't need to be in order. /// /// Laminar provides different kind of reliabilities contained within this enum. -#[derive(Copy, Clone, Debug,PartialOrd, PartialEq, Eq)] -pub enum DeliveryMethod -{ +#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, Eq)] +pub enum DeliveryMethod { /// Unreliable. Packets can be dropped, duplicated or arrive without order. /// /// **Details** @@ -79,7 +78,7 @@ pub enum DeliveryMethod /// /// Toss away any packets that are older than the most recent (like a position update, you don't care about older ones), /// packets may be dropped, just the application may not receive older ones if a newer one came in first. - Sequenced, + Sequenced, } impl DeliveryMethod { @@ -96,7 +95,7 @@ impl DeliveryMethod { 2 => DeliveryMethod::ReliableUnordered, 3 => DeliveryMethod::ReliableOrdered, 4 => DeliveryMethod::Sequenced, - _ => DeliveryMethod::UnreliableUnordered + _ => DeliveryMethod::UnreliableUnordered, } } -} \ No newline at end of file +} diff --git a/src/infrastructure/fragmenter.rs b/src/infrastructure/fragmenter.rs index 8b2bc680..0301a9d2 100644 --- a/src/infrastructure/fragmenter.rs +++ b/src/infrastructure/fragmenter.rs @@ -1,22 +1,25 @@ -use packet::header::{FragmentHeader, AckedPacketHeader, HeaderReader, HeaderParser}; -use sequence_buffer::{ReassemblyData, SequenceBuffer}; use error::{FragmentErrorKind, NetworkResult}; use net::NetworkConfig; +use packet::header::{AckedPacketHeader, FragmentHeader, HeaderParser, HeaderReader}; use packet::PacketData; +use sequence_buffer::{ReassemblyData, SequenceBuffer}; -use std::sync::Arc; use std::io::{Cursor, Read, Write}; +use std::sync::Arc; /// Type that will manage fragmentation of packets. pub struct Fragmentation { fragments: SequenceBuffer, - config: Arc + config: Arc, } impl Fragmentation { /// Creates and returns a new Fragmentation pub fn new(config: &Arc) -> Fragmentation { - Fragmentation { fragments: SequenceBuffer::with_capacity(config.fragment_reassembly_buffer_size), config: config.clone() } + Fragmentation { + fragments: SequenceBuffer::with_capacity(config.fragment_reassembly_buffer_size), + config: config.clone(), + } } /// This functions checks how many times a number fits into another number and will round up. @@ -58,16 +61,27 @@ impl Fragmentation { } /// Split the given payload into fragments and write those fragments to the passed packet data. - pub fn spit_into_fragments<'d>(payload: &'d[u8], acked_header: AckedPacketHeader, packet_data: &mut PacketData, config: &Arc) -> NetworkResult<()> { + pub fn spit_into_fragments<'d>( + payload: &'d [u8], + acked_header: AckedPacketHeader, + packet_data: &mut PacketData, + config: &Arc, + ) -> NetworkResult<()> { let payload_length = payload.len() as u16; - let num_fragments = Fragmentation::total_fragments_needed(payload_length, config.fragment_size) as u8; /* safe cast max fragments is u8 */ + let num_fragments = + Fragmentation::total_fragments_needed(payload_length, config.fragment_size) as u8; /* safe cast max fragments is u8 */ if num_fragments > config.max_fragments { Err(FragmentErrorKind::ExceededMaxFragments)?; } for fragment_id in 0..num_fragments { - let fragment = FragmentHeader::new(acked_header.standard_header, fragment_id, num_fragments, acked_header); + let fragment = FragmentHeader::new( + acked_header.standard_header, + fragment_id, + num_fragments, + acked_header, + ); let mut buffer = Vec::with_capacity(fragment.size() as usize); fragment.parse(&mut buffer)?; @@ -90,7 +104,10 @@ impl Fragmentation { } /// This will read fragment data and returns the complete packet data when all fragments are received. - pub fn handle_fragment(&mut self, cursor: &mut Cursor<&[u8]>) -> NetworkResult>> { + pub fn handle_fragment( + &mut self, + cursor: &mut Cursor<&[u8]>, + ) -> NetworkResult>> { // read fragment packet let fragment_header = FragmentHeader::read(cursor)?; @@ -146,7 +163,10 @@ impl Fragmentation { } /// If fragment does not exist we need to insert a new entry. - fn create_fragment_if_not_exists(&mut self, fragment_header: &FragmentHeader) -> NetworkResult<()> { + fn create_fragment_if_not_exists( + &mut self, + fragment_header: &FragmentHeader, + ) -> NetworkResult<()> { if !self.fragments.exists(fragment_header.sequence()) { if fragment_header.id() == 0 { match fragment_header.packet_header() { @@ -157,7 +177,8 @@ impl Fragmentation { (9 + self.config.fragment_size) as usize, ); - self.fragments.insert(reassembly_data.clone(), fragment_header.sequence()); + self.fragments + .insert(reassembly_data.clone(), fragment_header.sequence()); } None => Err(FragmentErrorKind::PacketHeaderNotFound)?, } @@ -182,4 +203,4 @@ mod test { assert_eq!(fragment_number, 4); assert_eq!(fragment_number1, 1); } -} \ No newline at end of file +} diff --git a/src/infrastructure/mod.rs b/src/infrastructure/mod.rs index 0316956f..d026845b 100644 --- a/src/infrastructure/mod.rs +++ b/src/infrastructure/mod.rs @@ -1,10 +1,9 @@ +mod channels; ///! This module provides infrastructure logic. With infrastructure is meant, everything that's responsible for the packet flow and processing. - mod delivery_method; mod fragmenter; -mod channels; -pub use self::channels::{ReliableChannel, UnreliableChannel, SequencedChannel}; +pub use self::channels::Channel; +pub use self::channels::{ReliableChannel, SequencedChannel, UnreliableChannel}; pub use self::delivery_method::DeliveryMethod; pub use self::fragmenter::Fragmentation; -pub use self::channels::Channel; diff --git a/src/net/connection/connection_pool.rs b/src/net/connection/connection_pool.rs index fa3ce795..b3174974 100644 --- a/src/net/connection/connection_pool.rs +++ b/src/net/connection/connection_pool.rs @@ -1,16 +1,16 @@ use super::VirtualConnection; -use error::{NetworkResult, NetworkError}; +use error::{NetworkError, NetworkResult}; use events::Event; use net::NetworkConfig; +use log::info; use std::collections::HashMap; +use std::error::Error; use std::net::SocketAddr; use std::sync::mpsc::Sender; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; -use std::error::Error; -use log::{info}; pub type Connection = Arc>; pub type Connections = HashMap; @@ -42,7 +42,7 @@ impl ConnectionPool { connections: Arc::new(RwLock::new(HashMap::new())), sleepy_time, poll_interval, - config: config.clone() + config: config.clone(), } } @@ -64,8 +64,8 @@ impl ConnectionPool { #[allow(dead_code)] pub fn count(&self) -> usize { match self.connections.read() { - Ok(connections) => { connections.len() }, - Err(_) => { 0 }, + Ok(connections) => connections.len(), + Err(_) => 0, } } @@ -86,7 +86,8 @@ impl ConnectionPool { Ok(thread::Builder::new() .name("check_for_timeouts".into()) .spawn(move || loop { - let timed_out_clients = ConnectionPool::check_for_timeouts(&connections, poll_interval, &events_sender); + let timed_out_clients = + ConnectionPool::check_for_timeouts(&connections, poll_interval, &events_sender); if !timed_out_clients.is_empty() { match connections.write() { @@ -95,9 +96,7 @@ impl ConnectionPool { connections.remove(&timed_out_client); } } - Err(e) => { - panic!("Error when checking for timed out connections: {}", e) - } + Err(e) => panic!("Error when checking for timed out connections: {}", e), } } @@ -130,9 +129,7 @@ impl ConnectionPool { } } } - Err(e) => { - panic!("Error when checking for timed out connections: {}", e) - } + Err(e) => panic!("Error when checking for timed out connections: {}", e), } timed_out_clients diff --git a/src/net/connection/quality.rs b/src/net/connection/quality.rs index 3f0422b3..a9dd1f7f 100644 --- a/src/net/connection/quality.rs +++ b/src/net/connection/quality.rs @@ -1,8 +1,8 @@ use net::NetworkConfig; use sequence_buffer::CongestionData; -use std::time::Duration; use std::sync::Arc; +use std::time::Duration; /// Represents the quality of a network. pub enum NetworkQuality { @@ -21,15 +21,14 @@ pub struct RttMeasurer { impl RttMeasurer { /// Creates and returns a new RttMeasurer pub fn new(config: &Arc) -> RttMeasurer { - RttMeasurer { config: config.clone() } + RttMeasurer { + config: config.clone(), + } } /// This will calculate the round trip time (rtt) from the given acknowledgement. /// Where after it updates the rtt from the given connection. - pub fn get_rtt( - &self, - congestion_data: Option<&mut CongestionData> - ) -> f32 { + pub fn get_rtt(&self, congestion_data: Option<&mut CongestionData>) -> f32 { self.get_smoothed_rtt(congestion_data) } @@ -72,12 +71,12 @@ impl RttMeasurer { #[cfg(test)] mod test { - use net::connection::{VirtualConnection}; - use net::NetworkConfig; use super::RttMeasurer; + use net::connection::VirtualConnection; + use net::NetworkConfig; use std::net::ToSocketAddrs; - use std::time::Duration; use std::sync::Arc; + use std::time::Duration; static TEST_HOST_IP: &'static str = "127.0.0.1"; static TEST_PORT: &'static str = "20000"; @@ -87,7 +86,8 @@ mod test { let mut addr = format!("{}:{}", TEST_HOST_IP, TEST_PORT) .to_socket_addrs() .unwrap(); - let _new_conn = VirtualConnection::new(addr.next().unwrap(), &Arc::new(NetworkConfig::default())); + let _new_conn = + VirtualConnection::new(addr.next().unwrap(), &Arc::new(NetworkConfig::default())); } #[test] diff --git a/src/net/connection/virtual_connection.rs b/src/net/connection/virtual_connection.rs index 95e4c646..f0a5ca07 100644 --- a/src/net/connection/virtual_connection.rs +++ b/src/net/connection/virtual_connection.rs @@ -1,17 +1,19 @@ +use error::{NetworkErrorKind, NetworkResult}; +use infrastructure::{ + Channel, DeliveryMethod, Fragmentation, ReliableChannel, SequencedChannel, UnreliableChannel, +}; use net::NetworkConfig; -use infrastructure::{UnreliableChannel, SequencedChannel, ReliableChannel, Fragmentation, DeliveryMethod, Channel}; -use error::{NetworkResult, NetworkErrorKind}; +use packet::header::HeaderReader; use packet::header::StandardHeader; -use packet::{Packet, PacketTypeId, PacketData}; +use packet::{Packet, PacketData, PacketTypeId}; use protocol_version::ProtocolVersion; -use packet::header::HeaderReader; -use std::io::Cursor; +use log::error; use std::fmt; +use std::io::Cursor; use std::net::SocketAddr; -use std::time::{Duration, Instant}; use std::sync::Arc; -use log::error; +use std::time::{Duration, Instant}; /// Contains the information about a certain 'virtual connection' over udp. /// This connections also keeps track of network quality, processing packets, buffering data related to connection etc. @@ -57,11 +59,15 @@ impl VirtualConnection { /// This pre-process the given buffer to be send over the network. /// 1. It will append the right header. /// 2. It will perform some actions related to how the packet should be delivered. - pub fn process_outgoing(&mut self, payload: &[u8], delivery_method: DeliveryMethod) -> NetworkResult { + pub fn process_outgoing( + &mut self, + payload: &[u8], + delivery_method: DeliveryMethod, + ) -> NetworkResult { let channel: &mut Channel = match delivery_method { - DeliveryMethod::UnreliableUnordered => { &mut self.unreliable_unordered_channel }, - DeliveryMethod::ReliableUnordered => { &mut self.reliable_unordered_channel }, - DeliveryMethod::Sequenced => { &mut self.sequenced_channel }, + DeliveryMethod::UnreliableUnordered => &mut self.unreliable_unordered_channel, + DeliveryMethod::ReliableUnordered => &mut self.reliable_unordered_channel, + DeliveryMethod::Sequenced => &mut self.sequenced_channel, _ => { error!("Tried using channel type which is not supported yet. Swished to unreliable unordered packet handling."); &mut self.unreliable_unordered_channel @@ -91,7 +97,13 @@ impl VirtualConnection { if header.packet_type_id == PacketTypeId::Fragment { cursor.set_position(0); match self.fragmentation.handle_fragment(&mut cursor) { - Ok(Some(payload)) => return Ok(Some(Packet::new(self.remote_address, payload.into_boxed_slice(), header.delivery_method))), + Ok(Some(payload)) => { + return Ok(Some(Packet::new( + self.remote_address, + payload.into_boxed_slice(), + header.delivery_method, + ))) + } Ok(None) => return Ok(None), Err(e) => return Err(e), } @@ -99,9 +111,9 @@ impl VirtualConnection { // get the right channel to process the packet. let channel: &mut Channel = match header.delivery_method { - DeliveryMethod::UnreliableUnordered => { &mut self.unreliable_unordered_channel }, - DeliveryMethod::ReliableUnordered => { &mut self.reliable_unordered_channel }, - DeliveryMethod::Sequenced => { &mut self.sequenced_channel }, + DeliveryMethod::UnreliableUnordered => &mut self.unreliable_unordered_channel, + DeliveryMethod::ReliableUnordered => &mut self.reliable_unordered_channel, + DeliveryMethod::Sequenced => &mut self.sequenced_channel, _ => { error!("Tried using channel type which is not supported yet. Swished to unreliable unordered packet handling."); &mut self.unreliable_unordered_channel @@ -110,7 +122,11 @@ impl VirtualConnection { let payload = channel.process_incoming(received_data)?; - Ok(Some(Packet::new(self.remote_address, Box::from(payload), header.delivery_method))) + Ok(Some(Packet::new( + self.remote_address, + Box::from(payload), + header.delivery_method, + ))) } /// This will gather dropped packets from the reliable channels. @@ -138,18 +154,25 @@ impl fmt::Debug for VirtualConnection { #[cfg(test)] mod tests { - use net::NetworkConfig; + use infrastructure::DeliveryMethod; use net::connection::VirtualConnection; - use infrastructure::{DeliveryMethod}; + use net::NetworkConfig; use std::sync::Arc; const SERVER_ADDR: &str = "127.0.0.1:12345"; fn create_virtual_connection() -> VirtualConnection { - VirtualConnection::new(SERVER_ADDR.parse().unwrap(), &Arc::new(NetworkConfig::default())) + VirtualConnection::new( + SERVER_ADDR.parse().unwrap(), + &Arc::new(NetworkConfig::default()), + ) } - fn assert_packet_payload(buffer: &[u8], parts: &Vec>, connection: &mut VirtualConnection){ + fn assert_packet_payload( + buffer: &[u8], + parts: &Vec>, + connection: &mut VirtualConnection, + ) { for part in parts { let packet = connection.process_incoming(&part).unwrap().unwrap(); assert_eq!(buffer, packet.payload()); @@ -162,7 +185,9 @@ mod tests { let buffer = vec![1; 500]; - let mut packet_data = connection.process_outgoing(&buffer, DeliveryMethod::UnreliableUnordered).unwrap(); + let mut packet_data = connection + .process_outgoing(&buffer, DeliveryMethod::UnreliableUnordered) + .unwrap(); assert_eq!(packet_data.fragment_count(), 1); assert_packet_payload(&buffer, packet_data.parts(), &mut connection); @@ -172,10 +197,11 @@ mod tests { fn process_reliable_unordered_packet() { let mut connection = create_virtual_connection(); - let buffer = vec![1; 500]; - let mut packet_data = connection.process_outgoing(&buffer, DeliveryMethod::ReliableUnordered).unwrap(); + let mut packet_data = connection + .process_outgoing(&buffer, DeliveryMethod::ReliableUnordered) + .unwrap(); assert_eq!(packet_data.fragment_count(), 1); assert_packet_payload(&buffer, packet_data.parts(), &mut connection); @@ -187,7 +213,9 @@ mod tests { let buffer = vec![1; 4000]; - let mut packet_data = connection.process_outgoing(&buffer, DeliveryMethod::ReliableUnordered).unwrap(); + let mut packet_data = connection + .process_outgoing(&buffer, DeliveryMethod::ReliableUnordered) + .unwrap(); // there should be 4 fragments assert_eq!(packet_data.fragment_count(), 4); @@ -198,8 +226,14 @@ mod tests { // take note index 3 will contain the fragment data because the bytes of the fragmented packet will be returned when all fragments are processed. // that is why the last packet (index 3) can be asserted on. match option { - None => if index < 3 { assert!(true) } else { assert!(false) } - Some(packet) => if index == 3 { assert_eq!(buffer, packet.payload()); } + None => if index < 3 { + assert!(true) + } else { + assert!(false) + }, + Some(packet) => if index == 3 { + assert_eq!(buffer, packet.payload()); + }, } } } diff --git a/src/net/local_ack.rs b/src/net/local_ack.rs index 9eacc300..577063ae 100644 --- a/src/net/local_ack.rs +++ b/src/net/local_ack.rs @@ -141,7 +141,7 @@ mod test { fn drops_old_packets() { let mut record: LocalAckRecord = Default::default(); record.enqueue(0, &Vec::new()); - record.enqueue(40,&Vec::new()); + record.enqueue(40, &Vec::new()); let dropped = record.ack(40, 0); assert_eq!(dropped, vec![(0, Vec::new().into_boxed_slice())]); assert!(record.is_empty()); diff --git a/src/net/mod.rs b/src/net/mod.rs index ad49d742..f43e7bcf 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,15 +1,14 @@ +mod connection; mod external_ack; mod link_conditioner; mod local_ack; mod network_config; -mod connection; mod udp; /// Contains useful constants pub mod constants; - -pub use self::connection::{NetworkQuality, VirtualConnection, RttMeasurer}; +pub use self::connection::{NetworkQuality, RttMeasurer, VirtualConnection}; pub use self::external_ack::ExternalAcks; pub use self::local_ack::LocalAckRecord; pub use self::network_config::NetworkConfig; diff --git a/src/net/udp.rs b/src/net/udp.rs index e935c23c..0d740cf0 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,5 +1,5 @@ -use std::net::{self, ToSocketAddrs, SocketAddr}; use net::connection::ConnectionPool; +use std::net::{self, SocketAddr, ToSocketAddrs}; use error::{NetworkError, NetworkErrorKind, NetworkResult}; use events::Event; @@ -7,10 +7,10 @@ use net::link_conditioner::LinkConditioner; use net::NetworkConfig; use packet::Packet; +use std::error::Error; use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::Arc; use std::thread; -use std::error::Error; /// Represents an : combination listening for UDP traffic pub struct UdpSocket { @@ -20,7 +20,7 @@ pub struct UdpSocket { link_conditioner: Option, _timeout_check_thread: thread::JoinHandle<()>, events: (Sender, Receiver), - connections: ConnectionPool + connections: ConnectionPool, } impl UdpSocket { @@ -36,7 +36,7 @@ impl UdpSocket { Ok(UdpSocket { socket, - recv_buffer: vec![0;config.receive_buffer_max_size], + recv_buffer: vec![0; config.receive_buffer_max_size], _config: config.clone(), link_conditioner: None, connections: connection_pool, @@ -47,9 +47,7 @@ impl UdpSocket { /// Receives a single datagram message on the socket. On success, returns the packet containing origin and data. pub fn recv(&mut self) -> NetworkResult> { - let (len, addr) = self - .socket - .recv_from(&mut self.recv_buffer)?; + let (len, addr) = self.socket.recv_from(&mut self.recv_buffer)?; if len > 0 { let packet = &self.recv_buffer[..len]; @@ -60,7 +58,6 @@ impl UdpSocket { .map_err(|error| NetworkError::poisoned_connection_error(error.description()))?; lock.process_incoming(&packet) - } else { Err(NetworkErrorKind::ReceivedDataToShort)? } @@ -97,7 +94,7 @@ impl UdpSocket { } /// Send a single packet over the udp socket. - fn send_packet(&self, addr: &SocketAddr, payload: &[u8]) -> NetworkResult { + fn send_packet(&self, addr: &SocketAddr, payload: &[u8]) -> NetworkResult { let mut bytes_sent = 0; bytes_sent += self diff --git a/src/packet/header/fragment_header.rs b/src/packet/header/fragment_header.rs index 1534b13a..8059ce06 100644 --- a/src/packet/header/fragment_header.rs +++ b/src/packet/header/fragment_header.rs @@ -1,9 +1,9 @@ -use super::{HeaderParser, HeaderReader, AckedPacketHeader, StandardHeader}; -use net::constants::FRAGMENT_HEADER_SIZE; +use super::{AckedPacketHeader, HeaderParser, HeaderReader, StandardHeader}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use error::{NetworkResult, FragmentErrorKind}; -use std::io::Cursor; +use error::{FragmentErrorKind, NetworkResult}; use log::error; +use net::constants::FRAGMENT_HEADER_SIZE; +use std::io::Cursor; #[derive(Copy, Clone, Debug)] /// This header represents a fragmented packet header. @@ -17,7 +17,12 @@ pub struct FragmentHeader { impl FragmentHeader { /// Create new fragment with the given packet header - pub fn new(standard_header: StandardHeader, id: u8, num_fragments: u8, packet_header: AckedPacketHeader) -> Self { + pub fn new( + standard_header: StandardHeader, + id: u8, + num_fragments: u8, + packet_header: AckedPacketHeader, + ) -> Self { FragmentHeader { standard_header, id, @@ -114,21 +119,24 @@ impl HeaderReader for FragmentHeader { #[cfg(test)] mod tests { - use packet::header::{FragmentHeader, HeaderParser, HeaderReader, AckedPacketHeader, StandardHeader}; use infrastructure::DeliveryMethod; + use packet::header::{ + AckedPacketHeader, FragmentHeader, HeaderParser, HeaderReader, StandardHeader, + }; use packet::PacketTypeId; use std::io::Cursor; #[test] pub fn serializes_deserialize_fragment_header_test() { // create default header - let standard_header = StandardHeader::new(DeliveryMethod::UnreliableUnordered, PacketTypeId::Fragment); + let standard_header = + StandardHeader::new(DeliveryMethod::UnreliableUnordered, PacketTypeId::Fragment); let packet_header = AckedPacketHeader::new(standard_header.clone(), 1, 1, 5421); // create fragment header with the default header and acked header. let fragment = FragmentHeader::new(standard_header.clone(), 0, 1, packet_header.clone()); - let mut fragment_buffer= Vec::with_capacity((fragment.size() + 1) as usize); + let mut fragment_buffer = Vec::with_capacity((fragment.size() + 1) as usize); fragment.parse(&mut fragment_buffer).unwrap(); let mut cursor: Cursor<&[u8]> = Cursor::new(fragment_buffer.as_slice()); diff --git a/src/packet/header/mod.rs b/src/packet/header/mod.rs index b0733135..286472c1 100644 --- a/src/packet/header/mod.rs +++ b/src/packet/header/mod.rs @@ -1,13 +1,13 @@ -mod header_parser; -mod header_reader; mod acked_packet_header; mod fragment_header; +mod header_parser; +mod header_reader; mod heart_beat_header; mod standard_header; -pub use self::header_parser::HeaderParser; -pub use self::header_reader::HeaderReader; pub use self::acked_packet_header::AckedPacketHeader; pub use self::fragment_header::FragmentHeader; +pub use self::header_parser::HeaderParser; +pub use self::header_reader::HeaderReader; pub use self::heart_beat_header::HeartBeatHeader; pub use self::standard_header::StandardHeader; diff --git a/src/packet/header/standard_header.rs b/src/packet/header/standard_header.rs index 3e8e6733..86ed341b 100644 --- a/src/packet/header/standard_header.rs +++ b/src/packet/header/standard_header.rs @@ -1,10 +1,10 @@ use super::{HeaderParser, HeaderReader}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use error::NetworkResult; +use infrastructure::DeliveryMethod; use net::constants::STANDARD_HEADER_SIZE; use packet::PacketTypeId; use protocol_version::ProtocolVersion; -use infrastructure::DeliveryMethod; use std::io::Cursor; #[derive(Copy, Clone, Debug)] @@ -87,9 +87,13 @@ mod tests { let mut cursor = Cursor::new(buffer.as_slice()); let packet_header = StandardHeader::read(&mut cursor).unwrap(); - assert!(ProtocolVersion::valid_version(packet_header.protocol_version)); + assert!(ProtocolVersion::valid_version( + packet_header.protocol_version + )); assert_eq!(packet_header.packet_type_id, PacketTypeId::Packet); - assert_eq!(packet_header.delivery_method, DeliveryMethod::UnreliableUnordered); + assert_eq!( + packet_header.delivery_method, + DeliveryMethod::UnreliableUnordered + ); } } - diff --git a/src/packet/mod.rs b/src/packet/mod.rs index c113885f..c4503e7a 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -1,11 +1,10 @@ /// Contains code dealing with Packet Headers pub mod header; -mod packet_structure; mod packet_data; +mod packet_structure; mod packet_type; pub use self::packet_data::PacketData; pub use self::packet_structure::Packet; -pub use self::packet_type::{PacketTypeId, PacketType}; - +pub use self::packet_type::{PacketType, PacketTypeId}; diff --git a/src/packet/packet_data.rs b/src/packet/packet_data.rs index 97fc7930..008b9166 100644 --- a/src/packet/packet_data.rs +++ b/src/packet/packet_data.rs @@ -1,5 +1,5 @@ -use std::io::Write; use error::NetworkResult; +use std::io::Write; /// Contains the raw data this packet exists of. Note that a packet can be divided into separate fragments #[derive(Debug, Default)] @@ -11,7 +11,7 @@ impl PacketData { /// Creates a new PacketData with a specified capacity pub fn with_capacity(size: usize) -> PacketData { PacketData { - parts: Vec::with_capacity(size) + parts: Vec::with_capacity(size), } } @@ -31,20 +31,20 @@ impl PacketData { /// Return the parts this packet exists of. pub fn parts(&mut self) -> &Vec> { - &self.parts + &self.parts } } #[cfg(test)] mod tests { use super::PacketData; - use packet::header::{AckedPacketHeader, StandardHeader, HeaderParser, HeaderReader}; + use packet::header::{AckedPacketHeader, HeaderParser, HeaderReader, StandardHeader}; #[test] fn add_ang_get_parts() { let acked_header = AckedPacketHeader::new(StandardHeader::default(), 1, 1, 5421); let mut buffer = Vec::new(); - let _ = acked_header.parse(&mut buffer); + let _ = acked_header.parse(&mut buffer); let mut packet_data = PacketData::with_capacity(acked_header.size() as usize); let _ = packet_data.add_fragment(&buffer, &vec![1, 2, 3, 4, 5]); @@ -54,8 +54,8 @@ mod tests { assert_eq!(packet_data.fragment_count(), 3); let _ = packet_data.parts().into_iter().map(|x| { - let _header = &x[0 .. acked_header.size() as usize]; - let body = &x[acked_header.size() as usize .. buffer.len()]; + let _header = &x[0..acked_header.size() as usize]; + let body = &x[acked_header.size() as usize..buffer.len()]; assert_eq!(body.to_vec(), vec![1, 2, 3, 4, 5]); }); } diff --git a/src/packet/packet_structure.rs b/src/packet/packet_structure.rs index d64714b6..6f73b93d 100644 --- a/src/packet/packet_structure.rs +++ b/src/packet/packet_structure.rs @@ -1,5 +1,5 @@ -use std::net::SocketAddr; use infrastructure::DeliveryMethod; +use std::net::SocketAddr; #[derive(Clone, PartialEq, Eq, Debug)] /// This is a user friendly packet containing the payload from the packet and the endpoint from where it came. @@ -32,7 +32,11 @@ impl Packet { /// /// Basically just bare UDP, free to be dropped, used for very unnecessary data, great for 'general' position updates. pub fn unreliable(addr: SocketAddr, payload: Vec) -> Packet { - Packet::new(addr,payload.into_boxed_slice(),DeliveryMethod::UnreliableUnordered) + Packet::new( + addr, + payload.into_boxed_slice(), + DeliveryMethod::UnreliableUnordered, + ) } /// Reliable. All packets will be sent and received, but without order. @@ -46,7 +50,11 @@ impl Packet { /// Basically this is almost TCP like without ordering of packets. /// Receive every packet and immediately give to application, order does not matter. pub fn reliable_unordered(addr: SocketAddr, payload: Vec) -> Packet { - Packet::new(addr,payload.into_boxed_slice(),DeliveryMethod::ReliableUnordered) + Packet::new( + addr, + payload.into_boxed_slice(), + DeliveryMethod::ReliableUnordered, + ) } /// Get the payload (raw data) of this packet. diff --git a/src/sequence_buffer/mod.rs b/src/sequence_buffer/mod.rs index eee7def6..43a0564c 100644 --- a/src/sequence_buffer/mod.rs +++ b/src/sequence_buffer/mod.rs @@ -4,4 +4,4 @@ mod sequence_buffer_structure; pub use self::congestion_data::CongestionData; pub use self::reassembly_data::ReassemblyData; -pub use self::sequence_buffer_structure::SequenceBuffer; \ No newline at end of file +pub use self::sequence_buffer_structure::SequenceBuffer; diff --git a/src/sequence_buffer/sequence_buffer_structure.rs b/src/sequence_buffer/sequence_buffer_structure.rs index 684b6f00..883d08de 100644 --- a/src/sequence_buffer/sequence_buffer_structure.rs +++ b/src/sequence_buffer/sequence_buffer_structure.rs @@ -1,12 +1,18 @@ use std::clone::Clone; /// Collection to store data of any kind. -pub struct SequenceBuffer where T: Default + Clone + Send + Sync { +pub struct SequenceBuffer +where + T: Default + Clone + Send + Sync, +{ entries: Vec, entry_sequences: Vec, } -impl SequenceBuffer where T: Default + Clone + Send + Sync { +impl SequenceBuffer +where + T: Default + Clone + Send + Sync, +{ /// Create collection with a specific capacity. pub fn with_capacity(size: usize) -> Self { let mut entries = Vec::with_capacity(size); @@ -80,8 +86,7 @@ mod tests { struct DataStub; #[test] - fn insert_into_fragment_buffer_test() - { + fn insert_into_fragment_buffer_test() { let mut fragment_buffer = SequenceBuffer::with_capacity(2); fragment_buffer.insert(DataStub, 1); assert!(fragment_buffer.exists(1));