diff --git a/frontend/.kotlin/sessions/kotlin-compiler-16272178719523561019.salive b/frontend/.kotlin/sessions/kotlin-compiler-16272178719523561019.salive new file mode 100644 index 00000000..e69de29b diff --git a/frontend/app/src/main/java/de/amosproj3/ziofa/api/WriteEvent.kt b/frontend/app/src/main/java/de/amosproj3/ziofa/api/WriteEvent.kt index c75b2a54..000664e1 100644 --- a/frontend/app/src/main/java/de/amosproj3/ziofa/api/WriteEvent.kt +++ b/frontend/app/src/main/java/de/amosproj3/ziofa/api/WriteEvent.kt @@ -23,6 +23,6 @@ sealed class WriteEvent( val pid: UInt, val tid: UInt, val beginTimestamp: ULong, - val durationMicros: ULong, - ) : WriteEvent(fd, pid, beginTimestamp, durationMicros) + val durationNanos: ULong, + ) : WriteEvent(fd, pid, beginTimestamp, durationNanos) } diff --git a/frontend/app/src/main/java/de/amosproj3/ziofa/bl/DataStreamManager.kt b/frontend/app/src/main/java/de/amosproj3/ziofa/bl/DataStreamManager.kt index c8973358..08c4bd6a 100644 --- a/frontend/app/src/main/java/de/amosproj3/ziofa/bl/DataStreamManager.kt +++ b/frontend/app/src/main/java/de/amosproj3/ziofa/bl/DataStreamManager.kt @@ -10,10 +10,12 @@ import de.amosproj3.ziofa.api.WriteEvent import de.amosproj3.ziofa.client.ClientFactory import de.amosproj3.ziofa.client.Event import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapNotNull import timber.log.Timber +// TODO: use a single sharedFlow and then different filters on top of that +// otherwise we are sending all the data multiple times from server to client class DataStreamManager(private val clientFactory: ClientFactory) : DataStreamProvider { override suspend fun counter(ebpfProgramName: String): Flow { @@ -36,27 +38,28 @@ class DataStreamManager(private val clientFactory: ClientFactory) : DataStreamPr clientFactory .connect() .initStream() - .filter { it is Event.VfsWrite } + .mapNotNull { it as? Event.VfsWrite } .map { - if (it is Event.VfsWrite) { - WriteEvent.VfsWriteEvent(it.fp, it.pid, it.bytesWritten, it.beginTimeStamp) - } else throw Exception("only for the compiler") + WriteEvent.VfsWriteEvent( + fd = it.fp, + pid = it.pid, + size = it.bytesWritten, + timestampMillis = it.beginTimeStamp, + ) } override suspend fun sendMessageEvents(pids: List): Flow = clientFactory .connect() .initStream() - .filter { it is Event.SysSendmsg } + .mapNotNull { it as? Event.SysSendmsg } .map { - if (it is Event.SysSendmsg) { - WriteEvent.SendMessageEvent( - it.fd.toULong(), - it.pid, - it.tid, - it.beginTimeStamp, - it.durationMicroSec, - ) - } else throw Exception("only for the compiler") + WriteEvent.SendMessageEvent( + fd = it.fd, + pid = it.pid, + tid = it.tid, + beginTimestamp = it.beginTimeStamp, + durationNanos = it.durationNanoSecs, + ) } } diff --git a/frontend/app/src/main/java/de/amosproj3/ziofa/ui/visualization/composables/EventList.kt b/frontend/app/src/main/java/de/amosproj3/ziofa/ui/visualization/composables/EventList.kt index 787553a7..48b17133 100644 --- a/frontend/app/src/main/java/de/amosproj3/ziofa/ui/visualization/composables/EventList.kt +++ b/frontend/app/src/main/java/de/amosproj3/ziofa/ui/visualization/composables/EventList.kt @@ -39,7 +39,7 @@ fun EventList(events: List) { when (event) { is WriteEvent.SendMessageEvent -> { Text( - text = (event.durationMicros / 1_000u).toString(), + text = (event.durationNanos / 1_000_000u).toString(), modifier = Modifier.weight(1f), ) } diff --git a/frontend/client/src/main/java/de/amosproj3/ziofa/client/Client.kt b/frontend/client/src/main/java/de/amosproj3/ziofa/client/Client.kt index 7230e885..bb023662 100644 --- a/frontend/client/src/main/java/de/amosproj3/ziofa/client/Client.kt +++ b/frontend/client/src/main/java/de/amosproj3/ziofa/client/Client.kt @@ -31,8 +31,8 @@ sealed class Event { val pid: UInt, val tid: UInt, val beginTimeStamp: ULong, - val fd: Int, - val durationMicroSec: ULong, + val fd: ULong, + val durationNanoSecs: ULong, ) : Event() } diff --git a/frontend/client/src/mock/java/de/amosproj3/ziofa/client/RustClient.kt b/frontend/client/src/mock/java/de/amosproj3/ziofa/client/RustClient.kt index c5b40c91..8ff28acc 100644 --- a/frontend/client/src/mock/java/de/amosproj3/ziofa/client/RustClient.kt +++ b/frontend/client/src/mock/java/de/amosproj3/ziofa/client/RustClient.kt @@ -94,8 +94,8 @@ object RustClient : Client { Event.SysSendmsg( pid = 12345u, tid = 1234u, - fd = 125123123, - durationMicroSec = + fd = 125123123u, + durationNanoSecs = (System.currentTimeMillis() + Random.nextLong(1000)).toULong(), beginTimeStamp = System.currentTimeMillis().toULong(), ) diff --git a/frontend/client/src/real/java/de.amosproj3.ziofa.client/RustClient.kt b/frontend/client/src/real/java/de.amosproj3.ziofa.client/RustClient.kt index 3256b18f..dcb7cbe1 100644 --- a/frontend/client/src/real/java/de.amosproj3.ziofa.client/RustClient.kt +++ b/frontend/client/src/real/java/de.amosproj3.ziofa.client/RustClient.kt @@ -42,7 +42,7 @@ private fun uniffi.shared.Event.into() = tid = d.v1.tid, beginTimeStamp = d.v1.beginTimeStamp, fd = d.v1.fd, - durationMicroSec = d.v1.durationMicroSec, + durationNanoSecs = d.v1.durationNanoSec, ) null -> null } diff --git a/rust/backend/common/src/lib.rs b/rust/backend/common/src/lib.rs index 47722a85..eb595214 100644 --- a/rust/backend/common/src/lib.rs +++ b/rust/backend/common/src/lib.rs @@ -31,13 +31,13 @@ pub struct SysSendmsgCall { pub pid: u32, pub tid: u32, pub begin_time_stamp: u64, - pub fd: i32, - pub duration_micro_sec: u64, // in microseconds + pub fd: u64, + pub duration_nano_sec: u64, // in nanoseconds } impl SysSendmsgCall { - pub fn new(pid: u32, tid: u32, begin_time_stamp: u64, fd: i32, duration_micro_sec: u64) -> Self { - Self { pid, tid, begin_time_stamp, fd, duration_micro_sec } + pub fn new(pid: u32, tid: u32, begin_time_stamp: u64, fd: u64, duration_nano_sec: u64) -> Self { + Self { pid, tid, begin_time_stamp, fd, duration_nano_sec } } } @@ -47,4 +47,4 @@ pub fn generate_id(pid: u32, tgid: u32) -> u64{ let tgid_u64 = tgid as u64; (pid_u64 << 32) | tgid_u64 -} \ No newline at end of file +} diff --git a/rust/backend/daemon/src/collector.rs b/rust/backend/daemon/src/collector.rs index f90b238b..a470e168 100644 --- a/rust/backend/daemon/src/collector.rs +++ b/rust/backend/daemon/src/collector.rs @@ -54,7 +54,7 @@ impl CollectFromMap for SysSendmsgCollect { tid: data.tid, begin_time_stamp: data.begin_time_stamp, fd: data.fd, - duration_micro_sec: data.duration_micro_sec + duration_nano_sec: data.duration_nano_sec })) }) } diff --git a/rust/backend/daemon/src/features.rs b/rust/backend/daemon/src/features.rs index 7473da21..ce5ff08f 100644 --- a/rust/backend/daemon/src/features.rs +++ b/rust/backend/daemon/src/features.rs @@ -65,7 +65,7 @@ impl SysSendmsgFeature { } fn update_pids(&mut self, ebpf: &mut Ebpf, pids: &[u32]) -> Result<(), EbpfError> { - let mut pids_to_track: HashMap<_, u32, u32> = ebpf.map_mut("PIDS_TO_TRACK") + let mut pids_to_track: HashMap<_, u32, u64> = ebpf.map_mut("PIDS_TO_TRACK") .ok_or(EbpfError::MapError( aya::maps::MapError::InvalidName { name: "PIDS_TO_TRACK".to_string(), @@ -242,4 +242,4 @@ impl VfsFeature { Ok(()) } -} \ No newline at end of file +} diff --git a/rust/backend/ebpf/src/sys_sendmsg.rs b/rust/backend/ebpf/src/sys_sendmsg.rs index 4bef2c46..cb9b172b 100644 --- a/rust/backend/ebpf/src/sys_sendmsg.rs +++ b/rust/backend/ebpf/src/sys_sendmsg.rs @@ -10,7 +10,7 @@ use backend_common::{generate_id, SysSendmsgCall}; pub static SYS_SENDMSG_MAP: RingBuf = RingBuf::with_byte_size(1024, 0); #[map(name = "PIDS_TO_TRACK")] -static PIDS_TO_TRACK: HashMap = HashMap::with_max_entries(1024, 0); +static PIDS_TO_TRACK: HashMap = HashMap::with_max_entries(4096, 0); #[map] static SYS_SENDMSG_TIMESTAMPS: HashMap = HashMap::with_max_entries(1024, 0); @@ -18,21 +18,15 @@ static SYS_SENDMSG_TIMESTAMPS: HashMap = HashMap::with_ma struct SysSendmsgIntern { begin_time_stamp: u64, - fd: i32, + fd: u64, } #[tracepoint] pub fn sys_enter_sendmsg(ctx: TracePointContext) -> u32 { - let pid = ctx.pid(); - - if unsafe { PIDS_TO_TRACK.get(&pid).is_none() } { - return 1; - } - - let id = generate_id(pid, ctx.tgid()); + let id = generate_id(ctx.pid(), ctx.tgid()); let begin_time_stamp; - let fd: i32; + let fd: u64; unsafe { fd = match ctx.read_at(16) { Ok(arg) => arg, @@ -54,6 +48,13 @@ pub fn sys_enter_sendmsg(ctx: TracePointContext) -> u32 { pub fn sys_exit_sendmsg(ctx: TracePointContext) -> u32 { let end_time = unsafe { bpf_ktime_get_ns() }; let pid = ctx.pid(); + + let duration_threshold_nano_sec = match unsafe { PIDS_TO_TRACK.get(&pid) } { + None => return 0, // pid should not be tracked + Some(duration) => duration, + }; + + let tgid = ctx.tgid(); let call_id = generate_id(pid, tgid); let data = match unsafe { SYS_SENDMSG_TIMESTAMPS.get(&call_id) } { @@ -62,8 +63,13 @@ pub fn sys_exit_sendmsg(ctx: TracePointContext) -> u32 { }; let _ = SYS_SENDMSG_TIMESTAMPS.remove(&call_id); - let duration_micro_sec = (end_time - data.begin_time_stamp)/1000; - let result_data = SysSendmsgCall::new(pid, tgid, data.begin_time_stamp, data.fd, duration_micro_sec); + let duration_nano_sec = end_time - data.begin_time_stamp; + + if duration_nano_sec < *duration_threshold_nano_sec { + return 0; + } + + let result_data = SysSendmsgCall::new(pid, tgid, data.begin_time_stamp, data.fd, duration_nano_sec); let mut entry = match SYS_SENDMSG_MAP.reserve::(0) { Some(entry) => entry, @@ -78,4 +84,4 @@ pub fn sys_exit_sendmsg(ctx: TracePointContext) -> u32 { 0 -} \ No newline at end of file +} diff --git a/rust/shared/proto/ziofa.proto b/rust/shared/proto/ziofa.proto index e24760b5..0130428a 100644 --- a/rust/shared/proto/ziofa.proto +++ b/rust/shared/proto/ziofa.proto @@ -67,6 +67,6 @@ message SysSendmsgEvent { uint32 pid = 1; uint32 tid = 2; uint64 begin_time_stamp = 3; - int32 fd = 4; - uint64 duration_micro_sec = 5; + uint64 fd = 4; + uint64 duration_nano_sec = 5; }