Skip to content

Commit

Permalink
Fix "Try again" error during handshake for most cases
Browse files Browse the repository at this point in the history
  • Loading branch information
zmerp committed Aug 17, 2023
1 parent eaeb351 commit bbfc008
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 25 deletions.
7 changes: 4 additions & 3 deletions alvr/client_core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ fn connection_pipeline(
}),
})
.to_con()?;
let config_packet = proto_control_socket.recv::<StreamConfigPacket>()?;
let config_packet =
proto_control_socket.recv::<StreamConfigPacket>(HANDSHAKE_ACTION_TIMEOUT)?;

let settings = {
let mut session_desc = SessionConfig::default();
Expand Down Expand Up @@ -228,7 +229,7 @@ fn connection_pipeline(
.split(STREAMING_RECV_TIMEOUT)
.to_con()?;

match control_receiver.recv() {
match control_receiver.recv(HANDSHAKE_ACTION_TIMEOUT) {
Ok(ServerControlPacket::StartStream) => {
info!("Stream starting");
set_hud_message(STREAM_STARTING_MESSAGE);
Expand Down Expand Up @@ -463,7 +464,7 @@ fn connection_pipeline(

let control_receive_thread = thread::spawn(move || {
while IS_STREAMING.value() {
let maybe_packet = control_receiver.recv();
let maybe_packet = control_receiver.recv(STREAMING_RECV_TIMEOUT);

match maybe_packet {
Ok(ServerControlPacket::InitializeDecoder(config)) => {
Expand Down
15 changes: 5 additions & 10 deletions alvr/server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> ConResult {
display_name,
streaming_capabilities,
..
} = proto_socket.recv()?
} = proto_socket.recv(HANDSHAKE_ACTION_TIMEOUT)?
{
SERVER_DATA_MANAGER.write().update_client_list(
client_hostname.clone(),
Expand Down Expand Up @@ -511,14 +511,9 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> ConResult {
.send(&ServerControlPacket::StartStream)
.to_con()?;

match control_receiver.recv() {
Ok(ClientControlPacket::StreamReady) => (),
Ok(_) => {
con_bail!("Got unexpected packet waiting for stream ack");
}
Err(e) => {
con_bail!("Error while waiting for stream ack: {e}");
}
let signal = control_receiver.recv(HANDSHAKE_ACTION_TIMEOUT)?;
if !matches!(signal, ClientControlPacket::StreamReady) {
con_bail!("Got unexpected packet waiting for stream ack");
}

*STATISTICS_MANAGER.lock() = Some(StatisticsManager::new(
Expand Down Expand Up @@ -858,7 +853,7 @@ fn try_connect(mut client_ips: HashMap<IpAddr, String>) -> ConResult {
let client_hostname = client_hostname.clone();
move || {
while IS_STREAMING.value() {
let packet = match control_receiver.recv() {
let packet = match control_receiver.recv(STREAMING_RECV_TIMEOUT) {
Ok(packet) => packet,
Err(ConnectionError::TryAgain(_)) => continue,
Err(e) => {
Expand Down
43 changes: 31 additions & 12 deletions alvr/sockets/src/control_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
marker::PhantomData,
mem,
net::{IpAddr, TcpListener, TcpStream},
time::Duration,
time::{Duration, Instant},
};

// This corresponds to the length of the payload
Expand Down Expand Up @@ -43,15 +43,24 @@ fn framed_recv<R: DeserializeOwned>(
socket: &mut TcpStream,
buffer: &mut Vec<u8>,
maybe_recv_state: &mut Option<RecvState>,
timeout: Duration,
) -> ConResult<R> {
let deadline = Instant::now() + timeout;

let recv_state_mut = if let Some(state) = maybe_recv_state {
state
} else {
let mut payload_length_bytes = [0; FRAMED_PREFIX_LENGTH];
let count = socket.peek(&mut payload_length_bytes).handle_try_again()?;
if count != FRAMED_PREFIX_LENGTH {
return alvr_common::try_again();

loop {
let count = socket.peek(&mut payload_length_bytes).handle_try_again()?;
if count == FRAMED_PREFIX_LENGTH {
break;
} else if Instant::now() > deadline {
return alvr_common::try_again();
}
}

let packet_length =
FRAMED_PREFIX_LENGTH + u32::from_be_bytes(payload_length_bytes) as usize;

Expand All @@ -65,10 +74,15 @@ fn framed_recv<R: DeserializeOwned>(
})
};

recv_state_mut.packet_cursor +=
socket.recv(&mut buffer[recv_state_mut.packet_cursor..recv_state_mut.packet_length])?;
if recv_state_mut.packet_cursor != recv_state_mut.packet_length {
return alvr_common::try_again();
loop {
recv_state_mut.packet_cursor +=
socket.recv(&mut buffer[recv_state_mut.packet_cursor..recv_state_mut.packet_length])?;

if recv_state_mut.packet_cursor == recv_state_mut.packet_length {
break;
} else if Instant::now() > deadline {
return alvr_common::try_again();
}
}

let data = bincode::deserialize(&buffer[FRAMED_PREFIX_LENGTH..recv_state_mut.packet_length])
Expand Down Expand Up @@ -99,8 +113,13 @@ pub struct ControlSocketReceiver<T> {
}

impl<R: DeserializeOwned> ControlSocketReceiver<R> {
pub fn recv(&mut self) -> ConResult<R> {
framed_recv(&mut self.inner, &mut self.buffer, &mut self.recv_state)
pub fn recv(&mut self, timeout: Duration) -> ConResult<R> {
framed_recv(
&mut self.inner,
&mut self.buffer,
&mut self.recv_state,
timeout,
)
}
}

Expand Down Expand Up @@ -151,8 +170,8 @@ impl ProtoControlSocket {
framed_send(&mut self.inner, &mut vec![], packet)
}

pub fn recv<R: DeserializeOwned>(&mut self) -> ConResult<R> {
framed_recv(&mut self.inner, &mut vec![], &mut None)
pub fn recv<R: DeserializeOwned>(&mut self, timeout: Duration) -> ConResult<R> {
framed_recv(&mut self.inner, &mut vec![], &mut None, timeout)
}

pub fn split<S: Serialize, R: DeserializeOwned>(
Expand Down

0 comments on commit bbfc008

Please sign in to comment.