diff --git a/src/simple_http.rs b/src/simple_http.rs index 2b708d93..19918365 100644 --- a/src/simple_http.rs +++ b/src/simple_http.rs @@ -133,6 +133,29 @@ impl SimpleHttpTransport { } } + #[cfg(feature = "proxy")] + fn fresh_socket(&self) -> Result { + let stream = if let Some((username, password)) = &self.proxy_auth { + Socks5Stream::connect_with_password( + self.proxy_addr, + self.addr, + username.as_str(), + password.as_str(), + )? + } else { + Socks5Stream::connect(self.proxy_addr, self.addr)? + }; + Ok(stream.into_inner()) + } + + #[cfg(not(feature = "proxy"))] + fn fresh_socket(&self) -> Result { + let stream = TcpStream::connect_timeout(&self.addr, self.timeout)?; + stream.set_read_timeout(Some(self.timeout))?; + stream.set_write_timeout(Some(self.timeout))?; + Ok(stream) + } + fn try_request( &self, req: impl serde::Serialize, @@ -143,30 +166,7 @@ impl SimpleHttpTransport { // No part of this codebase should panic, so unwrapping a mutex lock is fine let mut sock_lock: MutexGuard> = self.sock.lock().expect("poisoned mutex"); if sock_lock.is_none() { - *sock_lock = Some(BufReader::new({ - #[cfg(feature = "proxy")] - { - if let Some((username, password)) = &self.proxy_auth { - Socks5Stream::connect_with_password( - self.proxy_addr, - self.addr, - username.as_str(), - password.as_str(), - )? - .into_inner() - } else { - Socks5Stream::connect(self.proxy_addr, self.addr)?.into_inner() - } - } - - #[cfg(not(feature = "proxy"))] - { - let stream = TcpStream::connect_timeout(&self.addr, self.timeout)?; - stream.set_read_timeout(Some(self.timeout))?; - stream.set_write_timeout(Some(self.timeout))?; - stream - } - })); + *sock_lock = Some(BufReader::new(self.fresh_socket()?)); }; // In the immediately preceding block, we made sure that `sock` is non-`None`, // so unwrapping here is fine. @@ -177,24 +177,35 @@ impl SimpleHttpTransport { // Send HTTP request { - let mut sock = BufWriter::new(sock.get_mut()); - sock.write_all(b"POST ")?; - sock.write_all(self.path.as_bytes())?; - sock.write_all(b" HTTP/1.1\r\n")?; + let mut write_sock = BufWriter::new(sock.get_mut()); + // When we write to a socket, it may have died but we do not detect it. In this case we + // want to detect this ASAP and reconnect. We do this by writing the literal text POST + // in two pieces and checking for error returns on either one, and retrying in this + // case. + // + // From http://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html + // "If the peer calls close() or exits...I would expect EPIPE, not on the next call, + // but the one after." + if write_sock.write_all(b"PO").is_err() || write_sock.write_all(b"ST ").is_err() { + **write_sock.get_mut() = self.fresh_socket()?; + write_sock.write_all(b"POST ")?; + } + write_sock.write_all(self.path.as_bytes())?; + write_sock.write_all(b" HTTP/1.1\r\n")?; // Write headers - sock.write_all(b"Content-Type: application/json\r\n")?; - sock.write_all(b"Content-Length: ")?; - sock.write_all(body.len().to_string().as_bytes())?; - sock.write_all(b"\r\n")?; + write_sock.write_all(b"Content-Type: application/json\r\n")?; + write_sock.write_all(b"Content-Length: ")?; + write_sock.write_all(body.len().to_string().as_bytes())?; + write_sock.write_all(b"\r\n")?; if let Some(ref auth) = self.basic_auth { - sock.write_all(b"Authorization: ")?; - sock.write_all(auth.as_ref())?; - sock.write_all(b"\r\n")?; + write_sock.write_all(b"Authorization: ")?; + write_sock.write_all(auth.as_ref())?; + write_sock.write_all(b"\r\n")?; } // Write body - sock.write_all(b"\r\n")?; - sock.write_all(&body)?; - sock.flush()?; + write_sock.write_all(b"\r\n")?; + write_sock.write_all(&body)?; + write_sock.flush()?; } // Parse first HTTP response header line