diff --git a/boltconn/src/proxy/dispatcher.rs b/boltconn/src/proxy/dispatcher.rs index 39c1ef4..11e6859 100644 --- a/boltconn/src/proxy/dispatcher.rs +++ b/boltconn/src/proxy/dispatcher.rs @@ -22,9 +22,11 @@ use std::time::Duration; use tokio::net::{TcpStream, UdpSocket}; use tokio::sync::mpsc; -enum BlockType { +pub(crate) enum DispatchError { Reject, BlackHole, + BadMitmCert, + BadChain, } pub struct Dispatcher { @@ -200,7 +202,7 @@ impl Dispatcher { dst_addr: NetworkAddr, indicator: Arc, stream: TcpStream, - ) -> Result<(), ()> { + ) -> Result<(), DispatchError> { let process_info = process::get_pid(src_addr, process::NetworkType::Tcp) .map_or(None, process::get_process_info); let mut conn_info = ConnInfo { @@ -220,23 +222,28 @@ impl Dispatcher { let (outbounding, proxy_type): (Box, OutboundType) = match proxy_config.as_ref() { ProxyImpl::Chain(vec) => ( - Box::new(self.create_chain(vec, src_addr, &dst_addr, iface_name)?), + Box::new( + self.create_chain(vec, src_addr, &dst_addr, iface_name) + .map_err(|_| DispatchError::BadChain)?, + ), OutboundType::Chain, ), ProxyImpl::BlackHole => { - tokio::spawn(async { - tokio::time::interval(Duration::from_secs(30)).tick().await; + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(30)).await; drop(stream) }); - return Err(()); + return Err(DispatchError::BlackHole); } - _ => self.build_normal_outbound( - iface_name, - proxy_config.as_ref(), - src_addr, - &dst_addr, - conn_info.resolved_dst.as_ref(), - )?, + _ => self + .build_normal_outbound( + iface_name, + proxy_config.as_ref(), + src_addr, + &dst_addr, + conn_info.resolved_dst.as_ref(), + ) + .map_err(|_| DispatchError::Reject)?, }; // conn info @@ -321,7 +328,7 @@ impl Dispatcher { "[Dispatcher] sign certificate failed: {}", err ); - return Err(()); + return Err(DispatchError::BadMitmCert); } }; tokio::spawn(async move { @@ -358,7 +365,7 @@ impl Dispatcher { src_addr: SocketAddr, dst_addr: NetworkAddr, mut conn_info: ConnInfo, - ) -> Result<(Box, Arc, ConnAbortHandle), BlockType> { + ) -> Result<(Box, Arc, ConnAbortHandle), DispatchError> { let (proxy_config, iface) = self.dispatching.load().matches(&mut conn_info, true).await; let iface_name = iface .as_ref() @@ -368,11 +375,11 @@ impl Dispatcher { ProxyImpl::Chain(vec) => ( Box::new( self.create_chain(vec, src_addr, &dst_addr, iface_name) - .map_err(|_| BlockType::Reject)?, + .map_err(|_| DispatchError::Reject)?, ), OutboundType::Chain, ), - ProxyImpl::BlackHole => return Err(BlockType::BlackHole), + ProxyImpl::BlackHole => return Err(DispatchError::BlackHole), _ => self .build_normal_outbound( iface_name, @@ -381,7 +388,7 @@ impl Dispatcher { &dst_addr, conn_info.resolved_dst.as_ref(), ) - .map_err(|_| BlockType::Reject)?, + .map_err(|_| DispatchError::Reject)?, }; // conn info let abort_handle = ConnAbortHandle::new(); @@ -432,7 +439,7 @@ impl Dispatcher { send_rx: mpsc::Receiver<(Bytes, NetworkAddr)>, recv_tx: mpsc::Sender<(Bytes, SocketAddr)>, indicator: Arc, - ) -> Result<(), ()> { + ) -> Result<(), DispatchError> { let conn_info = ConnInfo { src: src_addr, dst: dst_addr.clone(), @@ -445,15 +452,15 @@ impl Dispatcher { let (outbounding, info, abort_handle) = match self.route_udp(src_addr, dst_addr, conn_info).await { Ok(r) => r, - Err(BlockType::Reject) => return Err(()), - Err(BlockType::BlackHole) => { - tokio::spawn(async { - tokio::time::interval(Duration::from_secs(30)).tick().await; + Err(DispatchError::BlackHole) => { + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(30)).await; drop(send_rx); drop(recv_tx); }); - return Err(()); + return Err(DispatchError::BlackHole); } + Err(e) => return Err(e), }; let mut handles = Vec::new(); @@ -494,7 +501,7 @@ impl Dispatcher { dst_addr: NetworkAddr, indicator: Arc, socket: UdpSocket, - ) -> Result<(), ()> { + ) -> Result<(), DispatchError> { let process_info = process::get_pid(src_addr, NetworkType::Udp).map_or(None, process::get_process_info); let conn_info = ConnInfo { @@ -512,14 +519,14 @@ impl Dispatcher { let (outbounding, info, abort_handle) = match self.route_udp(src_addr, dst_addr, conn_info).await { Ok(r) => r, - Err(BlockType::Reject) => return Err(()), - Err(BlockType::BlackHole) => { + Err(DispatchError::BlackHole) => { tokio::spawn(async { tokio::time::interval(Duration::from_secs(30)).tick().await; drop(socket); }); - return Err(()); + return Err(DispatchError::BlackHole); } + Err(e) => return Err(e), }; let mut handles = Vec::new(); diff --git a/boltconn/src/proxy/tun_inbound.rs b/boltconn/src/proxy/tun_inbound.rs index 7da9e40..16a3531 100644 --- a/boltconn/src/proxy/tun_inbound.rs +++ b/boltconn/src/proxy/tun_inbound.rs @@ -1,4 +1,5 @@ use crate::dispatch::InboundInfo; +use crate::proxy::dispatcher::DispatchError; use crate::proxy::manager::SessionManager; use crate::proxy::{Dispatcher, NetworkAddr}; use crate::Dns; @@ -56,7 +57,7 @@ impl TunTcpInbound { port: dst_addr.port(), }, }; - if self + match self .dispatcher .submit_tcp( InboundInfo::Tun, @@ -66,10 +67,11 @@ impl TunTcpInbound { socket, ) .await - .is_err() { - indicator.store(0, Ordering::Relaxed) - }; + Ok(_) => {} + Err(DispatchError::BlackHole) => {} + Err(_) => indicator.store(0, Ordering::Relaxed), + } } else { tracing::warn!("Unexpected: no record found by port {}", addr.port()) } diff --git a/boltconn/src/proxy/tun_udp_inbound.rs b/boltconn/src/proxy/tun_udp_inbound.rs index 036f6bc..f2713ff 100644 --- a/boltconn/src/proxy/tun_udp_inbound.rs +++ b/boltconn/src/proxy/tun_udp_inbound.rs @@ -2,6 +2,7 @@ use crate::network::dns::Dns; use crate::network::packet::transport_layer::create_raw_udp_pkt; use crate::platform::process; use crate::platform::process::{NetworkType, ProcessInfo}; +use crate::proxy::dispatcher::DispatchError; use crate::proxy::{Dispatcher, NetworkAddr, SessionManager}; use bytes::Bytes; use smoltcp::wire::{Ipv4Packet, Ipv6Packet, UdpPacket}; @@ -161,7 +162,7 @@ impl TunUdpInbound { let tun_tx = self.tun_tx.clone(); tokio::spawn(Self::back_prop(recv_rx, tun_tx, src)); - if self + match self .dispatcher .submit_tun_udp_session( src, @@ -172,10 +173,10 @@ impl TunUdpInbound { probe.clone(), ) .await - .is_err() { - probe.store(false, Ordering::Relaxed) - }; + Ok(_) | Err(DispatchError::BlackHole) => {} + Err(_) => probe.store(false, Ordering::Relaxed), + } true } }