diff --git a/alvr/client_core/src/connection.rs b/alvr/client_core/src/connection.rs index 5db56528a7..b6d4d8ee48 100644 --- a/alvr/client_core/src/connection.rs +++ b/alvr/client_core/src/connection.rs @@ -182,7 +182,8 @@ fn connection_pipeline( }), }) .to_con()?; - let config_packet = proto_control_socket.recv::()?; + let config_packet = + proto_control_socket.recv::(HANDSHAKE_ACTION_TIMEOUT)?; let settings = { let mut session_desc = SessionConfig::default(); @@ -228,7 +229,7 @@ fn connection_pipeline( .split(STREAMING_RECV_TIMEOUT) .to_con()?; - match control_receiver.recv() { + match control_receiver.recv(HANDSHAKE_ACTION_TIMEOUT) { Ok(ServerControlPacket::StartStream) => { info!("Stream starting"); set_hud_message(STREAM_STARTING_MESSAGE); @@ -463,7 +464,7 @@ fn connection_pipeline( let control_receive_thread = thread::spawn(move || { while IS_STREAMING.value() { - let maybe_packet = control_receiver.recv(); + let maybe_packet = control_receiver.recv(STREAMING_RECV_TIMEOUT); match maybe_packet { Ok(ServerControlPacket::InitializeDecoder(config)) => { diff --git a/alvr/server/src/connection.rs b/alvr/server/src/connection.rs index 8d5a613949..737f90c202 100644 --- a/alvr/server/src/connection.rs +++ b/alvr/server/src/connection.rs @@ -369,7 +369,7 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { display_name, streaming_capabilities, .. - } = proto_socket.recv()? + } = proto_socket.recv(HANDSHAKE_ACTION_TIMEOUT)? { SERVER_DATA_MANAGER.write().update_client_list( client_hostname.clone(), @@ -511,14 +511,9 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { .send(&ServerControlPacket::StartStream) .to_con()?; - match control_receiver.recv() { - Ok(ClientControlPacket::StreamReady) => (), - Ok(_) => { - con_bail!("Got unexpected packet waiting for stream ack"); - } - Err(e) => { - con_bail!("Error while waiting for stream ack: {e}"); - } + let signal = control_receiver.recv(HANDSHAKE_ACTION_TIMEOUT)?; + if !matches!(signal, ClientControlPacket::StreamReady) { + con_bail!("Got unexpected packet waiting for stream ack"); } *STATISTICS_MANAGER.lock() = Some(StatisticsManager::new( @@ -858,7 +853,7 @@ fn try_connect(mut client_ips: HashMap) -> ConResult { let client_hostname = client_hostname.clone(); move || { while IS_STREAMING.value() { - let packet = match control_receiver.recv() { + let packet = match control_receiver.recv(STREAMING_RECV_TIMEOUT) { Ok(packet) => packet, Err(ConnectionError::TryAgain(_)) => continue, Err(e) => { diff --git a/alvr/sockets/src/control_socket.rs b/alvr/sockets/src/control_socket.rs index 2102fa5c3e..11703ab58c 100644 --- a/alvr/sockets/src/control_socket.rs +++ b/alvr/sockets/src/control_socket.rs @@ -8,7 +8,7 @@ use std::{ marker::PhantomData, mem, net::{IpAddr, TcpListener, TcpStream}, - time::Duration, + time::{Duration, Instant}, }; // This corresponds to the length of the payload @@ -43,15 +43,24 @@ fn framed_recv( socket: &mut TcpStream, buffer: &mut Vec, maybe_recv_state: &mut Option, + timeout: Duration, ) -> ConResult { + let deadline = Instant::now() + timeout; + let recv_state_mut = if let Some(state) = maybe_recv_state { state } else { let mut payload_length_bytes = [0; FRAMED_PREFIX_LENGTH]; - let count = socket.peek(&mut payload_length_bytes).handle_try_again()?; - if count != FRAMED_PREFIX_LENGTH { - return alvr_common::try_again(); + + loop { + let count = socket.peek(&mut payload_length_bytes).handle_try_again()?; + if count == FRAMED_PREFIX_LENGTH { + break; + } else if Instant::now() > deadline { + return alvr_common::try_again(); + } } + let packet_length = FRAMED_PREFIX_LENGTH + u32::from_be_bytes(payload_length_bytes) as usize; @@ -65,10 +74,15 @@ fn framed_recv( }) }; - recv_state_mut.packet_cursor += - socket.recv(&mut buffer[recv_state_mut.packet_cursor..recv_state_mut.packet_length])?; - if recv_state_mut.packet_cursor != recv_state_mut.packet_length { - return alvr_common::try_again(); + loop { + recv_state_mut.packet_cursor += + socket.recv(&mut buffer[recv_state_mut.packet_cursor..recv_state_mut.packet_length])?; + + if recv_state_mut.packet_cursor == recv_state_mut.packet_length { + break; + } else if Instant::now() > deadline { + return alvr_common::try_again(); + } } let data = bincode::deserialize(&buffer[FRAMED_PREFIX_LENGTH..recv_state_mut.packet_length]) @@ -99,8 +113,13 @@ pub struct ControlSocketReceiver { } impl ControlSocketReceiver { - pub fn recv(&mut self) -> ConResult { - framed_recv(&mut self.inner, &mut self.buffer, &mut self.recv_state) + pub fn recv(&mut self, timeout: Duration) -> ConResult { + framed_recv( + &mut self.inner, + &mut self.buffer, + &mut self.recv_state, + timeout, + ) } } @@ -151,8 +170,8 @@ impl ProtoControlSocket { framed_send(&mut self.inner, &mut vec![], packet) } - pub fn recv(&mut self) -> ConResult { - framed_recv(&mut self.inner, &mut vec![], &mut None) + pub fn recv(&mut self, timeout: Duration) -> ConResult { + framed_recv(&mut self.inner, &mut vec![], &mut None, timeout) } pub fn split(