-
Notifications
You must be signed in to change notification settings - Fork 0
/
e2e_server_and_client_transport.rs
120 lines (103 loc) · 4.28 KB
/
e2e_server_and_client_transport.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use comms::{
command::{self, UserCommand},
event::{self, Event},
transport,
};
use tokio::net::{TcpListener, TcpStream};
use tokio_stream::StreamExt;
const PORT: usize = 8081;
#[tokio::test]
async fn assert_server_client_transport() {
let (server_collected_commands, client_collected_events) =
tokio::join!(execute_server(), execute_client());
assert!(server_collected_commands.is_ok());
assert!(client_collected_events.is_ok());
assert_eq!(
server_collected_commands.unwrap(),
vec![
UserCommand::JoinRoom(command::JoinRoomCommand {
room: "room-1".into(),
}),
UserCommand::SendMessage(command::SendMessageCommand {
room: "room-1".into(),
content: "content-1".into(),
}),
]
);
assert_eq!(
client_collected_events.unwrap(),
vec![Event::LoginSuccessful(event::LoginSuccessfulReplyEvent {
user_id: "user-id-1".into(),
session_id: "session-id-1".into(),
rooms: Vec::default(),
}),]
);
}
async fn execute_server() -> anyhow::Result<Vec<command::UserCommand>> {
// bind to the example port to wait for client connection
let listener = TcpListener::bind(format!("0.0.0.0:{}", PORT))
.await
.expect("could not bind to the port");
// accept the only client connection we will have
let tcp_stream = match listener.accept().await {
Ok((tcp_stream, _addr)) => tcp_stream,
Err(e) => return Err(anyhow::anyhow!("failed to accept client: {}", e)),
};
// break the client connection into higher level API for ease of use
let (mut command_stream, mut event_writer) = transport::server::split_tcp_stream(tcp_stream);
// store commands received from the client
let mut collected_commands = Vec::new();
// welcome the user with some login successful reply event
event_writer
.write(&Event::LoginSuccessful(event::LoginSuccessfulReplyEvent {
user_id: "user-id-1".into(),
session_id: "session-id-1".into(),
rooms: Vec::default(),
}))
.await?;
// listen for commands from the client until the connection is closed
while let Some(result) = command_stream.next().await {
match result {
// client has sent a valid command which we could read and parse
Ok(command) => collected_commands.push(command),
// client has sent a command which we could not read or parse
// could be a bug in the client, malicious client, breaking api changes etc.
Err(e) => return Err(anyhow::anyhow!("failed to read command: {}", e)),
}
}
Ok(collected_commands)
}
async fn execute_client() -> anyhow::Result<Vec<event::Event>> {
// create a client connection to the server
let tcp_stream = match TcpStream::connect(format!("localhost:{}", PORT)).await {
Ok(tcp_stream) => tcp_stream,
Err(e) => return Err(anyhow::anyhow!("failed to connect to server: {}", e)),
};
// break the server connection into higher level API for ease of use
let (mut event_stream, mut command_writer) = transport::client::split_tcp_stream(tcp_stream);
// store events received from the server
let mut collected_events = Vec::new();
// read the welcome event from the server
match event_stream.next().await {
// server has sent a valid event which we could read and parse
Some(Ok(event)) => collected_events.push(event),
// server has sent an event which we could not read or parse
// could be a bug in the server, malicious server, breaking api changes etc.
Some(Err(e)) => return Err(anyhow::anyhow!("could not parse event: {}", e)),
// server has closed the connection, return an error
None => return Err(anyhow::anyhow!("server closed the connection")),
}
// send some commands to the server
command_writer
.write(&UserCommand::JoinRoom(command::JoinRoomCommand {
room: "room-1".into(),
}))
.await?;
command_writer
.write(&UserCommand::SendMessage(command::SendMessageCommand {
room: "room-1".into(),
content: "content-1".into(),
}))
.await?;
Ok(collected_events)
}