Skip to content

Commit

Permalink
feat(rust): add periodic interface scan and ebpf attachment
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Nov 21, 2024
1 parent e358181 commit 0483daf
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@ use aya::programs::{tc, Link, ProgramError, SchedClassifier, TcAttachType};
use aya::{Ebpf, EbpfError};
use aya_log::EbpfLogger;
use core::fmt::{Debug, Formatter};
use log::error;
use ockam_core::compat::collections::HashMap;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{Address, Error, Result};
use ockam_node::compat::asynchronous::Mutex as AsyncMutex;
use ockam_node::Context;
use ockam_transport_core::TransportError;
use rand::random;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::{debug, info, warn};

/// Interval at which we will get all addresses and attach eBPF to newly added interfaces
pub const INTERFACE_LIST_UPDATE_INTERVAL: Duration = Duration::from_secs(30);

/// eBPF support for [`TcpTransport`]
#[derive(Clone)]
pub struct TcpTransportEbpfSupport {
Expand All @@ -33,6 +39,9 @@ pub struct TcpTransportEbpfSupport {
raw_socket_processor_address: Address,

bpf: Arc<Mutex<Option<OckamBpf>>>,

// May be replaced with AtomicBool but should be careful with choosing the right Ordering
attach_ebpf_task_running: Arc<Mutex<bool>>,
}

struct IfaceLink {
Expand Down Expand Up @@ -62,6 +71,7 @@ impl Default for TcpTransportEbpfSupport {
tcp_packet_writer: Default::default(),
raw_socket_processor_address: Address::random_tagged("RawSocketProcessor"),
bpf: Default::default(),
attach_ebpf_task_running: Arc::new(Mutex::new(false)),
}
}
}
Expand All @@ -73,6 +83,67 @@ impl Debug for TcpTransportEbpfSupport {
}

