Skip to content

Commit

Permalink
Merge pull request #110 from amosproj/80-sendmsgConfigure
Browse files Browse the repository at this point in the history
chore(eBPF) set the threshold for a blocking call individually per pid
  • Loading branch information
fhilgers authored Dec 3, 2024
2 parents e8cb14b + 8cd794c commit ea47b0f
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 46 deletions.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ sealed class WriteEvent(
val pid: UInt,
val tid: UInt,
val beginTimestamp: ULong,
val durationMicros: ULong,
) : WriteEvent(fd, pid, beginTimestamp, durationMicros)
val durationNanos: ULong,
) : WriteEvent(fd, pid, beginTimestamp, durationNanos)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import de.amosproj3.ziofa.api.WriteEvent
import de.amosproj3.ziofa.client.ClientFactory
import de.amosproj3.ziofa.client.Event
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import timber.log.Timber

// TODO: use a single sharedFlow and then different filters on top of that
// otherwise we are sending all the data multiple times from server to client
class DataStreamManager(private val clientFactory: ClientFactory) : DataStreamProvider {

override suspend fun counter(ebpfProgramName: String): Flow<UInt> {
Expand All @@ -36,27 +38,28 @@ class DataStreamManager(private val clientFactory: ClientFactory) : DataStreamPr
clientFactory
.connect()
.initStream()
.filter { it is Event.VfsWrite }
.mapNotNull { it as? Event.VfsWrite }
.map {
if (it is Event.VfsWrite) {
WriteEvent.VfsWriteEvent(it.fp, it.pid, it.bytesWritten, it.beginTimeStamp)
} else throw Exception("only for the compiler")
WriteEvent.VfsWriteEvent(
fd = it.fp,
pid = it.pid,
size = it.bytesWritten,
timestampMillis = it.beginTimeStamp,
)
}

override suspend fun sendMessageEvents(pids: List<UInt>): Flow<WriteEvent.SendMessageEvent> =
clientFactory
.connect()
.initStream()
.filter { it is Event.SysSendmsg }
.mapNotNull { it as? Event.SysSendmsg }
.map {
if (it is Event.SysSendmsg) {
WriteEvent.SendMessageEvent(
it.fd.toULong(),
it.pid,
it.tid,
it.beginTimeStamp,
it.durationMicroSec,
)
} else throw Exception("only for the compiler")
WriteEvent.SendMessageEvent(
fd = it.fd,
pid = it.pid,
tid = it.tid,
beginTimestamp = it.beginTimeStamp,
durationNanos = it.durationNanoSecs,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fun EventList(events: List<WriteEvent>) {
when (event) {
is WriteEvent.SendMessageEvent -> {
Text(
text = (event.durationMicros / 1_000u).toString(),
text = (event.durationNanos / 1_000_000u).toString(),
modifier = Modifier.weight(1f),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ sealed class Event {
val pid: UInt,
val tid: UInt,
val beginTimeStamp: ULong,
val fd: Int,
val durationMicroSec: ULong,
val fd: ULong,
val durationNanoSecs: ULong,
) : Event()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ object RustClient : Client {
Event.SysSendmsg(
pid = 12345u,
tid = 1234u,
fd = 125123123,
durationMicroSec =
fd = 125123123u,
durationNanoSecs =
(System.currentTimeMillis() + Random.nextLong(1000)).toULong(),
beginTimeStamp = System.currentTimeMillis().toULong(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private fun uniffi.shared.Event.into() =
tid = d.v1.tid,
beginTimeStamp = d.v1.beginTimeStamp,
fd = d.v1.fd,
durationMicroSec = d.v1.durationMicroSec,
durationNanoSecs = d.v1.durationNanoSec,
)
null -> null
}
Expand Down
10 changes: 5 additions & 5 deletions rust/backend/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ pub struct SysSendmsgCall {
pub pid: u32,
pub tid: u32,
pub begin_time_stamp: u64,
pub fd: i32,
pub duration_micro_sec: u64, // in microseconds
pub fd: u64,
pub duration_nano_sec: u64, // in nanoseconds
}

impl SysSendmsgCall {
pub fn new(pid: u32, tid: u32, begin_time_stamp: u64, fd: i32, duration_micro_sec: u64) -> Self {
Self { pid, tid, begin_time_stamp, fd, duration_micro_sec }
pub fn new(pid: u32, tid: u32, begin_time_stamp: u64, fd: u64, duration_nano_sec: u64) -> Self {
Self { pid, tid, begin_time_stamp, fd, duration_nano_sec }
}
}

Expand All @@ -47,4 +47,4 @@ pub fn generate_id(pid: u32, tgid: u32) -> u64{
let tgid_u64 = tgid as u64;

(pid_u64 << 32) | tgid_u64
}
}
2 changes: 1 addition & 1 deletion rust/backend/daemon/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl CollectFromMap for SysSendmsgCollect {
tid: data.tid,
begin_time_stamp: data.begin_time_stamp,
fd: data.fd,
duration_micro_sec: data.duration_micro_sec
duration_nano_sec: data.duration_nano_sec
}))
})
}
Expand Down
4 changes: 2 additions & 2 deletions rust/backend/daemon/src/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl SysSendmsgFeature {
}

fn update_pids(&mut self, ebpf: &mut Ebpf, pids: &[u32]) -> Result<(), EbpfError> {
let mut pids_to_track: HashMap<_, u32, u32> = ebpf.map_mut("PIDS_TO_TRACK")
let mut pids_to_track: HashMap<_, u32, u64> = ebpf.map_mut("PIDS_TO_TRACK")
.ok_or(EbpfError::MapError(
aya::maps::MapError::InvalidName {
name: "PIDS_TO_TRACK".to_string(),
Expand Down Expand Up @@ -242,4 +242,4 @@ impl VfsFeature {

Ok(())
}
}
}
32 changes: 19 additions & 13 deletions rust/backend/ebpf/src/sys_sendmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,23 @@ use backend_common::{generate_id, SysSendmsgCall};
pub static SYS_SENDMSG_MAP: RingBuf = RingBuf::with_byte_size(1024, 0);

#[map(name = "PIDS_TO_TRACK")]
static PIDS_TO_TRACK: HashMap<u32, u32> = HashMap::with_max_entries(1024, 0);
static PIDS_TO_TRACK: HashMap<u32, u64> = HashMap::with_max_entries(4096, 0);

#[map]
static SYS_SENDMSG_TIMESTAMPS: HashMap<u64, SysSendmsgIntern> = HashMap::with_max_entries(1024, 0);


struct SysSendmsgIntern {
begin_time_stamp: u64,
fd: i32,
fd: u64,
}

#[tracepoint]
pub fn sys_enter_sendmsg(ctx: TracePointContext) -> u32 {
let pid = ctx.pid();

if unsafe { PIDS_TO_TRACK.get(&pid).is_none() } {
return 1;
}

let id = generate_id(pid, ctx.tgid());
let id = generate_id(ctx.pid(), ctx.tgid());

let begin_time_stamp;
let fd: i32;
let fd: u64;
unsafe {
fd = match ctx.read_at(16) {
Ok(arg) => arg,
Expand All @@ -54,6 +48,13 @@ pub fn sys_enter_sendmsg(ctx: TracePointContext) -> u32 {
pub fn sys_exit_sendmsg(ctx: TracePointContext) -> u32 {
let end_time = unsafe { bpf_ktime_get_ns() };
let pid = ctx.pid();

let duration_threshold_nano_sec = match unsafe { PIDS_TO_TRACK.get(&pid) } {
None => return 0, // pid should not be tracked
Some(duration) => duration,
};


let tgid = ctx.tgid();
let call_id = generate_id(pid, tgid);
let data = match unsafe { SYS_SENDMSG_TIMESTAMPS.get(&call_id) } {
Expand All @@ -62,8 +63,13 @@ pub fn sys_exit_sendmsg(ctx: TracePointContext) -> u32 {
};
let _ = SYS_SENDMSG_TIMESTAMPS.remove(&call_id);

let duration_micro_sec = (end_time - data.begin_time_stamp)/1000;
let result_data = SysSendmsgCall::new(pid, tgid, data.begin_time_stamp, data.fd, duration_micro_sec);
let duration_nano_sec = end_time - data.begin_time_stamp;

if duration_nano_sec < *duration_threshold_nano_sec {
return 0;
}

let result_data = SysSendmsgCall::new(pid, tgid, data.begin_time_stamp, data.fd, duration_nano_sec);

let mut entry = match SYS_SENDMSG_MAP.reserve::<SysSendmsgCall>(0) {
Some(entry) => entry,
Expand All @@ -78,4 +84,4 @@ pub fn sys_exit_sendmsg(ctx: TracePointContext) -> u32 {


0
}
}
4 changes: 2 additions & 2 deletions rust/shared/proto/ziofa.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ message SysSendmsgEvent {
uint32 pid = 1;
uint32 tid = 2;
uint64 begin_time_stamp = 3;
int32 fd = 4;
uint64 duration_micro_sec = 5;
uint64 fd = 4;
uint64 duration_nano_sec = 5;
}

0 comments on commit ea47b0f

Please sign in to comment.