Skip to content

Commit

Permalink
add cli command for global foreign network info
Browse files Browse the repository at this point in the history
  • Loading branch information
KKRainbow committed Sep 22, 2024
1 parent aca9a0e commit 783ba50
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 7 deletions.
28 changes: 28 additions & 0 deletions easytier/src/easytier-cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ enum PeerSubCommand {
Remove,
List(PeerListArgs),
ListForeign,
ListGlobalForeign,
}

#[derive(Args, Debug)]
Expand Down Expand Up @@ -342,6 +343,30 @@ impl CommandHandler {
Ok(())
}

async fn handle_global_foreign_network_list(&self) -> Result<(), Error> {
let client = self.get_peer_manager_client().await?;
let request = ListGlobalForeignNetworkRequest::default();
let response = client
.list_global_foreign_network(BaseController {}, request)
.await?;
if self.verbose {
println!("{:#?}", response);
return Ok(());
}

for (k, v) in response.foreign_networks.iter() {
println!("Peer ID: {}", k);
for n in v.foreign_networks.iter() {
println!(
" Network Name: {}, Last Updated: {}, Version: {}, PeerIds: {:?}",
n.network_name, n.last_updated, n.version, n.peer_ids
);
}
}

Ok(())
}

async fn handle_route_list(&self) -> Result<(), Error> {
#[derive(tabled::Tabled)]
struct RouteTableItem {
Expand Down Expand Up @@ -464,6 +489,9 @@ async fn main() -> Result<(), Error> {
Some(PeerSubCommand::ListForeign) => {
handler.handle_foreign_network_list().await?;
}
Some(PeerSubCommand::ListGlobalForeign) => {
handler.handle_global_foreign_network_list().await?;
}
None => {
handler.handle_peer_list(&peer_args).await?;
}
Expand Down
33 changes: 31 additions & 2 deletions easytier/src/peers/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
fmt::Debug,
net::Ipv4Addr,
sync::{Arc, Weak},
time::{Instant, SystemTime},
time::SystemTime,
};

use anyhow::Context;
Expand Down Expand Up @@ -32,7 +32,10 @@ use crate::{
PeerPacketFilter,
},
proto::{
cli,
cli::{
self, list_global_foreign_network_response::OneForeignNetwork,
ListGlobalForeignNetworkResponse,
},
peer_rpc::{ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey},
},
tunnel::{
Expand Down Expand Up @@ -499,6 +502,10 @@ impl PeerManager {

let networks = foreign_mgr.list_foreign_networks().await;
for (network_name, info) in networks.foreign_networks.iter() {
if info.peers.is_empty() {
continue;
}

let last_update = foreign_mgr
.get_foreign_network_last_update(network_name)
.unwrap_or(SystemTime::now());
Expand Down Expand Up @@ -548,6 +555,28 @@ impl PeerManager {
self.get_route().dump().await
}

pub async fn list_global_foreign_network(&self) -> ListGlobalForeignNetworkResponse {
let mut resp = ListGlobalForeignNetworkResponse::default();
let ret = self.get_route().list_foreign_network_info().await;
for info in ret.infos.iter() {
let entry = resp
.foreign_networks
.entry(info.key.as_ref().unwrap().peer_id)
.or_insert_with(|| Default::default());

let mut f = OneForeignNetwork::default();
f.network_name = info.key.as_ref().unwrap().network_name.clone();
f.peer_ids
.extend(info.value.as_ref().unwrap().foreign_peer_ids.iter());
f.last_updated = format!("{}", info.value.as_ref().unwrap().last_update.unwrap());
f.version = info.value.as_ref().unwrap().version;

entry.foreign_networks.push(f);
}

resp
}

async fn run_nic_packet_process_pipeline(&self, data: &mut ZCPacket) {
for pipeline in self.nic_packet_process_pipeline.read().await.iter().rev() {
pipeline.try_process_packet_from_nic(data).await;
Expand Down
48 changes: 46 additions & 2 deletions easytier/src/peers/peer_ospf_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ impl SyncedRouteInfo {
my_peer_id: PeerId,
foreign_networks: ForeignNetworkRouteInfoMap,
) -> bool {
let now = SystemTime::now();
let mut updated = false;
for mut item in self
.foreign_network
Expand All @@ -432,8 +433,14 @@ impl SyncedRouteInfo {
{
let (key, entry) = item.pair_mut();
if let Some(mut new_entry) = foreign_networks.get_mut(key) {
assert!(!new_entry.foreign_peer_ids.is_empty());
if let Some(is_newer) = is_foreign_network_info_newer(&new_entry, entry) {
if is_newer {
let need_renew = is_newer
|| now
.duration_since(entry.last_update.unwrap().try_into().unwrap())
.unwrap()
> UPDATE_PEER_INFO_PERIOD;
if need_renew {
new_entry.version = entry.version + 1;
*entry = new_entry.clone();
updated = true;
Expand All @@ -450,6 +457,7 @@ impl SyncedRouteInfo {
}

for item in foreign_networks.iter() {
assert!(!item.value().foreign_peer_ids.is_empty());
self.foreign_network
.entry(item.key().clone())
.and_modify(|v| panic!("key should not exist, {:?}", v))
Expand Down Expand Up @@ -1222,6 +1230,7 @@ impl PeerRouteServiceImpl {

if peer_infos.is_none()
&& conn_bitmap.is_none()
&& foreign_network.is_none()
&& !session.need_sync_initiator_info.load(Ordering::Relaxed)
{
return true;
Expand Down Expand Up @@ -1250,7 +1259,7 @@ impl PeerRouteServiceImpl {
is_initiator: session.we_are_initiator.load(Ordering::Relaxed),
peer_infos: peer_infos.clone().map(|x| RoutePeerInfos { items: x }),
conn_bitmap: conn_bitmap.clone().map(Into::into),
foreign_network_infos: foreign_network,
foreign_network_infos: foreign_network.clone(),
},
)
.await;
Expand Down Expand Up @@ -1294,6 +1303,10 @@ impl PeerRouteServiceImpl {
if let Some(conn_bitmap) = &conn_bitmap {
session.update_dst_saved_conn_bitmap_version(&conn_bitmap);
}

if let Some(foreign_network) = &foreign_network {
session.update_dst_saved_foreign_network_version(&foreign_network);
}
}
}
return false;
Expand Down Expand Up @@ -1329,6 +1342,7 @@ impl OspfRouteRpc for RouteSessionManager {
let is_initiator = request.is_initiator;
let peer_infos = request.peer_infos.map(|x| x.items);
let conn_bitmap = request.conn_bitmap.map(Into::into);
let foreign_network = request.foreign_network_infos;

let ret = self
.do_sync_route_info(
Expand All @@ -1337,6 +1351,7 @@ impl OspfRouteRpc for RouteSessionManager {
is_initiator,
peer_infos,
conn_bitmap,
foreign_network,
)
.await;

Expand Down Expand Up @@ -1565,6 +1580,7 @@ impl RouteSessionManager {
is_initiator: bool,
peer_infos: Option<Vec<RoutePeerInfo>>,
conn_bitmap: Option<RouteConnBitmap>,
foreign_network: Option<RouteForeignNetworkInfos>,
) -> Result<SyncRouteInfoResponse, Error> {
let Some(service_impl) = self.service_impl.upgrade() else {
return Err(Error::Stopped);
Expand All @@ -1591,6 +1607,13 @@ impl RouteSessionManager {
session.update_dst_saved_conn_bitmap_version(conn_bitmap);
}

if let Some(foreign_network) = &foreign_network {
service_impl
.synced_route_info
.update_foreign_network(&foreign_network);
session.update_dst_saved_foreign_network_version(foreign_network);
}

service_impl.update_route_table_and_cached_local_conn_bitmap();

tracing::info!(
Expand Down Expand Up @@ -1803,6 +1826,27 @@ impl Route for PeerRoute {
async fn dump(&self) -> String {
format!("{:#?}", self)
}

async fn list_foreign_network_info(&self) -> RouteForeignNetworkInfos {
let route_table = &self.service_impl.route_table;
let mut foreign_networks = RouteForeignNetworkInfos::default();
for item in self
.service_impl
.synced_route_info
.foreign_network
.iter()
.filter(|x| !x.value().foreign_peer_ids.is_empty())
.filter(|x| route_table.peer_reachable(x.key().peer_id))
{
foreign_networks
.infos
.push(route_foreign_network_infos::Info {
key: Some(item.key().clone()),
value: Some(item.value().clone()),
});
}
foreign_networks
}
}

impl PeerPacketFilter for Arc<PeerRoute> {}
Expand Down
8 changes: 7 additions & 1 deletion easytier/src/peers/route_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use dashmap::DashMap;

use crate::{
common::PeerId,
proto::peer_rpc::{ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey},
proto::peer_rpc::{
ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, RouteForeignNetworkInfos,
},
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -79,6 +81,10 @@ pub trait Route {
None
}

async fn list_foreign_network_info(&self) -> RouteForeignNetworkInfos {
Default::default()
}

async fn set_route_cost_fn(&self, _cost_fn: RouteCostCalculator) {}

async fn dump(&self) -> String {
Expand Down
13 changes: 11 additions & 2 deletions easytier/src/peers/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::sync::Arc;
use crate::proto::{
cli::{
DumpRouteRequest, DumpRouteResponse, ListForeignNetworkRequest, ListForeignNetworkResponse,
ListPeerRequest, ListPeerResponse, ListRouteRequest, ListRouteResponse, PeerInfo,
PeerManageRpc, ShowNodeInfoRequest, ShowNodeInfoResponse,
ListGlobalForeignNetworkRequest, ListGlobalForeignNetworkResponse, ListPeerRequest,
ListPeerResponse, ListRouteRequest, ListRouteResponse, PeerInfo, PeerManageRpc,
ShowNodeInfoRequest, ShowNodeInfoResponse,
},
rpc_types::{self, controller::BaseController},
};
Expand Down Expand Up @@ -90,6 +91,14 @@ impl PeerManageRpc for PeerManagerRpcService {
Ok(reply)
}

async fn list_global_foreign_network(
&self,
_: BaseController,
_request: ListGlobalForeignNetworkRequest,
) -> Result<ListGlobalForeignNetworkResponse, rpc_types::error::Error> {
Ok(self.peer_manager.list_global_foreign_network().await)
}

async fn show_node_info(
&self,
_: BaseController,
Expand Down
19 changes: 19 additions & 0 deletions easytier/src/proto/cli.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,34 @@ message ListForeignNetworkRequest {}
message ForeignNetworkEntryPb { repeated PeerInfo peers = 1; }

message ListForeignNetworkResponse {
// foreign network in local
map<string, ForeignNetworkEntryPb> foreign_networks = 1;
}

message ListGlobalForeignNetworkRequest {}

message ListGlobalForeignNetworkResponse {
// foreign network in the entire network
message OneForeignNetwork {
string network_name = 1;
repeated uint32 peer_ids = 2;
string last_updated = 3;
uint32 version = 4;
}

message ForeignNetworks { repeated OneForeignNetwork foreign_networks = 1; }

map<uint32, ForeignNetworks> foreign_networks = 1;
}

service PeerManageRpc {
rpc ListPeer(ListPeerRequest) returns (ListPeerResponse);
rpc ListRoute(ListRouteRequest) returns (ListRouteResponse);
rpc DumpRoute(DumpRouteRequest) returns (DumpRouteResponse);
rpc ListForeignNetwork(ListForeignNetworkRequest)
returns (ListForeignNetworkResponse);
rpc ListGlobalForeignNetwork(ListGlobalForeignNetworkRequest)
returns (ListGlobalForeignNetworkResponse);
rpc ShowNodeInfo(ShowNodeInfoRequest) returns (ShowNodeInfoResponse);
}

Expand Down

0 comments on commit 783ba50

Please sign in to comment.