Skip to content

Commit

Permalink
Merge pull request #108 from amosproj/sendmsg-setup
Browse files Browse the repository at this point in the history
feat: sendmsg collection and config
  • Loading branch information
BenediktZinn authored Nov 26, 2024
2 parents e341301 + 6aefa46 commit 72e2a70
Show file tree
Hide file tree
Showing 13 changed files with 277 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class ConfigurationManager(val clientFactory: ClientFactory) :
client!!.getConfiguration()
} catch (e: ClientException) {
// TODO this should be handled on the backend
client!!.setConfiguration(Configuration(vfsWrite = null, uprobes = listOf()))
client!!.setConfiguration(
Configuration(vfsWrite = null, sysSendmsg = null, uprobes = listOf())
)
client!!.getConfiguration()
}
configuration.update { ConfigurationUpdate.Valid(initializedConfiguration) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class ConfigurationViewModel(val configurationAccess: ConfigurationAccess) : Vie
VfsWriteConfig(this.vfsWriteOption.pids)
} else null

return Configuration(vfsWrite = vfsConfig, uprobes = listOf())
// TODO: sysSendmsg
return Configuration(vfsWrite = vfsConfig, sysSendmsg = null, uprobes = listOf())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ const val alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

object RustClient : Client {
private var configuration: Configuration =
Configuration(vfsWrite = VfsWriteConfig(listOf(1234u, 43124u)), uprobes = listOf())
Configuration(
vfsWrite = VfsWriteConfig(listOf(1234u, 43124u)),
sysSendmsg = null,
uprobes = listOf(),
)

override suspend fun serverCount(): Flow<UInt> = flow {
var ctr = 0u
Expand Down
4 changes: 3 additions & 1 deletion rust/backend/daemon/src/bin/cli.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// SPDX-FileCopyrightText: 2024 Benedikt Zinn <[email protected]>
// SPDX-FileCopyrightText: 2024 Felix Hilgers <[email protected]>
// SPDX-FileCopyrightText: 2024 Robin Seidl <[email protected]>
//
// SPDX-License-Identifier: MIT

use clap::Parser;
use shared::{
config::{Configuration, VfsWriteConfig},
config::{Configuration, SysSendmsgConfig, VfsWriteConfig},
ziofa::ziofa_client::ZiofaClient,
};
use tonic::transport::Channel;
Expand Down Expand Up @@ -48,6 +49,7 @@ async fn test_get_configuration(client: &mut ZiofaClient<Channel>, verbose: bool
Configuration {
uprobes: vec![],
vfs_write: Some(VfsWriteConfig { pids: vec![] }),
sys_sendmsg: Some(SysSendmsgConfig { pids: vec![] }),
}
}
};
Expand Down
135 changes: 115 additions & 20 deletions rust/backend/daemon/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,134 @@
//
// SPDX-License-Identifier: MIT

use std::marker::PhantomData;

use async_broadcast::Sender;
use aya::maps::ring_buf::RingBufItem;
use aya::Ebpf;
use aya::maps::{MapData, MapError, RingBuf};
use tokio::io::unix::AsyncFd;
use tokio::select;
use tokio::{join, select};
use tonic::Status;
use tracing::error;
use backend_common::VfsWriteCall;
use shared::ziofa::{Event, VfsWriteEvent};
use backend_common::{SysSendmsgCall, VfsWriteCall};
use shared::ziofa::{Event, SysSendmsgEvent, VfsWriteEvent};
use shared::ziofa::event::{EventData};

pub struct VfsWriteCollector {
map: AsyncFd<RingBuf<MapData>>
pub trait CollectFromMap {
const MAP_NAME: &'static str;

fn convert(item: RingBufItem<'_>) -> Result<Event, Status>;
}

struct VfsWriteCollect;

impl CollectFromMap for VfsWriteCollect {
const MAP_NAME: &'static str = "VFS_WRITE_MAP";

fn convert(item: RingBufItem<'_>) -> Result<Event, Status> {
let data = unsafe { &*(item.as_ptr() as *const VfsWriteCall) };
Ok(Event {
event_data: Some(EventData::VfsWrite(VfsWriteEvent {
pid: data.pid,
tid: data.tid,
begin_time_stamp: data.begin_time_stamp,
fp: data.fp,
bytes_written: data.bytes_written as u64
}))
})
}
}

struct SysSendmsgCollect;

impl CollectFromMap for SysSendmsgCollect {
const MAP_NAME: &'static str = "SYS_SENDMSG_MAP";

fn convert(item: RingBufItem<'_>) -> Result<Event, Status> {
let data = unsafe { &*(item.as_ptr() as *const SysSendmsgCall) };
Ok(Event {
event_data: Some(EventData::SysSendmsg(SysSendmsgEvent {
pid: data.pid,
tid: data.tid,
begin_time_stamp: data.begin_time_stamp,
fd: data.fd,
duration_micro_sec: data.duration_micro_sec
}))
})
}
}

pub struct MultiCollector {
vfs_write: Option<Collector<VfsWriteCollect>>,
sys_sendmsg: Option<Collector<SysSendmsgCollect>>,
}

impl MultiCollector {
pub fn from_ebpf(ebpf: &mut Ebpf) -> Result<Self, MapError> {
let vfs_write = Collector::<VfsWriteCollect>::from_ebpf(ebpf)?;
let sys_sendmsg = Collector::<SysSendmsgCollect>::from_ebpf(ebpf)?;
Ok(Self { vfs_write: Some(vfs_write), sys_sendmsg: Some(sys_sendmsg) })
}

pub async fn collect(&mut self, tx: Sender<Result<Event, Status>>, shutdown: tokio::sync::oneshot::Receiver<()>) -> Result<(), std::io::Error> {

let (vfs_write_shutdown_tx, vfs_write_shutdown_rx) = tokio::sync::oneshot::channel();
let (sys_sendmsg_shutdown_tx, sys_sendmsg_shutdown_rx) = tokio::sync::oneshot::channel();

let cancellation_task = async move {
if shutdown.await.is_err() {
error!("Error while waiting for shutdown signal");
}
if vfs_write_shutdown_tx.send(()).is_err() {
error!("Error while cancelling vfs_write collector");
}
if sys_sendmsg_shutdown_tx.send(()).is_err() {
error!("Error while cancelling sys_sendmsg collector");
}
};

let vfs_write_tx = tx.clone();
let mut vfs_write = self.vfs_write.take().expect("vfs_write should be initialized");
let vfs_write_task = async {
vfs_write.collect(vfs_write_tx, vfs_write_shutdown_rx).await?;
Ok::<(), std::io::Error>(())
};

let sys_sendmsg_tx = tx;
let mut sys_sendmsg = self.sys_sendmsg.take().expect("sys_sendmsg should be initialized");
let sys_sendmsg_task = async {
sys_sendmsg.collect(sys_sendmsg_tx, sys_sendmsg_shutdown_rx).await?;
Ok::<(), std::io::Error>(())
};

let (_, vfs_write_result, sys_sendmsg_result) = join!(cancellation_task, vfs_write_task, sys_sendmsg_task);

self.vfs_write = Some(vfs_write);
self.sys_sendmsg = Some(sys_sendmsg);

// TODO: multiple errors
vfs_write_result?;
sys_sendmsg_result?;

Ok(())
}
}

pub struct Collector<T: CollectFromMap> {
map: AsyncFd<RingBuf<MapData>>,
_collector: PhantomData<T>,
}

impl VfsWriteCollector {
impl<T: CollectFromMap> Collector<T> {
pub fn from_ebpf(ebpf: &mut Ebpf) -> Result<Self, MapError> {
let map: RingBuf<_> = ebpf.take_map("VFS_WRITE_MAP")
.ok_or(MapError::InvalidName { name: "VFS_WRITE_MAP".to_string() })?
let map: RingBuf<_> = ebpf.take_map(T::MAP_NAME)
.ok_or(MapError::InvalidName { name: T::MAP_NAME.to_string() })?
.try_into()?;

let map = AsyncFd::new(map)?;

Ok(Self { map })
Ok(Self { map, _collector: PhantomData })
}

pub async fn collect(&mut self, tx: Sender<Result<Event, Status>>, mut shutdown: tokio::sync::oneshot::Receiver<()>) -> Result<(), std::io::Error> {
Expand All @@ -36,17 +140,8 @@ impl VfsWriteCollector {
let rb = handle.get_inner_mut();

while let Some(item) = rb.next() {
let data = unsafe { &*(item.as_ptr() as *const VfsWriteCall) };
let event = Event {
event_data: Some(EventData::VfsWrite(VfsWriteEvent {
pid: data.pid,
tid: data.tid,
begin_time_stamp: data.begin_time_stamp,
fp: data.fp,
bytes_written: data.bytes_written as u64
}))
};
match tx.broadcast(Ok(event)).await {
let event = T::convert(item);
match tx.broadcast(event).await {
Ok(_) => {},
Err(async_broadcast::SendError(event)) => {
error!(
Expand Down
10 changes: 8 additions & 2 deletions rust/backend/daemon/src/ebpf_utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
// SPDX-FileCopyrightText: 2024 Felix Hilgers <[email protected]>
// SPDX-FileCopyrightText: 2024 Franz Schlicht <[email protected]>
// SPDX-FileCopyrightText: 2024 Robin Seidl <[email protected]>
// SPDX-FileCopyrightText: 2024 Tom Weisshuhn <[email protected]>
//
// SPDX-License-Identifier: MIT

use aya::{Ebpf, EbpfError};
use shared::config::Configuration;
use thiserror::Error;

use crate::features::VfsFeature;
use crate::features::{SysSendmsgFeature, VfsFeature};

#[derive(Debug, Error)]
pub enum EbpfErrorWrapper {
Expand All @@ -23,27 +25,31 @@ impl From<EbpfErrorWrapper> for tonic::Status {

pub struct State {
vfs_write_feature: VfsFeature,
sys_sendmsg_feature: SysSendmsgFeature,
}

impl State {
pub fn new() -> State {
State {
vfs_write_feature: VfsFeature::new(),
sys_sendmsg_feature: SysSendmsgFeature::new(),
}
}

pub fn init(&mut self, ebpf: &mut Ebpf) -> Result<(), EbpfError> {
self.vfs_write_feature.create(ebpf)?;
self.sys_sendmsg_feature.create(ebpf)?;

Ok(())
}

pub fn update_from_config(
&mut self,
ebpf: &mut Ebpf,
_config_path: &str,
config: &Configuration,
) -> Result<(), EbpfError> {
self.vfs_write_feature.attach(ebpf)?;
self.sys_sendmsg_feature.apply(ebpf, config.sys_sendmsg.as_ref())?;

Ok(())
}
Expand Down
Loading

0 comments on commit 72e2a70

Please sign in to comment.