Skip to content

Commit

Permalink
fix: WireGuard unexpected connection abort
Browse files Browse the repository at this point in the history
Root cause: spawning unnecessary UDP task whose abort handle also binds to the real task.
  • Loading branch information
XOR-op committed Jan 18, 2024
1 parent 7476a66 commit cbf24a1
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 78 deletions.
63 changes: 41 additions & 22 deletions boltconn/src/adapter/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,62 +28,88 @@ impl ChainOutbound {
) -> JoinHandle<io::Result<()>> {
let mut handles = vec![];
let mut not_first_jump = false;
// When the spawned task don't use the provided outbound, i.e. it uses a multiplexed connection
let mut early_stop_flag = false;
let (first_part, last_one) = self.chains.split_at(self.chains.len() - 1);

// connect proxies
for tunnel in first_part {
if early_stop_flag {
return tokio::spawn(async move { Ok(()) });
}
if use_tcp {
let inbound = inbound_tcp_container.take().unwrap();
if tunnel.outbound_type().tcp_transfer_type() == TcpTransferType::TcpOverUdp {
use_tcp = false;
let (inner, outer) = AddrConnector::new_pair(10);
inbound_udp_container = Some(outer);
let mut udp_outbound: Option<Box<dyn UdpSocketAdapter>> =
Some(Box::new(AddrConnectorWrapper::from(inner)));
handles.push(tunnel.spawn_tcp_with_outbound(
inbound,
None,
Some(Box::new(AddrConnectorWrapper::from(inner))),
&mut None,
&mut udp_outbound,
abort_handle.clone(),
));
if udp_outbound.is_some() {
early_stop_flag = true
}
} else {
let (inner, outer) = Connector::new_pair(10);
let chan = Box::new(DuplexChan::new(inner));
let mut tcp_outbound: Option<Box<dyn StreamOutboundTrait>> =
Some(Box::new(DuplexChan::new(inner)));
inbound_tcp_container = Some(outer);
handles.push(tunnel.spawn_tcp_with_outbound(
inbound,
Some(chan),
None,
&mut tcp_outbound,
&mut None,
abort_handle.clone(),
));
if tcp_outbound.is_some() {
early_stop_flag = true
}
}
} else {
let inbound = inbound_udp_container.take().unwrap();
if tunnel.outbound_type().udp_transfer_type() == UdpTransferType::UdpOverTcp {
// UoT, then next jump will use TCP
use_tcp = true;
let (inner, outer) = Connector::new_pair(10);
let chan = Box::new(DuplexChan::new(inner));
let mut tcp_outbound: Option<Box<dyn StreamOutboundTrait>> =
Some(Box::new(DuplexChan::new(inner)));
inbound_tcp_container = Some(outer);
handles.push(tunnel.spawn_udp_with_outbound(
inbound,
Some(chan),
None,
&mut tcp_outbound,
&mut None,
abort_handle.clone(),
not_first_jump,
));
if tcp_outbound.is_some() {
early_stop_flag = true
}
} else {
let (inner, outer) = AddrConnector::new_pair(10);
inbound_udp_container = Some(outer);
let mut udp_outbound: Option<Box<dyn UdpSocketAdapter>> =
Some(Box::new(AddrConnectorWrapper::from(inner)));
handles.push(tunnel.spawn_udp_with_outbound(
inbound,
None,
Some(Box::new(AddrConnectorWrapper::from(inner))),
&mut None,
&mut udp_outbound,
abort_handle.clone(),
not_first_jump,
));
if udp_outbound.is_some() {
early_stop_flag = true
}
};
}
not_first_jump = true;
}
if early_stop_flag {
return tokio::spawn(async move { Ok(()) });
}

// connect last one
if use_tcp {
Expand All @@ -94,14 +120,7 @@ impl ChainOutbound {
handles.push(last_one[0].spawn_udp(inbound, abort_handle, true));
}

tokio::spawn(async move {
for i in handles {
if let Ok(Err(e)) = i.await {
return Err(e);
}
}
Ok(())
})
tokio::spawn(async move { Ok(()) })
}
}

