Skip to content

Commit

Permalink
fix ipv6 direct connector not work
Browse files Browse the repository at this point in the history
  • Loading branch information
KKRainbow committed Oct 3, 2024
1 parent 984ed8f commit ba3da97
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 38 deletions.
5 changes: 4 additions & 1 deletion easytier/locales/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,7 @@ core_clap:
zh-CN: "转发所有对等节点的RPC数据包,即使对等节点不在转发网络白名单中。这可以帮助白名单外网络中的对等节点建立P2P连接。"
socks5:
en: "enable socks5 server, allow socks5 client to access virtual network. format: <port>, e.g.: 1080"
zh-CN: "启用 socks5 服务器,允许 socks5 客户端访问虚拟网络. 格式: <端口>,例如:1080"
zh-CN: "启用 socks5 服务器,允许 socks5 客户端访问虚拟网络. 格式: <端口>,例如:1080"
ipv6_listener:
en: "the url of the ipv6 listener, e.g.: tcp://[::]:11010, if not set, will listen on random udp port"
zh-CN: "IPv6 监听器的URL,例如:tcp://[::]:11010,如果未设置,将在随机UDP端口上监听"
6 changes: 2 additions & 4 deletions easytier/src/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ pub struct Flags {
pub relay_all_peer_rpc: bool,
#[derivative(Default(value = "false"))]
pub disable_udp_hole_punching: bool,
#[derivative(Default(value = "\"udp://[::]:0\".to_string()"))]
pub ipv6_listener: String,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
Expand Down Expand Up @@ -260,8 +262,6 @@ impl TomlConfigLoader {
serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(&default_flags_json)
.unwrap();

tracing::debug!("default_flags_hashmap: {:?}", default_flags_hashmap);

let mut merged_hashmap = serde_json::Map::new();
for (key, value) in default_flags_hashmap {
if let Some(v) = flags_hashmap.remove(&key) {
Expand All @@ -271,8 +271,6 @@ impl TomlConfigLoader {
}
}

tracing::debug!("merged_hashmap: {:?}", merged_hashmap);

serde_json::from_value(serde_json::Value::Object(merged_hashmap)).unwrap()
}
}
Expand Down
125 changes: 93 additions & 32 deletions easytier/src/connector/direct.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// try connect peers directly, with either its public ip or lan ip

use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc, time::Duration};

use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
Expand All @@ -19,6 +19,7 @@ use crate::{

use crate::proto::cli::PeerConnInfo;
use anyhow::Context;
use rand::Rng;
use tokio::{task::JoinSet, time::timeout};
use tracing::Instrument;
use url::Host;
Expand Down Expand Up @@ -64,13 +65,13 @@ impl PeerManagerForDirectConnector for PeerManager {
struct DstBlackListItem(PeerId, String);

#[derive(Hash, Eq, PartialEq, Clone)]
struct DstSchemeBlackListItem(PeerId, String);
struct DstListenerUrlBlackListItem(PeerId, url::Url);

struct DirectConnectorManagerData {
global_ctx: ArcGlobalCtx,
peer_manager: Arc<PeerManager>,
dst_blacklist: timedmap::TimedMap<DstBlackListItem, ()>,
dst_sceme_blacklist: timedmap::TimedMap<DstSchemeBlackListItem, ()>,
dst_listener_blacklist: timedmap::TimedMap<DstListenerUrlBlackListItem, ()>,
}

impl DirectConnectorManagerData {
Expand All @@ -79,7 +80,7 @@ impl DirectConnectorManagerData {
global_ctx,
peer_manager,
dst_blacklist: timedmap::TimedMap::new(),
dst_sceme_blacklist: timedmap::TimedMap::new(),
dst_listener_blacklist: timedmap::TimedMap::new(),
}
}
}
Expand Down Expand Up @@ -147,7 +148,7 @@ impl DirectConnectorManager {
}

while let Some(task_ret) = tasks.join_next().await {
tracing::trace!(?task_ret, "direct connect task ret");
tracing::debug!(?task_ret, ?my_peer_id, "direct connect task ret");
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
Expand All @@ -168,7 +169,7 @@ impl DirectConnectorManager {
.dst_blacklist
.contains(&DstBlackListItem(dst_peer_id.clone(), addr.clone()))
{
tracing::trace!("try_connect_to_ip failed, addr in blacklist: {}", addr);
tracing::debug!("try_connect_to_ip failed, addr in blacklist: {}", addr);
return Err(Error::UrlInBlacklist);
}

Expand Down Expand Up @@ -203,24 +204,38 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
addr: String,
) -> Result<(), Error> {
let ret = Self::do_try_connect_to_ip(data.clone(), dst_peer_id, addr.clone()).await;
if let Err(e) = ret {
if !matches!(e, Error::UrlInBlacklist) {
tracing::info!(
"try_connect_to_ip failed: {:?}, peer_id: {}",
e,
dst_peer_id
);
let mut rand_gen = rand::rngs::OsRng::default();
let backoff_ms = vec![1000, 2000, 4000];
let mut backoff_idx = 0;

loop {
let ret = Self::do_try_connect_to_ip(data.clone(), dst_peer_id, addr.clone()).await;
tracing::debug!(?ret, ?dst_peer_id, ?addr, "try_connect_to_ip return");
if matches!(ret, Err(Error::UrlInBlacklist) | Ok(_)) {
return ret;
}

if backoff_idx < backoff_ms.len() {
let delta = backoff_ms[backoff_idx] >> 1;
assert!(delta > 0);
assert!(delta < backoff_ms[backoff_idx]);

tokio::time::sleep(Duration::from_millis(
(backoff_ms[backoff_idx] + rand_gen.gen_range(-delta..delta)) as u64,
))
.await;

backoff_idx += 1;
continue;
} else {
data.dst_blacklist.insert(
DstBlackListItem(dst_peer_id.clone(), addr.clone()),
(),
std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC),
);

return ret;
}
return Err(e);
} else {
tracing::info!("try_connect_to_ip success, peer_id: {}", dst_peer_id);
return Ok(());
}
}

Expand All @@ -230,6 +245,8 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
ip_list: GetIpListResponse,
) -> Result<(), Error> {
data.dst_listener_blacklist.cleanup();

let enable_ipv6 = data.global_ctx.get_flags().enable_ipv6;
let available_listeners = ip_list
.listeners
Expand All @@ -238,14 +255,15 @@ impl DirectConnectorManager {
.filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None })
.filter(|l| l.port().is_some() && l.host().is_some())
.filter(|l| {
!data.dst_sceme_blacklist.contains(&DstSchemeBlackListItem(
dst_peer_id.clone(),
l.scheme().to_string(),
))
!data
.dst_listener_blacklist
.contains(&DstListenerUrlBlackListItem(dst_peer_id.clone(), l.clone()))
})
.filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_)))
.collect::<Vec<_>>();

