Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
fhilgers committed Dec 11, 2024
1 parent fa8ca9d commit 4cdee1e
Show file tree
Hide file tree
Showing 21 changed files with 628 additions and 700 deletions.
73 changes: 73 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/backend/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ user = ["aya"]

[dependencies]
aya = { workspace = true, optional = true }
bytemuck = { version = "1.20.0", features = ["derive"] }

[lib]
path = "src/lib.rs"
35 changes: 30 additions & 5 deletions rust/backend/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,33 @@
//
// SPDX-License-Identifier: MIT

use bytemuck::{checked::CheckedCastError, AnyBitPattern, CheckedBitPattern};


pub trait TryFromRaw: Sized {
fn try_from_raw(raw: &[u8]) -> Result<Self, CheckedCastError>;
}

impl TryFromRaw for VfsWriteCall {
fn try_from_raw(raw: &[u8]) -> Result<Self, CheckedCastError> {
Ok(*bytemuck::try_from_bytes(raw)?)
}
}

impl TryFromRaw for SysSendmsgCall {
fn try_from_raw(raw: &[u8]) -> Result<Self, CheckedCastError> {
Ok(*bytemuck::try_from_bytes(raw)?)
}
}

impl TryFromRaw for JNICall {
fn try_from_raw(raw: &[u8]) -> Result<Self, CheckedCastError> {
Ok(*bytemuck::checked::try_from_bytes(raw)?)
}
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, AnyBitPattern)]
pub struct VfsWriteCall {
pub pid: u32,
pub tid: u32,
Expand All @@ -30,7 +55,7 @@ impl VfsWriteCall {
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, AnyBitPattern)]
pub struct SysSendmsgCall {
pub pid: u32,
pub tid: u32,
Expand All @@ -51,8 +76,8 @@ impl SysSendmsgCall {
}
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
#[repr(u8)]
#[derive(Debug, Copy, Clone, CheckedBitPattern)]
pub enum JNIMethodName {
AddLocalRef,
DeleteLocalRef,
Expand All @@ -61,7 +86,7 @@ pub enum JNIMethodName {
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, CheckedBitPattern)]
pub struct JNICall {
pub pid: u32,
pub tid: u32,
Expand Down
2 changes: 2 additions & 0 deletions rust/backend/daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ aya-log = { workspace = true }
async-broadcast = { workspace = true }
object = { workspace = true }
serde = { workspace = true }
bytemuck = "1.20.0"
crossbeam = "0.8.4"

[build-dependencies]
cargo_metadata = { workspace = true }
Expand Down
139 changes: 55 additions & 84 deletions rust/backend/daemon/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,103 +4,83 @@
//
// SPDX-License-Identifier: MIT

use std::marker::PhantomData;
use std::io;

use async_broadcast::Sender;
use aya::maps::ring_buf::RingBufItem;
use aya::Ebpf;
use aya::maps::{MapData, MapError, RingBuf};
use tokio::io::unix::AsyncFd;
use tokio::{join, select};
use tonic::Status;
use tracing::error;
use backend_common::{JNICall, JNIMethodName, SysSendmsgCall, VfsWriteCall};
use backend_common::{JNICall, JNIMethodName, SysSendmsgCall, TryFromRaw, VfsWriteCall};
use shared::ziofa::{Event, JniReferencesEvent, SysSendmsgEvent, VfsWriteEvent};
use shared::ziofa::event::{EventData};
use shared::ziofa::jni_references_event;
use shared::ziofa::jni_references_event::{JniMethodName};

pub trait CollectFromMap {
const MAP_NAME: &'static str;
use crate::registry::{EbpfRegistry, RegistryGuard, RegistryItem, TypedRingBuffer};

fn convert(item: RingBufItem<'_>) -> Result<Event, Status>;
pub trait IntoEvent {
fn into_event(self) -> Event;
}

struct VfsWriteCollect;
struct JNICollect;
struct SysSendmsgCollect;

impl CollectFromMap for VfsWriteCollect {
const MAP_NAME: &'static str = "VFS_WRITE_EVENTS";

fn convert(item: RingBufItem<'_>) -> Result<Event, Status> {
let data = unsafe { &*(item.as_ptr() as *const VfsWriteCall) };
Ok(Event {
impl IntoEvent for VfsWriteCall {
fn into_event(self) -> Event {
Event {
event_data: Some(EventData::VfsWrite(VfsWriteEvent {
pid: data.pid,
tid: data.tid,
begin_time_stamp: data.begin_time_stamp,
fp: data.fp,
bytes_written: data.bytes_written as u64
pid: self.pid,
tid: self.tid,
begin_time_stamp: self.begin_time_stamp,
fp: self.fp,
bytes_written: self.bytes_written as u64
}))
})
}
}
}

impl CollectFromMap for JNICollect {
const MAP_NAME: &'static str = "JNI_REF_CALLS";

fn convert(item: RingBufItem<'_>) -> Result<Event, Status> {
let data = unsafe { &*(item.as_ptr() as *const JNICall) };

// manual cast from the ebpf (c rep.) typ to protobuf (rust rep.) type
let jni_method_name = match data.method_name {
JNIMethodName::AddLocalRef => jni_references_event::JniMethodName::AddLocalRef,
JNIMethodName::DeleteLocalRef => jni_references_event::JniMethodName::DeleteLocalRef,
JNIMethodName::AddGlobalRef => jni_references_event::JniMethodName::AddGlobalRef,
JNIMethodName::DeleteGlobalRef => jni_references_event::JniMethodName::DeleteGlobalRef,
};

Ok(Event {
event_data: Some(EventData::JniReferences(JniReferencesEvent {
pid: data.pid,
tid: data.tid,
begin_time_stamp: data.begin_time_stamp,
jni_method_name: i32::from(jni_method_name),
impl IntoEvent for SysSendmsgCall {
fn into_event(self) -> Event {
Event {
event_data: Some(EventData::SysSendmsg(SysSendmsgEvent {
pid: self.pid,
tid: self.tid,
begin_time_stamp: self.begin_time_stamp,
fd: self.fd,
duration_nano_sec: self.duration_nano_sec
}))
})
}
}
}


impl CollectFromMap for SysSendmsgCollect {
const MAP_NAME: &'static str = "SYS_SENDMSG_EVENTS";

fn convert(item: RingBufItem<'_>) -> Result<Event, Status> {
let data = unsafe { &*(item.as_ptr() as *const SysSendmsgCall) };
Ok(Event {
event_data: Some(EventData::SysSendmsg(SysSendmsgEvent {
pid: data.pid,
tid: data.tid,
begin_time_stamp: data.begin_time_stamp,
fd: data.fd,
duration_nano_sec: data.duration_nano_sec
impl IntoEvent for JNICall {
fn into_event(self) -> Event {
Event {
event_data: Some(EventData::JniReferences(JniReferencesEvent {
pid: self.pid,
tid: self.tid,
begin_time_stamp: self.begin_time_stamp,
jni_method_name: (match self.method_name {
JNIMethodName::AddLocalRef => JniMethodName::AddLocalRef,
JNIMethodName::DeleteLocalRef => JniMethodName::DeleteLocalRef,
JNIMethodName::AddGlobalRef => JniMethodName::AddGlobalRef,
JNIMethodName::DeleteGlobalRef => JniMethodName::DeleteGlobalRef,
}).into(),
}))
})
}
}
}

pub struct MultiCollector {
vfs_write: Option<Collector<VfsWriteCollect>>,
sys_sendmsg: Option<Collector<SysSendmsgCollect>>,
jni_event: Option<Collector<JNICollect>>,
vfs_write: Option<Collector<VfsWriteCall>>,
sys_sendmsg: Option<Collector<SysSendmsgCall>>,
jni_event: Option<Collector<JNICall>>,
}

impl MultiCollector {
pub fn from_ebpf(ebpf: &mut Ebpf) -> Result<Self, MapError> {
let vfs_write = Collector::<VfsWriteCollect>::from_ebpf(ebpf)?;
let sys_sendmsg = Collector::<SysSendmsgCollect>::from_ebpf(ebpf)?;
let jni_collect = Collector::<JNICollect>::from_ebpf(ebpf)?;
Ok(Self { vfs_write: Some(vfs_write), sys_sendmsg: Some(sys_sendmsg), jni_event: Some(jni_collect) })
pub fn from_registry(registry: &EbpfRegistry) -> Result<Self, io::Error> {
Ok(Self {
vfs_write: Some(Collector::from_registry_item(registry.event.vfs_write_events.clone())?),
sys_sendmsg: Some(Collector::from_registry_item(registry.event.sys_sendmsg_events.clone())?),
jni_event: Some(Collector::from_registry_item(registry.event.jni_ref_calls.clone())?),
})
}

pub async fn collect(&mut self, tx: Sender<Result<Event, Status>>, shutdown: tokio::sync::oneshot::Receiver<()>) -> Result<(), std::io::Error> {
Expand Down Expand Up @@ -160,32 +140,23 @@ impl MultiCollector {
}
}

pub struct Collector<T: CollectFromMap> {
map: AsyncFd<RingBuf<MapData>>,
_collector: PhantomData<T>,
}

impl<T: CollectFromMap> Collector<T> {
pub fn from_ebpf(ebpf: &mut Ebpf) -> Result<Self, MapError> {
let map: RingBuf<_> = ebpf.take_map(T::MAP_NAME)
.ok_or(MapError::InvalidName { name: T::MAP_NAME.to_string() })?
.try_into()?;

let map = AsyncFd::new(map)?;
pub struct Collector<T: IntoEvent + TryFromRaw>(AsyncFd<RegistryGuard<TypedRingBuffer<T>>>);

Ok(Self { map, _collector: PhantomData })
impl<T: IntoEvent + TryFromRaw> Collector<T> {
pub fn from_registry_item(item: RegistryItem<TypedRingBuffer<T>>) -> Result<Self, io::Error> {
let map = AsyncFd::new(item.take())?;
Ok(Self(map))
}

pub async fn collect(&mut self, tx: Sender<Result<Event, Status>>, mut shutdown: tokio::sync::oneshot::Receiver<()>) -> Result<(), std::io::Error> {
loop {
select! {
handle = self.map.readable_mut() => {
handle = self.0.readable_mut() => {
let mut handle = handle?;
let rb = handle.get_inner_mut();

while let Some(item) = rb.next() {
let event = T::convert(item);
match tx.broadcast(event).await {
match tx.broadcast(Ok(item.into_event())).await {
Ok(_) => {},
Err(async_broadcast::SendError(event)) => {
error!(
Expand Down
3 changes: 2 additions & 1 deletion rust/backend/daemon/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::net::SocketAddr;
pub(crate) const DEV_DEFAULT_FILE_PATH: &str = "./ziofa.json";

pub fn sock_addr() -> SocketAddr {
"[::1]:50051".parse().expect("is valid address")
"127.0.0.1:50051".parse().expect("is valid address")
}

pub const OATDUMP_PATH: &str = "/data/local/tmp/dump.json";
pub const ZIOFA_EBPF_PATH: &str = "/sys/fs/bpf/ziofa";
Loading

0 comments on commit 4cdee1e

Please sign in to comment.