From 0483daf9f6c61ffecdc3e7d50346156c36bfc7ab Mon Sep 17 00:00:00 2001 From: Oleksandr Deundiak Date: Wed, 20 Nov 2024 16:49:39 +0100 Subject: [PATCH] feat(rust): add periodic interface scan and ebpf attachment --- .../src/privileged_portal/ebpf_support.rs | 75 ++++++++++++++++++- .../privileged_portal/privileged_portals.rs | 51 +++++-------- .../src/privileged_portal/transport.rs | 32 ++------ 3 files changed, 95 insertions(+), 63 deletions(-) diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/ebpf_support.rs b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/ebpf_support.rs index 1efb101bfb1..32bc87ffb42 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/ebpf_support.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/ebpf_support.rs @@ -9,6 +9,7 @@ use aya::programs::{tc, Link, ProgramError, SchedClassifier, TcAttachType}; use aya::{Ebpf, EbpfError}; use aya_log::EbpfLogger; use core::fmt::{Debug, Formatter}; +use log::error; use ockam_core::compat::collections::HashMap; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{Address, Error, Result}; @@ -16,9 +17,14 @@ use ockam_node::compat::asynchronous::Mutex as AsyncMutex; use ockam_node::Context; use ockam_transport_core::TransportError; use rand::random; +use std::collections::HashSet; use std::sync::{Arc, Mutex}; +use std::time::Duration; use tracing::{debug, info, warn}; +/// Interval at which we will get all addresses and attach eBPF to newly added interfaces +pub const INTERFACE_LIST_UPDATE_INTERVAL: Duration = Duration::from_secs(30); + /// eBPF support for [`TcpTransport`] #[derive(Clone)] pub struct TcpTransportEbpfSupport { @@ -33,6 +39,9 @@ pub struct TcpTransportEbpfSupport { raw_socket_processor_address: Address, bpf: Arc>>, + + // May be replaced with AtomicBool but should be careful with choosing the right Ordering + attach_ebpf_task_running: Arc>, } struct IfaceLink { @@ -62,6 +71,7 @@ impl Default for TcpTransportEbpfSupport { tcp_packet_writer: Default::default(), raw_socket_processor_address: Address::random_tagged("RawSocketProcessor"), bpf: Default::default(), + attach_ebpf_task_running: Arc::new(Mutex::new(false)), } } } @@ -73,6 +83,67 @@ impl Debug for TcpTransportEbpfSupport { } impl TcpTransportEbpfSupport { + pub(crate) async fn attach_ebpf_to_all_interfaces(&self) -> Result<()> { + let interfaces_with_ebpf_attached: HashSet = + self.links.lock().unwrap().keys().cloned().collect(); + + let ifaddrs = nix::ifaddrs::getifaddrs() + .map_err(|e| TransportError::ReadingNetworkInterfaces(e as i32))?; + + for ifaddr in ifaddrs { + let addr = match ifaddr.address { + Some(addr) => addr, + None => continue, + }; + + // Check if it's an IPv4 address + if addr.as_sockaddr_in().is_none() { + continue; + }; + + let iface = ifaddr.interface_name; + + if interfaces_with_ebpf_attached.contains(&iface) { + continue; + } + + self.attach_ebpf_if_needed(iface)?; + } + + Ok(()) + } + + pub(crate) async fn attach_ebpf_to_all_interfaces_loop(self) { + loop { + let res = self.attach_ebpf_to_all_interfaces().await; + + if let Err(err) = res { + error!("Error attaching eBPF: {}", err) + } + + tokio::time::sleep(INTERFACE_LIST_UPDATE_INTERVAL).await; + } + } + + /// Will periodically get list of all interfaces and attach ockam eBPF + /// to both ingress and egress to each device (if wasn't attached yet) that has an IPv4 address. + /// More optimized approach would be to only attach to the interfaces we use, but that would + /// require figuring out which we can potentially use, which is currently tricky, especially + /// for the outlet, since figuring out which IP will be used to send a packet requires extra + /// effort, so the whole optimization may not be worth it. + pub(crate) async fn attach_ebpf_to_all_interfaces_start_task(&self) { + let mut is_running = self.attach_ebpf_task_running.lock().unwrap(); + + if *is_running { + return; + } + + *is_running = true; + drop(is_running); + let s = self.clone(); + tokio::spawn(s.attach_ebpf_to_all_interfaces_loop()); + } + /// Start [`RawSocketProcessor`]. Should be done once. pub(crate) async fn start_raw_socket_processor_if_needed( &self, @@ -113,7 +184,6 @@ impl TcpTransportEbpfSupport { /// Init eBPF system pub fn init_ebpf(&self) -> Result<()> { - // FIXME: eBPF I doubt we can reuse that instance for different interfaces. let mut bpf_lock = self.bpf.lock().unwrap(); if bpf_lock.is_some() { debug!("Skipping eBPF initialization"); @@ -146,7 +216,7 @@ impl TcpTransportEbpfSupport { if let Err(e) = EbpfLogger::init(&mut ebpf) { // This can happen if you remove all log statements from your eBPF program. - warn!("failed to initialize eBPF logger for ingress: {}", e); + warn!("failed to initialize eBPF logger: {}", e); } let inlet_port_map = aya::maps::HashMap::<_, Port, Proto>::try_from( @@ -187,7 +257,6 @@ impl TcpTransportEbpfSupport { let mut bpf_lock = self.bpf.lock().unwrap(); let bpf = bpf_lock.as_mut().unwrap(); - // TODO: eBPF Avoid loading multiple times let ingress_link = self.attach_ebpf_ingress(iface.clone(), bpf, skip_load)?; let egress_link = self.attach_ebpf_egress(iface.clone(), bpf, skip_load)?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/privileged_portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/privileged_portals.rs index b9b162f182d..93b73a65326 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/privileged_portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/privileged_portals.rs @@ -10,7 +10,7 @@ use ockam_core::{Address, DenyAll, Result, Route}; use ockam_node::compat::asynchronous::{resolve_peer, RwLock}; use ockam_node::{ProcessorBuilder, WorkerBuilder}; use ockam_transport_core::{HostnamePort, TransportError}; -use std::net::{IpAddr, SocketAddrV4}; +use std::net::IpAddr; use std::sync::Arc; use tokio::net::TcpListener; use tokio::sync::mpsc::channel; @@ -67,7 +67,6 @@ impl TcpTransport { let next = outlet_route.next().cloned()?; - // TODO: eBPF Find correlation between bind_addr and iface? let bind_addr = bind_addr.into(); let tcp_listener = TcpListener::bind(bind_addr.clone()) .await @@ -75,33 +74,19 @@ impl TcpTransport { let local_address = tcp_listener .local_addr() .map_err(|_| TransportError::BindFailed)?; - let ip = match local_address.ip() { - IpAddr::V4(ip) => ip, - IpAddr::V6(_) => return Err(TransportError::ExpectedIPv4Address)?, + + if !local_address.ip().is_ipv4() { + return Err(TransportError::ExpectedIPv4Address)?; }; + let port = local_address.port(); - let ifaddrs = nix::ifaddrs::getifaddrs() - .map_err(|e| TransportError::ReadingNetworkInterfaces(e as i32))?; - for ifaddr in ifaddrs { - let addr = match ifaddr.address { - Some(addr) => addr, - None => continue, - }; - - let addr = match addr.as_sockaddr_in() { - Some(addr) => *addr, - None => continue, - }; - - let addr = SocketAddrV4::from(addr); - - if &ip == addr.ip() || ip.is_unspecified() { - // TODO: eBPF Should we instead attach to all interfaces & run a periodic task - // to identify network interfaces change? - self.attach_ebpf_if_needed(ifaddr.interface_name)?; - } - } + // Trigger immediate attach + self.ebpf_support.attach_ebpf_to_all_interfaces().await?; + // Start periodic updates if needed + self.ebpf_support + .attach_ebpf_to_all_interfaces_start_task() + .await; let tcp_packet_writer = self.start_raw_socket_processor_if_needed().await?; @@ -160,7 +145,7 @@ impl TcpTransport { /// Stop the Privileged Inlet #[instrument(skip(self), fields(port=port))] - pub async fn stop_privilegged_inlet(&self, port: Port) -> Result<()> { + pub async fn stop_privileged_inlet(&self, port: Port) -> Result<()> { self.ebpf_support.inlet_registry.delete_inlet(port); Ok(()) @@ -194,12 +179,12 @@ impl TcpTransport { }; let dst_port = destination.port(); - // TODO: eBPF Figure out which ifaces might be used and only attach to them - // TODO: eBPF Should we indeed attach to all interfaces & run a periodic task - // to identify network interfaces change? - for ifname in TcpTransport::all_interfaces_with_address()? { - self.attach_ebpf_if_needed(ifname)?; - } + // Trigger immediate attach + self.ebpf_support.attach_ebpf_to_all_interfaces().await?; + // Start periodic updates if needed + self.ebpf_support + .attach_ebpf_to_all_interfaces_start_task() + .await; let write_handle = self.start_raw_socket_processor_if_needed().await?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/transport.rs b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/transport.rs index cf348e0f0a3..b839e7a23be 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/transport.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/transport.rs @@ -3,7 +3,7 @@ use crate::TcpTransport; use aya::programs::tc::{qdisc_detach_program, TcAttachType}; use log::{error, info, warn}; use ockam_core::Result; -use ockam_transport_core::TransportError; +use std::collections::HashSet; impl TcpTransport { /// Start [`RawSocketProcessor`]. Should be done once. @@ -15,41 +15,16 @@ impl TcpTransport { .await } - // TODO: eBPF Should we dispatch it to the sync thread? - pub(crate) fn attach_ebpf_if_needed(&self, iface: Iface) -> Result<()> { - self.ebpf_support.attach_ebpf_if_needed(iface) - } - /// Detach the eBPFs. pub fn detach_ebpfs(&self) { self.ebpf_support.detach_ebpfs() } - /// List all interfaces with defined IPv4 address - pub fn all_interfaces_with_address() -> Result> { - let ifaddrs = nix::ifaddrs::getifaddrs() - .map_err(|e| TransportError::ReadingNetworkInterfaces(e as i32))?; - let ifaddrs = ifaddrs - .filter_map(|ifaddr| { - let addr = match ifaddr.address { - Some(addr) => addr, - None => return None, - }; - - addr.as_sockaddr_in()?; - - Some(ifaddr.interface_name) - }) - .collect::>(); - - Ok(ifaddrs) - } - /// Detach all ockam eBPFs from all interfaces for all processes pub fn detach_all_ockam_ebpfs_globally() { // TODO: Not sure that the best way to do it, but it works. info!("Detaching all ebpfs globally"); - let ifaces = match Self::all_interfaces_with_address() { + let ifaces = match nix::ifaddrs::getifaddrs() { Ok(ifaces) => ifaces, Err(err) => { error!("Error reading network interfaces: {}", err); @@ -57,6 +32,9 @@ impl TcpTransport { } }; + // Remove duplicates + let ifaces: HashSet = ifaces.into_iter().map(|i| i.interface_name).collect(); + for iface in ifaces { match qdisc_detach_program(&iface, TcAttachType::Ingress, "ockam_ingress") { Ok(_) => {