From 63221028c9aa3a985dd375039002b28923cd1e42 Mon Sep 17 00:00:00 2001 From: Ryan Dearing Date: Tue, 13 Feb 2024 20:30:52 -0700 Subject: [PATCH] fix: flush DataChannelCmd::StartForward* commands (#316) Without flushing this may sit in a kernel buffer and we won't know if the channel is still alive. This is particularly problematic for the TCP connection pool. --- src/helper.rs | 14 +++++++++++++- src/server.rs | 13 ++++--------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/helper.rs b/src/helper.rs index b795932f..7e1e5d32 100644 --- a/src/helper.rs +++ b/src/helper.rs @@ -1,8 +1,9 @@ -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use async_http_proxy::{http_connect_tokio, http_connect_tokio_with_basic_auth}; use backoff::{backoff::Backoff, Notify}; use socket2::{SockRef, TcpKeepalive}; use std::{future::Future, net::SocketAddr, time::Duration}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::{ net::{lookup_host, TcpStream, ToSocketAddrs, UdpSocket}, sync::broadcast, @@ -144,3 +145,14 @@ where } } } + +pub async fn write_and_flush(conn: &mut T, data: &[u8]) -> Result<()> +where + T: AsyncWrite + Unpin, +{ + conn.write_all(data) + .await + .with_context(|| "Failed to write data")?; + conn.flush().await.with_context(|| "Failed to flush data")?; + Ok(()) +} diff --git a/src/server.rs b/src/server.rs index a36e3c21..83ae976a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ use crate::config::{Config, ServerConfig, ServerServiceConfig, ServiceType, TransportType}; use crate::config_watcher::{ConfigChange, ServerServiceChange}; use crate::constants::{listen_backoff, UDP_BUFFER_SIZE}; -use crate::helper::retry_notify_with_deadline; +use crate::helper::{retry_notify_with_deadline, write_and_flush}; use crate::multi_map::MultiMap; use crate::protocol::Hello::{ControlChannelHello, DataChannelHello}; use crate::protocol::{ @@ -498,14 +498,9 @@ struct ControlChannel { impl ControlChannel { async fn write_and_flush(&mut self, data: &[u8]) -> Result<()> { - self.conn - .write_all(data) + write_and_flush(&mut self.conn, data) .await .with_context(|| "Failed to write control cmds")?; - self.conn - .flush() - .await - .with_context(|| "Failed to flush control cmds")?; Ok(()) } // Run a control channel @@ -640,7 +635,7 @@ async fn run_tcp_connection_pool( 'pool: while let Some(mut visitor) = visitor_rx.recv().await { loop { if let Some(mut ch) = data_ch_rx.recv().await { - if ch.write_all(&cmd).await.is_ok() { + if write_and_flush(&mut ch, &cmd).await.is_ok() { tokio::spawn(async move { let _ = copy_bidirectional(&mut ch, &mut visitor).await; }); @@ -690,7 +685,7 @@ async fn run_udp_connection_pool( .recv() .await .ok_or_else(|| anyhow!("No available data channels"))?; - conn.write_all(&cmd).await?; + write_and_flush(&mut conn, &cmd).await?; let mut buf = [0u8; UDP_BUFFER_SIZE]; loop {