tracing::debug!(?available_listeners, "got available listeners");

let mut listener = available_listeners.get(0).ok_or(anyhow::anyhow!(
"peer {} have no valid listener",
dst_peer_id
Expand All @@ -270,6 +288,13 @@ impl DirectConnectorManager {
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});

Expand All @@ -284,6 +309,13 @@ impl DirectConnectorManager {
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv4,
?listener,
?dst_peer_id,
"failed to set host for public ipv4"
);
}
}
}
Expand All @@ -299,6 +331,13 @@ impl DirectConnectorManager {
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv6"
);
}
});

Expand All @@ -313,6 +352,13 @@ impl DirectConnectorManager {
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv6,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
}
}
Expand All @@ -323,16 +369,28 @@ impl DirectConnectorManager {

let mut has_succ = false;
while let Some(ret) = tasks.join_next().await {
if let Err(e) = ret {
tracing::error!("join direct connect task failed: {:?}", e);
} else if let Ok(Ok(_)) = ret {
has_succ = true;
match ret {
Ok(Ok(_)) => {
has_succ = true;
tracing::info!(
?dst_peer_id,
?listener,
"try direct connect to peer success"
);
break;
}
Ok(Err(e)) => {
tracing::info!(?e, "try direct connect to peer failed");
}
Err(e) => {
tracing::error!(?e, "try direct connect to peer task join failed");
}
}
}

if !has_succ {
data.dst_sceme_blacklist.insert(
DstSchemeBlackListItem(dst_peer_id.clone(), listener.scheme().to_string()),
data.dst_listener_blacklist.insert(
DstListenerUrlBlackListItem(dst_peer_id.clone(), listener.clone()),
(),
std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC),
);
Expand All @@ -355,7 +413,7 @@ impl DirectConnectorManager {
}
}

tracing::trace!("try direct connect to peer: {}", dst_peer_id);
tracing::debug!("try direct connect to peer: {}", dst_peer_id);

let rpc_stub = peer_manager
.get_peer_rpc_mgr()
Expand Down Expand Up @@ -384,7 +442,7 @@ mod tests {
use crate::{
connector::direct::{
DirectConnectorManager, DirectConnectorManagerData, DstBlackListItem,
DstSchemeBlackListItem,
DstListenerUrlBlackListItem,
},
instance::listeners::ListenerManager,
peers::tests::{
Expand Down Expand Up @@ -461,8 +519,11 @@ mod tests {
.unwrap();

assert!(data
.dst_sceme_blacklist
.contains(&DstSchemeBlackListItem(1, "tcp".into())));
.dst_listener_blacklist
.contains(&DstListenerUrlBlackListItem(
1,
"tcp://127.0.0.1:10222".parse().unwrap()
)));

assert!(data
.dst_blacklist
Expand Down
12 changes: 12 additions & 0 deletions easytier/src/easytier-core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ struct Cli {
help = t!("core_clap.socks5").to_string()
)]
socks5: Option<u16>,

#[arg(
long,
help = t!("core_clap.ipv6_listener").to_string()
)]
ipv6_listener: Option<String>,
}

rust_i18n::i18n!("locales", fallback = "en");
Expand Down Expand Up @@ -512,6 +518,12 @@ impl From<Cli> for TomlConfigLoader {
}
f.disable_p2p = cli.disable_p2p;
f.relay_all_peer_rpc = cli.relay_all_peer_rpc;
if let Some(ipv6_listener) = cli.ipv6_listener {
f.ipv6_listener = ipv6_listener
.parse()
.with_context(|| format!("failed to parse ipv6 listener: {}", ipv6_listener))
.unwrap();
}
cfg.set_flags(f);

cfg.set_exit_nodes(cli.exit_nodes.clone());
Expand Down
3 changes: 2 additions & 1 deletion easytier/src/instance/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
}

if self.global_ctx.config.get_flags().enable_ipv6 {
let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone();
let _ = self
.add_listener(
UdpTunnelListener::new("udp://[::]:0".parse().unwrap()),
UdpTunnelListener::new(ipv6_listener.parse().unwrap()),
false,
)
.await?;
Expand Down

0 comments on commit ba3da97

Please sign in to comment.