impl TcpTransportEbpfSupport {
pub(crate) async fn attach_ebpf_to_all_interfaces(&self) -> Result<()> {
let interfaces_with_ebpf_attached: HashSet<Iface> =
self.links.lock().unwrap().keys().cloned().collect();

let ifaddrs = nix::ifaddrs::getifaddrs()
.map_err(|e| TransportError::ReadingNetworkInterfaces(e as i32))?;

for ifaddr in ifaddrs {
let addr = match ifaddr.address {
Some(addr) => addr,
None => continue,
};

// Check if it's an IPv4 address
if addr.as_sockaddr_in().is_none() {
continue;
};

let iface = ifaddr.interface_name;

if interfaces_with_ebpf_attached.contains(&iface) {
continue;
}

self.attach_ebpf_if_needed(iface)?;
}

Ok(())
}

pub(crate) async fn attach_ebpf_to_all_interfaces_loop(self) {
loop {
let res = self.attach_ebpf_to_all_interfaces().await;

if let Err(err) = res {
error!("Error attaching eBPF: {}", err)
}

tokio::time::sleep(INTERFACE_LIST_UPDATE_INTERVAL).await;
}
}

/// Will periodically get list of all interfaces and attach ockam eBPF
/// to both ingress and egress to each device (if wasn't attached yet) that has an IPv4 address.
/// More optimized approach would be to only attach to the interfaces we use, but that would
/// require figuring out which we can potentially use, which is currently tricky, especially
/// for the outlet, since figuring out which IP will be used to send a packet requires extra
/// effort, so the whole optimization may not be worth it.
pub(crate) async fn attach_ebpf_to_all_interfaces_start_task(&self) {
let mut is_running = self.attach_ebpf_task_running.lock().unwrap();

if *is_running {
return;
}

*is_running = true;
drop(is_running);
let s = self.clone();
tokio::spawn(s.attach_ebpf_to_all_interfaces_loop());
}

/// Start [`RawSocketProcessor`]. Should be done once.
pub(crate) async fn start_raw_socket_processor_if_needed(
&self,
Expand Down Expand Up @@ -113,7 +184,6 @@ impl TcpTransportEbpfSupport {

/// Init eBPF system
pub fn init_ebpf(&self) -> Result<()> {
// FIXME: eBPF I doubt we can reuse that instance for different interfaces.
let mut bpf_lock = self.bpf.lock().unwrap();
if bpf_lock.is_some() {
debug!("Skipping eBPF initialization");
Expand Down Expand Up @@ -146,7 +216,7 @@ impl TcpTransportEbpfSupport {

if let Err(e) = EbpfLogger::init(&mut ebpf) {
// This can happen if you remove all log statements from your eBPF program.
warn!("failed to initialize eBPF logger for ingress: {}", e);
warn!("failed to initialize eBPF logger: {}", e);
}

let inlet_port_map = aya::maps::HashMap::<_, Port, Proto>::try_from(
Expand Down Expand Up @@ -187,7 +257,6 @@ impl TcpTransportEbpfSupport {
let mut bpf_lock = self.bpf.lock().unwrap();
let bpf = bpf_lock.as_mut().unwrap();

// TODO: eBPF Avoid loading multiple times
let ingress_link = self.attach_ebpf_ingress(iface.clone(), bpf, skip_load)?;
let egress_link = self.attach_ebpf_egress(iface.clone(), bpf, skip_load)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ockam_core::{Address, DenyAll, Result, Route};
use ockam_node::compat::asynchronous::{resolve_peer, RwLock};
use ockam_node::{ProcessorBuilder, WorkerBuilder};
use ockam_transport_core::{HostnamePort, TransportError};
use std::net::{IpAddr, SocketAddrV4};
use std::net::IpAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::mpsc::channel;
Expand Down Expand Up @@ -67,41 +67,26 @@ impl TcpTransport {

let next = outlet_route.next().cloned()?;

// TODO: eBPF Find correlation between bind_addr and iface?
let bind_addr = bind_addr.into();
let tcp_listener = TcpListener::bind(bind_addr.clone())
.await
.map_err(|_| TransportError::BindFailed)?;
let local_address = tcp_listener
.local_addr()
.map_err(|_| TransportError::BindFailed)?;
let ip = match local_address.ip() {
IpAddr::V4(ip) => ip,
IpAddr::V6(_) => return Err(TransportError::ExpectedIPv4Address)?,

if !local_address.ip().is_ipv4() {
return Err(TransportError::ExpectedIPv4Address)?;
};

let port = local_address.port();

let ifaddrs = nix::ifaddrs::getifaddrs()
.map_err(|e| TransportError::ReadingNetworkInterfaces(e as i32))?;
for ifaddr in ifaddrs {
let addr = match ifaddr.address {
Some(addr) => addr,
None => continue,
};

let addr = match addr.as_sockaddr_in() {
Some(addr) => *addr,
None => continue,
};

let addr = SocketAddrV4::from(addr);

if &ip == addr.ip() || ip.is_unspecified() {
// TODO: eBPF Should we instead attach to all interfaces & run a periodic task
// to identify network interfaces change?
self.attach_ebpf_if_needed(ifaddr.interface_name)?;
}
}
// Trigger immediate attach
self.ebpf_support.attach_ebpf_to_all_interfaces().await?;
// Start periodic updates if needed
self.ebpf_support
.attach_ebpf_to_all_interfaces_start_task()
.await;

let tcp_packet_writer = self.start_raw_socket_processor_if_needed().await?;

Expand Down Expand Up @@ -160,7 +145,7 @@ impl TcpTransport {

/// Stop the Privileged Inlet
#[instrument(skip(self), fields(port=port))]
pub async fn stop_privilegged_inlet(&self, port: Port) -> Result<()> {
pub async fn stop_privileged_inlet(&self, port: Port) -> Result<()> {
self.ebpf_support.inlet_registry.delete_inlet(port);

Ok(())
Expand Down Expand Up @@ -194,12 +179,12 @@ impl TcpTransport {
};
let dst_port = destination.port();

// TODO: eBPF Figure out which ifaces might be used and only attach to them
// TODO: eBPF Should we indeed attach to all interfaces & run a periodic task
// to identify network interfaces change?
for ifname in TcpTransport::all_interfaces_with_address()? {
self.attach_ebpf_if_needed(ifname)?;
}
// Trigger immediate attach
self.ebpf_support.attach_ebpf_to_all_interfaces().await?;
// Start periodic updates if needed
self.ebpf_support
.attach_ebpf_to_all_interfaces_start_task()
.await;

let write_handle = self.start_raw_socket_processor_if_needed().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::TcpTransport;
use aya::programs::tc::{qdisc_detach_program, TcAttachType};
use log::{error, info, warn};
use ockam_core::Result;
use ockam_transport_core::TransportError;
use std::collections::HashSet;

impl TcpTransport {
/// Start [`RawSocketProcessor`]. Should be done once.
Expand All @@ -15,48 +15,26 @@ impl TcpTransport {
.await
}

// TODO: eBPF Should we dispatch it to the sync thread?
pub(crate) fn attach_ebpf_if_needed(&self, iface: Iface) -> Result<()> {
self.ebpf_support.attach_ebpf_if_needed(iface)
}

/// Detach the eBPFs.
pub fn detach_ebpfs(&self) {
self.ebpf_support.detach_ebpfs()
}

/// List all interfaces with defined IPv4 address
pub fn all_interfaces_with_address() -> Result<Vec<String>> {
let ifaddrs = nix::ifaddrs::getifaddrs()
.map_err(|e| TransportError::ReadingNetworkInterfaces(e as i32))?;
let ifaddrs = ifaddrs
.filter_map(|ifaddr| {
let addr = match ifaddr.address {
Some(addr) => addr,
None => return None,
};

addr.as_sockaddr_in()?;

Some(ifaddr.interface_name)
})
.collect::<Vec<_>>();

Ok(ifaddrs)
}

/// Detach all ockam eBPFs from all interfaces for all processes
pub fn detach_all_ockam_ebpfs_globally() {
// TODO: Not sure that the best way to do it, but it works.
info!("Detaching all ebpfs globally");
let ifaces = match Self::all_interfaces_with_address() {
let ifaces = match nix::ifaddrs::getifaddrs() {
Ok(ifaces) => ifaces,
Err(err) => {
error!("Error reading network interfaces: {}", err);
return;
}
};

// Remove duplicates
let ifaces: HashSet<Iface> = ifaces.into_iter().map(|i| i.interface_name).collect();

for iface in ifaces {
match qdisc_detach_program(&iface, TcAttachType::Ingress, "ockam_ingress") {
Ok(_) => {
Expand Down

0 comments on commit 0483daf

Please sign in to comment.