Skip to content

Commit

Permalink
fix: several WireGuard bugs (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
XOR-op authored Oct 14, 2024
2 parents 2ce3d60 + 1098b4b commit 0431906
Show file tree
Hide file tree
Showing 26 changed files with 703 additions and 375 deletions.
8 changes: 6 additions & 2 deletions boltapi/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
ConnectionSchema, GetGroupRespSchema, GetInterceptDataResp, HttpInterceptSchema, TrafficResp,
TunStatusSchema,
ConnectionSchema, GetGroupRespSchema, GetInterceptDataResp, HttpInterceptSchema,
MasterConnectionStatus, TrafficResp, TunStatusSchema,
};

pub const MAX_CODEC_FRAME_LENGTH: usize = 512 * 1024 * 1024;
Expand Down Expand Up @@ -55,6 +55,10 @@ pub trait ControlService {

async fn get_conn_log_limit() -> u32;

async fn get_master_conn_stat() -> Vec<MasterConnectionStatus>;

async fn stop_master_conn(id: String);

async fn reload();

// Streaming
Expand Down
10 changes: 10 additions & 0 deletions boltapi/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,13 @@ pub struct TrafficResp {
pub struct TunStatusSchema {
pub enabled: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct MasterConnectionStatus {
pub name: String,
pub alive: bool,
pub last_active: u64,
pub last_handshake: u64,
pub hand_shake_is_expired: bool,
}
24 changes: 15 additions & 9 deletions boltconn/src/adapter/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,25 @@ use crate::adapter::{
UdpTransferType,
};
use async_trait::async_trait;
use std::io;
use std::sync::Arc;

use crate::common::duplex_chan::DuplexChan;
use crate::common::StreamOutboundTrait;
use crate::proxy::error::TransportError;
use crate::proxy::ConnAbortHandle;
use crate::transport::UdpSocketAdapter;
use tokio::task::JoinHandle;

#[derive(Clone)]
pub struct ChainOutbound {
name: String,
chains: Vec<Arc<dyn Outbound>>,
}

impl ChainOutbound {
pub fn new(chains: Vec<Box<dyn Outbound>>) -> Self {
pub fn new(name: &str, chains: Vec<Box<dyn Outbound>>) -> Self {
Self {
name: name.to_string(),
chains: chains.into_iter().map(Arc::from).collect(),
}
}
Expand All @@ -30,7 +32,7 @@ impl ChainOutbound {
mut inbound_tcp_container: Option<Connector>,
mut inbound_udp_container: Option<AddrConnector>,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
tokio::spawn(async move {
let mut not_first_jump = false;
let mut need_next_jump = true;
Expand Down Expand Up @@ -122,6 +124,10 @@ impl ChainOutbound {

#[async_trait]
impl Outbound for ChainOutbound {
fn id(&self) -> String {
self.name.clone()
}

fn outbound_type(&self) -> OutboundType {
OutboundType::Chain
}
Expand All @@ -130,7 +136,7 @@ impl Outbound for ChainOutbound {
&self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> JoinHandle<std::io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
self.clone().spawn(true, Some(inbound), None, abort_handle)
}

Expand All @@ -140,17 +146,17 @@ impl Outbound for ChainOutbound {
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_tcp_with_outbound() should not be called with ChainOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalid outbound"))
}

fn spawn_udp(
&self,
inbound: AddrConnector,
abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
self.clone().spawn(false, None, Some(inbound), abort_handle)
}

Expand All @@ -161,8 +167,8 @@ impl Outbound for ChainOutbound {
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_udp_with_outbound() should not be called with ChainUdpOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalod outbound"))
}
}
30 changes: 19 additions & 11 deletions boltconn/src/adapter/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::proxy::error::TransportError;
use crate::proxy::{ConnAbortHandle, NetworkAddr};
use crate::transport::UdpSocketAdapter;
use async_trait::async_trait;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
Expand Down Expand Up @@ -37,25 +36,30 @@ impl DirectOutbound {
}
}

async fn run_tcp(self, inbound: Connector, abort_handle: ConnAbortHandle) -> io::Result<()> {
async fn run_tcp(
self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> Result<(), TransportError> {
let dst_addr = if let Some(dst) = self.resolved_dst {
dst
} else {
lookup(self.dns.as_ref(), &self.dst).await?
};
let outbound = Egress::new(&self.iface_name).tcp_stream(dst_addr).await?;

established_tcp(inbound, outbound, abort_handle).await;
established_tcp(self.id(), inbound, outbound, abort_handle).await;
Ok(())
}

async fn run_udp(
self,
inbound: AddrConnector,
abort_handle: ConnAbortHandle,
) -> io::Result<()> {
) -> Result<(), TransportError> {
let outbound = Arc::new(Egress::new(&self.iface_name).udpv4_socket().await?);
established_udp(
self.id(),
inbound,
DirectUdpAdapter(outbound, self.dns.clone()),
None,
Expand All @@ -68,6 +72,10 @@ impl DirectOutbound {

#[async_trait]
impl Outbound for DirectOutbound {
fn id(&self) -> String {
"DIRECT".to_string()
}

fn outbound_type(&self) -> OutboundType {
OutboundType::Direct
}
Expand All @@ -76,7 +84,7 @@ impl Outbound for DirectOutbound {
&self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
tokio::spawn(self.clone().run_tcp(inbound, abort_handle))
}

Expand All @@ -86,17 +94,17 @@ impl Outbound for DirectOutbound {
_tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_tcp_with_outbound() should not be called with DirectOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalid outbound"))
}

fn spawn_udp(
&self,
inbound: AddrConnector,
abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
tokio::spawn(self.clone().run_udp(inbound, abort_handle))
}

Expand All @@ -107,9 +115,9 @@ impl Outbound for DirectOutbound {
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_udp_with_outbound() should not be called with DirectOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalid outbound"))
}
}

Expand All @@ -122,7 +130,7 @@ impl UdpSocketAdapter for DirectUdpAdapter {
let addr = match addr {
NetworkAddr::Raw(s) => s,
NetworkAddr::DomainName { domain_name, port } => {
let Some(ip) = self.1.genuine_lookup(domain_name.as_str()).await else {
let Ok(Some(ip)) = self.1.genuine_lookup(domain_name.as_str()).await else {
// drop
return Ok(());
};
Expand Down
47 changes: 28 additions & 19 deletions boltconn/src/adapter/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use crate::common::{io_err, StreamOutboundTrait};
use crate::config::AuthData;
use crate::network::dns::Dns;
use crate::network::egress::Egress;
use crate::proxy::error::TransportError;
use crate::proxy::{ConnAbortHandle, NetworkAddr};
use crate::transport::UdpSocketAdapter;
use async_trait::async_trait;
use base64::Engine;
use httparse::Response;
use std::io;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::task::JoinHandle;
Expand All @@ -24,15 +24,23 @@ pub struct HttpConfig {

#[derive(Clone)]
pub struct HttpOutbound {
name: String,
iface_name: String,
dst: NetworkAddr,
dns: Arc<Dns>,
config: HttpConfig,
}

impl HttpOutbound {
pub fn new(iface_name: &str, dst: NetworkAddr, dns: Arc<Dns>, config: HttpConfig) -> Self {
pub fn new(
name: &str,
iface_name: &str,
dst: NetworkAddr,
dns: Arc<Dns>,
config: HttpConfig,
) -> Self {
Self {
name: name.to_string(),
iface_name: iface_name.to_string(),
dst,
dns,
Expand All @@ -45,7 +53,7 @@ impl HttpOutbound {
inbound: Connector,
mut outbound: S,
abort_handle: ConnAbortHandle,
) -> io::Result<()>
) -> Result<(), TransportError>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
Expand All @@ -71,31 +79,35 @@ impl HttpOutbound {
let mut resp = String::new();
while !resp.ends_with("\r\n\r\n") {
if buf_reader.read_line(&mut resp).await? == 0 {
return Err(io_err("EOF"));
return Err(TransportError::Http("EOF"));
}
if resp.len() > 4096 {
return Err(io_err("Too long resp"));
return Err(TransportError::Http("Response too long"));
}
}
let mut buf = [httparse::EMPTY_HEADER; 16];
let mut resp_struct = Response::new(buf.as_mut());
resp_struct
.parse(resp.as_bytes())
.map_err(|_| io_err("Parse response failed"))?;
.map_err(|_| TransportError::Http("Parsing failed"))?;
if let Some(200) = resp_struct.code {
let tcp_stream = buf_reader.into_inner();
established_tcp(inbound, tcp_stream, abort_handle).await;
established_tcp(self.name, inbound, tcp_stream, abort_handle).await;
Ok(())
} else {
Err(io_err(
Err(TransportError::Io(io_err(
format!("Http Connect Failed: {:?}", resp_struct.code).as_str(),
))
)))
}
}
}

#[async_trait]
impl Outbound for HttpOutbound {
fn id(&self) -> String {
self.name.clone()
}

fn outbound_type(&self) -> OutboundType {
OutboundType::Http
}
Expand All @@ -104,18 +116,15 @@ impl Outbound for HttpOutbound {
&self,
inbound: Connector,
abort_handle: ConnAbortHandle,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
let self_clone = self.clone();
tokio::spawn(async move {
let server_addr =
lookup(self_clone.dns.as_ref(), &self_clone.config.server_addr).await?;
let tcp_stream = Egress::new(&self_clone.iface_name)
.tcp_stream(server_addr)
.await?;
self_clone
.run_tcp(inbound, tcp_stream, abort_handle)
.await
.map_err(|e| io_err(e.to_string().as_str()))
self_clone.run_tcp(inbound, tcp_stream, abort_handle).await
})
}

Expand All @@ -125,10 +134,10 @@ impl Outbound for HttpOutbound {
tcp_outbound: Option<Box<dyn StreamOutboundTrait>>,
udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
abort_handle: ConnAbortHandle,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
if tcp_outbound.is_none() || udp_outbound.is_some() {
tracing::error!("Invalid HTTP proxy tcp spawn");
return Err(io::ErrorKind::InvalidData.into());
return Err(TransportError::Internal("Invalid outbound"));
}
let self_clone = self.clone();
tokio::spawn(async move {
Expand All @@ -145,7 +154,7 @@ impl Outbound for HttpOutbound {
_inbound: AddrConnector,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> JoinHandle<io::Result<()>> {
) -> JoinHandle<Result<(), TransportError>> {
tracing::error!("spawn_udp() should not be called with HttpOutbound");
empty_handle()
}
Expand All @@ -157,8 +166,8 @@ impl Outbound for HttpOutbound {
_udp_outbound: Option<Box<dyn UdpSocketAdapter>>,
_abort_handle: ConnAbortHandle,
_tunnel_only: bool,
) -> io::Result<bool> {
) -> Result<bool, TransportError> {
tracing::error!("spawn_udp_with_outbound() should not be called with HttpOutbound");
return Err(io::ErrorKind::InvalidData.into());
Err(TransportError::Internal("Invalid outbound"))
}
}
Loading

0 comments on commit 0431906

Please sign in to comment.