Expand All @@ -121,8 +140,8 @@ impl Outbound for ChainOutbound {
fn spawn_tcp_with_outbound(
&self,
_inbound: Connector,
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
tracing::error!("spawn_tcp_with_outbound() should not be called with ChainOutbound");
Expand All @@ -141,8 +160,8 @@ impl Outbound for ChainOutbound {
fn spawn_udp_with_outbound(
&self,
_inbound: AddrConnector,
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
Expand Down
8 changes: 4 additions & 4 deletions boltconn/src/adapter/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ impl Outbound for DirectOutbound {
fn spawn_tcp_with_outbound(
&self,
_inbound: Connector,
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
tracing::error!("spawn_tcp_with_outbound() should not be called with DirectOutbound");
Expand All @@ -99,8 +99,8 @@ impl Outbound for DirectOutbound {
fn spawn_udp_with_outbound(
&self,
_inbound: AddrConnector,
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
Expand Down
11 changes: 6 additions & 5 deletions boltconn/src/adapter/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,19 @@ impl Outbound for HttpOutbound {
fn spawn_tcp_with_outbound(
&self,
inbound: Connector,
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
if tcp_outbound.is_none() || udp_outbound.is_some() {
tracing::error!("Invalid HTTP proxy tcp spawn");
return empty_handle();
}
let self_clone = self.clone();
let outbound = tcp_outbound.take().unwrap();
tokio::spawn(async move {
self_clone
.run_tcp(inbound, tcp_outbound.unwrap(), abort_handle)
.run_tcp(inbound, outbound, abort_handle)
.await
.map_err(|e| io_err(e.to_string().as_str()))
})
Expand All @@ -150,8 +151,8 @@ impl Outbound for HttpOutbound {
fn spawn_udp_with_outbound(
&self,
_inbound: AddrConnector,
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
Expand Down
8 changes: 4 additions & 4 deletions boltconn/src/adapter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ pub trait Outbound: Send + Sync {
fn spawn_tcp_with_outbound(
&self,
inbound: Connector,
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>>;

Expand All @@ -170,8 +170,8 @@ pub trait Outbound: Send + Sync {
fn spawn_udp_with_outbound(
&self,
inbound: AddrConnector,
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
tunnel_only: bool,
) -> JoinHandle<io::Result<()>>;
Expand Down
13 changes: 7 additions & 6 deletions boltconn/src/adapter/shadowsocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,20 @@ impl Outbound for SSOutbound {
fn spawn_tcp_with_outbound(
&self,
inbound: Connector,
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
if tcp_outbound.is_none() || udp_outbound.is_some() {
tracing::error!("Invalid Shadowsocks tcp spawn");
return empty_handle();
}
let self_clone = self.clone();
let outbound = tcp_outbound.take().unwrap();
tokio::spawn(async move {
let server_addr = self_clone.get_server_addr().await?;
self_clone
.run_tcp(inbound, tcp_outbound.unwrap(), server_addr, abort_handle)
.run_tcp(inbound, outbound, server_addr, abort_handle)
.await
})
}
Expand Down Expand Up @@ -205,16 +206,16 @@ impl Outbound for SSOutbound {
fn spawn_udp_with_outbound(
&self,
inbound: AddrConnector,
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
if tcp_outbound.is_some() || udp_outbound.is_none() {
tracing::error!("Invalid Shadowsocks UDP outbound ancestor");
return empty_handle();
}
let udp_outbound = udp_outbound.unwrap();
let udp_outbound = udp_outbound.take().unwrap();
let self_clone = self.clone();
tokio::spawn(async move {
let server_addr = self_clone.get_server_addr().await?;
Expand Down
14 changes: 6 additions & 8 deletions boltconn/src/adapter/socks5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,16 @@ impl Outbound for Socks5Outbound {
fn spawn_tcp_with_outbound(
&self,
inbound: Connector,
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
if tcp_outbound.is_none() || udp_outbound.is_some() {
tracing::error!("Invalid Socks5 tcp spawn");
return empty_handle();
}
tokio::spawn(
self.clone()
.run_tcp(inbound, tcp_outbound.unwrap(), abort_handle),
)
let outbound = tcp_outbound.take().unwrap();
tokio::spawn(self.clone().run_tcp(inbound, outbound, abort_handle))
}

fn spawn_udp(
Expand All @@ -189,8 +187,8 @@ impl Outbound for Socks5Outbound {
fn spawn_udp_with_outbound(
&self,
_inbound: AddrConnector,
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
Expand Down
18 changes: 8 additions & 10 deletions boltconn/src/adapter/trojan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,16 @@ impl Outbound for TrojanOutbound {
fn spawn_tcp_with_outbound(
&self,
inbound: Connector,
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
if tcp_outbound.is_none() || udp_outbound.is_some() {
tracing::error!("Invalid Trojan UDP outbound ancestor");
return empty_handle();
}
tokio::spawn(
self.clone()
.run_tcp(inbound, tcp_outbound.unwrap(), abort_handle),
)
let outbound = tcp_outbound.take().unwrap();
tokio::spawn(self.clone().run_tcp(inbound, outbound, abort_handle))
}

fn spawn_udp(
Expand All @@ -227,20 +225,20 @@ impl Outbound for TrojanOutbound {
fn spawn_udp_with_outbound(
&self,
inbound: AddrConnector,
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
tcp_outbound: &mut Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: &mut Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
if tcp_outbound.is_none() || udp_outbound.is_some() {
tracing::error!("Invalid Trojan UDP outbound ancestor");
return empty_handle();
}
let tcp_outbound = tcp_outbound.unwrap();
let outbound = tcp_outbound.take().unwrap();
let self_clone = self.clone();
tokio::spawn(async move {
self_clone
.run_udp(inbound, tcp_outbound, abort_handle, tunnel_only)
.run_udp(inbound, outbound, abort_handle, tunnel_only)
.await
})
}
Expand Down
Loading

0 comments on commit cbf24a1

Please sign in to comment.