Skip to content

Commit

Permalink
Merge #138
Browse files Browse the repository at this point in the history
138: Use Mio socket rather than a blocking socket from the std library r=timonpost,fhaynes a=jstnlef

Based on what @TimonPost mentioned the other day, I took another look at how I could put together a minimum PR for the changes I made to my own re-implementation of Laminar using a mio socket while cleaning up the mutexes. This is that minimum PR. That said, a few of the naming changes did make it in because I believe that the present names are misleading. However, some of the larger restructuring can be held off for another time.
One of the interesting upshots I've noticed is that I've yet to run into [this issue](#134) running the same test.

Co-authored-by: Justin LeFebvre <[email protected]>
  • Loading branch information
bors[bot] and jstnlef committed Feb 16, 2019
2 parents 54e159a + c10529f commit ce36c22
Show file tree
Hide file tree
Showing 34 changed files with 616 additions and 809 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
[package]
name = "laminar"
version = "0.1.0"
version = "0.2.0"
authors = [
"Lucio Franco <[email protected]>",
"Fletcher Haynes <[email protected]>",
"TimonPost <[email protected]>",
"Justin LeFebvre <[email protected]>"
]
description = "A simple semi-reliable UDP protocol for multiplayer games"
keywords = ["gamedev", "networking", "udp", "amethyst"]
Expand All @@ -24,10 +25,12 @@ travis-ci = { repository = "amethyst/laminar", branch = "master" }
[dependencies]
byteorder = "1.2"
crc = "1.8"
crossbeam-channel = "0.3"
failure = "0.1"
failure_derive = "0.1"
lazy_static = "1.1"
log = "0.4"
mio = "0.6"
rand = "0.5"
clap = { version = "2.32", features = ["yaml"], optional = true }
env_logger = { version = "0.6", optional = true }
Expand Down
32 changes: 12 additions & 20 deletions benches/packet_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};

use laminar::{net::VirtualConnection, DeliveryMethod, NetworkConfig, ProtocolVersion};
use laminar::{net::VirtualConnection, Config, DeliveryMethod, ProtocolVersion};

use criterion::{criterion_group, criterion_main, Criterion};

Expand All @@ -11,7 +11,7 @@ const CLIENT_ADDR: &str = "127.0.0.1:12346";

