Skip to content

Commit

Permalink
nat4-nat4 punch (EasyTier#388)
Browse files Browse the repository at this point in the history
this patch optimize the udp hole punch logic:

1. allow start punch hole before stun test complete.
2. add lock to symmetric punch, avoid conflict between concurrent hole punching task.
3. support punching hole for predictable nat4-nat4.
4. make backoff of retry reasonable
  • Loading branch information
KKRainbow authored Oct 6, 2024
1 parent ba3da97 commit 37ceb77
Show file tree
Hide file tree
Showing 24 changed files with 2,746 additions and 1,308 deletions.
134 changes: 100 additions & 34 deletions easytier/src/common/stun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,16 @@ impl StunClientBuilder {
pub struct UdpNatTypeDetectResult {
source_addr: SocketAddr,
stun_resps: Vec<BindRequestResponse>,
// if we are easy symmetric nat, we need to test with another port to check inc or dec
extra_bind_test: Option<BindRequestResponse>,
}

impl UdpNatTypeDetectResult {
fn new(source_addr: SocketAddr, stun_resps: Vec<BindRequestResponse>) -> Self {
Self {
source_addr,
stun_resps,
extra_bind_test: None,
}
}

Expand Down Expand Up @@ -406,7 +409,7 @@ impl UdpNatTypeDetectResult {
.filter_map(|x| x.mapped_socket_addr)
.collect::<BTreeSet<_>>()
.len();
mapped_addr_count < self.stun_server_count()
mapped_addr_count == 1
}

pub fn nat_type(&self) -> NatType {
Expand All @@ -429,7 +432,32 @@ impl UdpNatTypeDetectResult {
return NatType::PortRestricted;
}
} else if !self.stun_resps.is_empty() {
return NatType::Symmetric;
if self.public_ips().len() != 1
|| self.usable_stun_resp_count() <= 1
|| self.max_port() - self.min_port() > 15
|| self.extra_bind_test.is_none()
|| self
.extra_bind_test
.as_ref()
.unwrap()
.mapped_socket_addr
.is_none()
{
return NatType::Symmetric;
} else {
let extra_bind_test = self.extra_bind_test.as_ref().unwrap();
let extra_port = extra_bind_test.mapped_socket_addr.unwrap().port();

let max_port_diff = extra_port.saturating_sub(self.max_port());
let min_port_diff = self.min_port().saturating_sub(extra_port);
if max_port_diff != 0 && max_port_diff < 100 {
return NatType::SymmetricEasyInc;
} else if min_port_diff != 0 && min_port_diff < 100 {
return NatType::SymmetricEasyDec;
} else {
return NatType::Symmetric;
}
}
} else {
return NatType::Unknown;
}
Expand Down Expand Up @@ -477,6 +505,13 @@ impl UdpNatTypeDetectResult {
.max()
.unwrap_or(u16::MAX)
}

pub fn usable_stun_resp_count(&self) -> usize {
self.stun_resps
.iter()
.filter(|x| x.mapped_socket_addr.is_some())
.count()
}
}

pub struct UdpNatTypeDetector {
Expand All @@ -492,6 +527,19 @@ impl UdpNatTypeDetector {
}
}

async fn get_extra_bind_result(
&self,
source_port: u16,
stun_server: SocketAddr,
) -> Result<BindRequestResponse, Error> {
let udp = Arc::new(UdpSocket::bind(format!("0.0.0.0:{}", source_port)).await?);
let client_builder = StunClientBuilder::new(udp.clone());
client_builder
.new_stun_client(stun_server)
.bind_request(false, false)
.await
}

pub async fn detect_nat_type(&self, source_port: u16) -> Result<UdpNatTypeDetectResult, Error> {
let udp = Arc::new(UdpSocket::bind(format!("0.0.0.0:{}", source_port)).await?);
self.detect_nat_type_with_socket(udp).await
Expand Down Expand Up @@ -578,13 +626,28 @@ impl StunInfoCollectorTrait for StunInfoCollector {
async fn get_udp_port_mapping(&self, local_port: u16) -> Result<SocketAddr, Error> {
self.start_stun_routine();

let stun_servers = self
let mut stun_servers = self
.udp_nat_test_result
.read()
.unwrap()
.clone()
.map(|x| x.collect_available_stun_server())
.ok_or(Error::NotFound)?;
.unwrap_or(vec![]);

if stun_servers.is_empty() {
let mut host_resolver =
HostResolverIter::new(self.stun_servers.read().unwrap().clone(), 2);
while let Some(addr) = host_resolver.next().await {
stun_servers.push(addr);
if stun_servers.len() >= 2 {
break;
}
}
}

if stun_servers.is_empty() {
return Err(Error::NotFound);
}

let udp = Arc::new(UdpSocket::bind(format!("0.0.0.0:{}", local_port)).await?);
let mut client_builder = StunClientBuilder::new(udp.clone());
Expand Down Expand Up @@ -630,9 +693,9 @@ impl StunInfoCollector {
// stun server cross nation may return a external ip address with high latency and loss rate
vec![
"stun.miwifi.com",
"stun.cdnbye.com",
"stun.hitv.com",
"stun.chat.bilibili.com",
"stun.hitv.com",
"stun.cdnbye.com",
"stun.douyucdn.cn:18000",
"fwa.lifesizecloud.com",
"global.turn.twilio.com",
Expand Down Expand Up @@ -673,38 +736,41 @@ impl StunInfoCollector {
.map(|x| x.to_string())
.collect();
let detector = UdpNatTypeDetector::new(servers, 1);
let ret = detector.detect_nat_type(0).await;
let mut ret = detector.detect_nat_type(0).await;
tracing::debug!(?ret, "finish udp nat type detect");

let mut nat_type = NatType::Unknown;
let sleep_sec = match &ret {
Ok(resp) => {
*udp_nat_test_result.write().unwrap() = Some(resp.clone());
udp_test_time.store(Local::now());
nat_type = resp.nat_type();
if nat_type == NatType::Unknown {
15
} else {
600
}
}
_ => 15,
};
if let Ok(resp) = &ret {
tracing::debug!(?resp, "got udp nat type detect result");
nat_type = resp.nat_type();
}

// if nat type is symmtric, detect with another port to gather more info
if nat_type == NatType::Symmetric {
let old_resp = ret.unwrap();
let old_local_port = old_resp.local_addr().port();
let new_port = if old_local_port >= 65535 {
old_local_port - 1
} else {
old_local_port + 1
};
let ret = detector.detect_nat_type(new_port).await;
tracing::debug!(?ret, "finish udp nat type detect with another port");
if let Ok(resp) = ret {
udp_nat_test_result.write().unwrap().as_mut().map(|x| {
x.extend_result(resp);
});
let old_resp = ret.as_mut().unwrap();
tracing::debug!(?old_resp, "start get extra bind result");
let available_stun_servers = old_resp.collect_available_stun_server();
for server in available_stun_servers.iter() {
let ret = detector
.get_extra_bind_result(0, *server)
.await
.with_context(|| "get extra bind result failed");
tracing::debug!(?ret, "finish udp nat type detect with another port");
if let Ok(resp) = ret {
old_resp.extra_bind_test = Some(resp);
break;
}
}
}

let mut sleep_sec = 10;
if let Ok(resp) = &ret {
udp_test_time.store(Local::now());
*udp_nat_test_result.write().unwrap() = Some(resp.clone());
if nat_type != NatType::Unknown
&& (nat_type != NatType::Symmetric || resp.extra_bind_test.is_some())
{
sleep_sec = 600
}
}

Expand Down Expand Up @@ -734,7 +800,7 @@ impl StunInfoCollectorTrait for MockStunInfoCollector {
last_update_time: std::time::Instant::now().elapsed().as_secs() as i64,
min_port: 100,
max_port: 200,
..Default::default()
public_ip: vec!["127.0.0.1".to_string()],
}
}

Expand Down
2 changes: 1 addition & 1 deletion easytier/src/connector/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ impl DirectConnectorManager {
);

let ip_list = rpc_stub
.get_ip_list(BaseController {}, GetIpListRequest {})
.get_ip_list(BaseController::default(), GetIpListRequest {})
.await
.with_context(|| format!("get ip list from peer {}", dst_peer_id))?;

Expand Down
Loading

0 comments on commit 37ceb77

Please sign in to comment.