Skip to content

Commit

Permalink
fix: blackhole outbound not hold the connection
Browse files Browse the repository at this point in the history
  • Loading branch information
XOR-op committed Jan 15, 2024
1 parent 938cdc5 commit 393ec1b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 36 deletions.
63 changes: 35 additions & 28 deletions boltconn/src/proxy/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ use std::time::Duration;
use tokio::net::{TcpStream, UdpSocket};
use tokio::sync::mpsc;

enum BlockType {
pub(crate) enum DispatchError {
Reject,
BlackHole,
BadMitmCert,
BadChain,
}

pub struct Dispatcher {
Expand Down Expand Up @@ -200,7 +202,7 @@ impl Dispatcher {
dst_addr: NetworkAddr,
indicator: Arc<AtomicU8>,
stream: TcpStream,
) -> Result<(), ()> {
) -> Result<(), DispatchError> {
let process_info = process::get_pid(src_addr, process::NetworkType::Tcp)
.map_or(None, process::get_process_info);
let mut conn_info = ConnInfo {
Expand All @@ -220,23 +222,28 @@ impl Dispatcher {
let (outbounding, proxy_type): (Box<dyn Outbound>, OutboundType) =
match proxy_config.as_ref() {
ProxyImpl::Chain(vec) => (
Box::new(self.create_chain(vec, src_addr, &dst_addr, iface_name)?),
Box::new(
self.create_chain(vec, src_addr, &dst_addr, iface_name)
.map_err(|_| DispatchError::BadChain)?,
),
OutboundType::Chain,
),
ProxyImpl::BlackHole => {
tokio::spawn(async {
tokio::time::interval(Duration::from_secs(30)).tick().await;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
drop(stream)
});
return Err(());
return Err(DispatchError::BlackHole);
}
_ => self.build_normal_outbound(
iface_name,
proxy_config.as_ref(),
src_addr,
&dst_addr,
conn_info.resolved_dst.as_ref(),
)?,
_ => self
.build_normal_outbound(
iface_name,
proxy_config.as_ref(),
src_addr,
&dst_addr,
conn_info.resolved_dst.as_ref(),
)
.map_err(|_| DispatchError::Reject)?,
};

// conn info
Expand Down Expand Up @@ -321,7 +328,7 @@ impl Dispatcher {
"[Dispatcher] sign certificate failed: {}",
err
);
return Err(());
return Err(DispatchError::BadMitmCert);
}
};
tokio::spawn(async move {
Expand Down Expand Up @@ -358,7 +365,7 @@ impl Dispatcher {
src_addr: SocketAddr,
dst_addr: NetworkAddr,
mut conn_info: ConnInfo,
) -> Result<(Box<dyn Outbound>, Arc<ConnContext>, ConnAbortHandle), BlockType> {
) -> Result<(Box<dyn Outbound>, Arc<ConnContext>, ConnAbortHandle), DispatchError> {
let (proxy_config, iface) = self.dispatching.load().matches(&mut conn_info, true).await;
let iface_name = iface
.as_ref()
Expand All @@ -368,11 +375,11 @@ impl Dispatcher {
ProxyImpl::Chain(vec) => (
Box::new(
self.create_chain(vec, src_addr, &dst_addr, iface_name)
.map_err(|_| BlockType::Reject)?,
.map_err(|_| DispatchError::Reject)?,
),
OutboundType::Chain,
),
ProxyImpl::BlackHole => return Err(BlockType::BlackHole),
ProxyImpl::BlackHole => return Err(DispatchError::BlackHole),
_ => self
.build_normal_outbound(
iface_name,
Expand All @@ -381,7 +388,7 @@ impl Dispatcher {
&dst_addr,
conn_info.resolved_dst.as_ref(),
)
.map_err(|_| BlockType::Reject)?,
.map_err(|_| DispatchError::Reject)?,
};
// conn info
let abort_handle = ConnAbortHandle::new();
Expand Down Expand Up @@ -432,7 +439,7 @@ impl Dispatcher {
send_rx: mpsc::Receiver<(Bytes, NetworkAddr)>,
recv_tx: mpsc::Sender<(Bytes, SocketAddr)>,
indicator: Arc<AtomicBool>,
) -> Result<(), ()> {
) -> Result<(), DispatchError> {
let conn_info = ConnInfo {
src: src_addr,
dst: dst_addr.clone(),
Expand All @@ -445,15 +452,15 @@ impl Dispatcher {
let (outbounding, info, abort_handle) =
match self.route_udp(src_addr, dst_addr, conn_info).await {
Ok(r) => r,
Err(BlockType::Reject) => return Err(()),
Err(BlockType::BlackHole) => {
tokio::spawn(async {
tokio::time::interval(Duration::from_secs(30)).tick().await;
Err(DispatchError::BlackHole) => {
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
drop(send_rx);
drop(recv_tx);
});
return Err(());
return Err(DispatchError::BlackHole);
}
Err(e) => return Err(e),
};

let mut handles = Vec::new();
Expand Down Expand Up @@ -494,7 +501,7 @@ impl Dispatcher {
dst_addr: NetworkAddr,
indicator: Arc<AtomicBool>,
socket: UdpSocket,
) -> Result<(), ()> {
) -> Result<(), DispatchError> {
let process_info =
process::get_pid(src_addr, NetworkType::Udp).map_or(None, process::get_process_info);
let conn_info = ConnInfo {
Expand All @@ -512,14 +519,14 @@ impl Dispatcher {
let (outbounding, info, abort_handle) =
match self.route_udp(src_addr, dst_addr, conn_info).await {
Ok(r) => r,
Err(BlockType::Reject) => return Err(()),
Err(BlockType::BlackHole) => {
Err(DispatchError::BlackHole) => {
tokio::spawn(async {
tokio::time::interval(Duration::from_secs(30)).tick().await;
drop(socket);
});
return Err(());
return Err(DispatchError::BlackHole);
}
Err(e) => return Err(e),
};
let mut handles = Vec::new();

Expand Down
10 changes: 6 additions & 4 deletions boltconn/src/proxy/tun_inbound.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::dispatch::InboundInfo;
use crate::proxy::dispatcher::DispatchError;
use crate::proxy::manager::SessionManager;
use crate::proxy::{Dispatcher, NetworkAddr};
use crate::Dns;
Expand Down Expand Up @@ -56,7 +57,7 @@ impl TunTcpInbound {
port: dst_addr.port(),
},
};
if self
match self
.dispatcher
.submit_tcp(
InboundInfo::Tun,
Expand All @@ -66,10 +67,11 @@ impl TunTcpInbound {
socket,
)
.await
.is_err()
{
indicator.store(0, Ordering::Relaxed)
};
Ok(_) => {}
Err(DispatchError::BlackHole) => {}
Err(_) => indicator.store(0, Ordering::Relaxed),
}
} else {
tracing::warn!("Unexpected: no record found by port {}", addr.port())
}
Expand Down
9 changes: 5 additions & 4 deletions boltconn/src/proxy/tun_udp_inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::network::dns::Dns;
use crate::network::packet::transport_layer::create_raw_udp_pkt;
use crate::platform::process;
use crate::platform::process::{NetworkType, ProcessInfo};
use crate::proxy::dispatcher::DispatchError;
use crate::proxy::{Dispatcher, NetworkAddr, SessionManager};
use bytes::Bytes;
use smoltcp::wire::{Ipv4Packet, Ipv6Packet, UdpPacket};
Expand Down Expand Up @@ -161,7 +162,7 @@ impl TunUdpInbound {
let tun_tx = self.tun_tx.clone();
tokio::spawn(Self::back_prop(recv_rx, tun_tx, src));

if self
match self
.dispatcher
.submit_tun_udp_session(
src,
Expand All @@ -172,10 +173,10 @@ impl TunUdpInbound {
probe.clone(),
)
.await
.is_err()
{
probe.store(false, Ordering::Relaxed)
};
Ok(_) | Err(DispatchError::BlackHole) => {}
Err(_) => probe.store(false, Ordering::Relaxed),
}
true
}
}
Expand Down

0 comments on commit 393ec1b

Please sign in to comment.