fn process_packet_before_send(
connection: &mut VirtualConnection,
_config: &NetworkConfig,
_config: &Config,
delivery_method: DeliveryMethod,
) {
let payload = vec![1, 2, 3, 4, 5];
Expand All @@ -22,11 +22,9 @@ fn process_packet_before_send(
}

fn send_unreliable_benchmark(c: &mut Criterion) {
let config = NetworkConfig::default();
let mut connection = VirtualConnection::new(
SERVER_ADDR.parse().unwrap(),
Arc::new(NetworkConfig::default()),
);
let config = Config::default();
let mut connection =
VirtualConnection::new(SERVER_ADDR.parse().unwrap(), Arc::new(Config::default()));

c.bench_function("process unreliable before send", move |b| {
b.iter(|| {
Expand All @@ -40,11 +38,9 @@ fn send_unreliable_benchmark(c: &mut Criterion) {
}

fn send_reliable_benchmark(c: &mut Criterion) {
let config = NetworkConfig::default();
let mut connection = VirtualConnection::new(
SERVER_ADDR.parse().unwrap(),
Arc::new(NetworkConfig::default()),
);
let config = Config::default();
let mut connection =
VirtualConnection::new(SERVER_ADDR.parse().unwrap(), Arc::new(Config::default()));

c.bench_function("process reliable before send", move |b| {
b.iter(|| {
Expand Down Expand Up @@ -84,10 +80,8 @@ fn acked_header_bytes(
}

fn receive_unreliable_benchmark(c: &mut Criterion) {
let mut connection = VirtualConnection::new(
SERVER_ADDR.parse().unwrap(),
Arc::new(NetworkConfig::default()),
);
let mut connection =
VirtualConnection::new(SERVER_ADDR.parse().unwrap(), Arc::new(Config::default()));

// setup fake received bytes.
let mut buffer = standard_header_bytes(DeliveryMethod::UnreliableUnordered);
Expand All @@ -99,10 +93,8 @@ fn receive_unreliable_benchmark(c: &mut Criterion) {
}

fn receive_reliable_benchmark(c: &mut Criterion) {
let mut connection = VirtualConnection::new(
SERVER_ADDR.parse().unwrap(),
Arc::new(NetworkConfig::default()),
);
let mut connection =
VirtualConnection::new(SERVER_ADDR.parse().unwrap(), Arc::new(Config::default()));

// setup fake received bytes.
let mut buffer = acked_header_bytes(DeliveryMethod::ReliableUnordered, 0, 1, 2);
Expand Down
59 changes: 35 additions & 24 deletions examples/server_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
//! Note that in practice you don't want to implement a chat client using UDP.
use std::io::stdin;

use laminar::{config::NetworkConfig, error::Result, net::UdpSocket, Packet};
use laminar::{Config, NetworkError, Packet, Socket, SocketEvent};
use std::thread;

const SERVER: &str = "localhost:12351";
const SERVER: &str = "127.0.0.1:12351";

fn server() -> Result<()> {
let mut socket = UdpSocket::bind(SERVER, NetworkConfig::default())?;
fn server() -> Result<(), NetworkError> {
let (mut socket, packet_sender, event_receiver) = Socket::bind(SERVER, Config::default())?;
let _thread = thread::spawn(move || socket.start_polling());

println!("Listening for connections to {}", SERVER);

loop {
match socket.recv()? {
Some(packet) => {
match event_receiver.recv().expect("Should get a message") {
SocketEvent::Packet(packet) => {
let msg = packet.payload();

if msg == b"Bye!" {
Expand All @@ -26,22 +28,30 @@ fn server() -> Result<()> {

println!("Received {:?} from {:?}", msg, ip);

socket.send(&Packet::reliable_unordered(
packet.addr(),
"Copy that!".as_bytes().to_vec(),
))?;
packet_sender
.send(Packet::reliable_unordered(
packet.addr(),
"Copy that!".as_bytes().to_vec(),
))
.unwrap();
}
None => {}
SocketEvent::Timeout(address) => {
println!("Client timed out: {}", address);
}
_ => {}
}
}

Ok(())
}

fn client() -> Result<()> {
let mut socket = UdpSocket::bind("localhost:12352", NetworkConfig::default())?;
fn client() -> Result<(), NetworkError> {
let addr = "127.0.0.1:12352";
let (mut socket, packet_sender, event_receiver) = Socket::bind(addr, Config::default())?;
println!("Connected on {}", addr);
let _thread = thread::spawn(move || socket.start_polling());

let server = SERVER.parse()?;
let server = SERVER.parse().unwrap();

println!("Type a message and press Enter to send. Send `Bye!` to quit.");

Expand All @@ -53,33 +63,34 @@ fn client() -> Result<()> {
stdin.read_line(&mut s_buffer)?;
let line = s_buffer.replace(|x| x == '\n' || x == '\r', "");

socket.send(&Packet::reliable_unordered(
server,
line.clone().into_bytes(),
))?;
packet_sender
.send(Packet::reliable_unordered(
server,
line.clone().into_bytes(),
))
.unwrap();

if line == "Bye!" {
break;
}

let back = socket.recv()?;

match back {
Some(packet) => {
match event_receiver.recv().unwrap() {
SocketEvent::Packet(packet) => {
if packet.addr() == server {
println!("Server sent: {}", String::from_utf8_lossy(packet.payload()));
} else {
println!("Unknown sender.");
}
}
None => println!("Silence.."),
SocketEvent::Timeout(_) => {}
_ => println!("Silence.."),
}
}

Ok(())
}

fn main() -> Result<()> {
fn main() -> Result<(), NetworkError> {
let stdin = stdin();

println!("Please type in `server` or `client`.");
Expand Down
63 changes: 36 additions & 27 deletions examples/simple_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
//! 2. setting up client to send data.
//! 3. serialize data to send and deserialize when received.
use bincode::{deserialize, serialize};
use laminar::config::NetworkConfig;
use laminar::{net::UdpSocket, Packet};
use crossbeam_channel::{Receiver, Sender};
use laminar::{Config, NetworkError, Packet, Socket, SocketEvent};
use serde_derive::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::{thread, time};
Expand All @@ -25,9 +25,9 @@ fn server_address() -> SocketAddr {
/// This will run an simple example with client and server communicating.
#[allow(unused_must_use)]
pub fn main() {
let mut server = Server::new();
// set up or `Server` that will receive the messages we send with the `Client`
let handle = thread::spawn(|| loop {
let mut server = Server::new();
let handle = thread::spawn(move || loop {
server.receive();
});

Expand Down Expand Up @@ -73,41 +73,46 @@ enum DataType {

/// This is an test server we use to receive data from clients.
struct Server {
udp_socket: UdpSocket,
_packet_sender: Sender<Packet>,
event_receiver: Receiver<SocketEvent>,
_polling_thread: thread::JoinHandle<Result<(), NetworkError>>,
}

impl Server {
#[allow(unused_must_use)]
pub fn new() -> Self {
// you can change the config but if you want just go for the default.
let config = NetworkConfig::default();
let config = Config::default();

// setup an udp socket and bind it to the client address.
let mut udp_socket: UdpSocket = UdpSocket::bind(server_address(), config).unwrap();

// next we could specify if or socket should block the current thread when receiving data or not (default = false)
udp_socket.set_nonblocking(false);

Server { udp_socket }
let (mut socket, packet_sender, event_receiver) =
Socket::bind(server_address(), config).unwrap();
let polling_thread = thread::spawn(move || socket.start_polling());
Server {
_packet_sender: packet_sender,
event_receiver,
_polling_thread: polling_thread,
}
}

/// Receive and block the current thread.
pub fn receive(&mut self) {
// Next start receiving.
let result = self.udp_socket.recv();
let result = self.event_receiver.recv();

match result {
Ok(Some(packet)) => {
Ok(SocketEvent::Packet(packet)) => {
let received_data: &[u8] = packet.payload();

// deserialize bytes to `DataType` we passed in with `Client.send()`.
let deserialized: DataType = deserialize(&received_data).unwrap();

self.perform_action(deserialized);
}
Ok(None) => {
println!("This could happen when we have'n received all data from this packet yet");
Ok(SocketEvent::Timeout(address)) => {
println!("A client timed out: {}", address);
}
Ok(_) => {}
Err(e) => {
println!("Something went wrong when receiving, error: {:?}", e);
}
Expand Down Expand Up @@ -136,23 +141,27 @@ impl Server {

/// This is an test client to send data to the server.
struct Client {
udp_socket: UdpSocket,
packet_sender: Sender<Packet>,
_event_receiver: Receiver<SocketEvent>,
_polling_thread: thread::JoinHandle<Result<(), NetworkError>>,
}

impl Client {
#[allow(unused_must_use)]
pub fn new() -> Self {
// you can change the config but if you want just go for the default.
let config = NetworkConfig::default();
let config = Config::default();

// setup an udp socket and bind it to the client address.
let mut udp_socket = UdpSocket::bind(client_address(), config).unwrap();

// next we could specify if or socket should block the current thread when receiving data or not (default = false)

udp_socket.set_nonblocking(false);

Client { udp_socket }
let (mut socket, packet_sender, event_receiver) =
Socket::bind(client_address(), config).unwrap();
let polling_thread = thread::spawn(move || socket.start_polling());

Client {
packet_sender,
_event_receiver: event_receiver,
_polling_thread: polling_thread,
}
}

#[allow(unused_must_use)]
Expand All @@ -161,8 +170,8 @@ impl Client {

match serialized {
Ok(raw_data) => {
self.udp_socket
.send(&Packet::reliable_unordered(server_address(), raw_data));
self.packet_sender
.send(Packet::reliable_unordered(server_address(), raw_data));
}
Err(e) => println!("Some error occurred: {:?}", e),
}
Expand Down
Loading

0 comments on commit ce36c22

Please sign in to comment.