Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(eBPF) set the threshold for a blocking call individually per pid #110

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ sealed class WriteEvent(
val pid: UInt,
val tid: UInt,
val beginTimestamp: ULong,
val durationMicros: ULong,
) : WriteEvent(fd, pid, beginTimestamp, durationMicros)
val durationNanos: ULong,
) : WriteEvent(fd, pid, beginTimestamp, durationNanos)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import de.amosproj3.ziofa.api.WriteEvent
import de.amosproj3.ziofa.client.ClientFactory
import de.amosproj3.ziofa.client.Event
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import timber.log.Timber

// TODO: use a single sharedFlow and then different filters on top of that
// otherwise we are sending all the data multiple times from server to client
class DataStreamManager(private val clientFactory: ClientFactory) : DataStreamProvider {

override suspend fun counter(ebpfProgramName: String): Flow<UInt> {
Expand All @@ -36,27 +38,28 @@ class DataStreamManager(private val clientFactory: ClientFactory) : DataStreamPr
clientFactory
.connect()
.initStream()
.filter { it is Event.VfsWrite }
.mapNotNull { it as? Event.VfsWrite }
.map {
if (it is Event.VfsWrite) {
WriteEvent.VfsWriteEvent(it.fp, it.pid, it.bytesWritten, it.beginTimeStamp)
} else throw Exception("only for the compiler")
WriteEvent.VfsWriteEvent(
fd = it.fp,
pid = it.pid,
size = it.bytesWritten,
timestampMillis = it.beginTimeStamp,
)
}

override suspend fun sendMessageEvents(pids: List<UInt>): Flow<WriteEvent.SendMessageEvent> =
clientFactory
.connect()
.initStream()
.filter { it is Event.SysSendmsg }
.mapNotNull { it as? Event.SysSendmsg }
.map {
if (it is Event.SysSendmsg) {
WriteEvent.SendMessageEvent(
it.fd.toULong(),
it.pid,
it.tid,
it.beginTimeStamp,
it.durationMicroSec,
)
} else throw Exception("only for the compiler")
WriteEvent.SendMessageEvent(
fd = it.fd,
pid = it.pid,
tid = it.tid,
beginTimestamp = it.beginTimeStamp,
durationNanos = it.durationNanoSecs,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fun EventList(events: List<WriteEvent>) {
when (event) {
is WriteEvent.SendMessageEvent -> {
Text(
text = (event.durationMicros / 1_000u).toString(),
text = (event.durationNanos / 1_000_000u).toString(),
modifier = Modifier.weight(1f),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ sealed class Event {
val pid: UInt,
val tid: UInt,
val beginTimeStamp: ULong,
val fd: Int,
val durationMicroSec: ULong,
val fd: ULong,
val durationNanoSecs: ULong,
) : Event()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ object RustClient : Client {
Event.SysSendmsg(
pid = 12345u,
tid = 1234u,
fd = 125123123,
durationMicroSec =
fd = 125123123u,
durationNanoSecs =
(System.currentTimeMillis() + Random.nextLong(1000)).toULong(),
beginTimeStamp = System.currentTimeMillis().toULong(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private fun uniffi.shared.Event.into() =
tid = d.v1.tid,
beginTimeStamp = d.v1.beginTimeStamp,
fd = d.v1.fd,
durationMicroSec = d.v1.durationMicroSec,
durationNanoSecs = d.v1.durationNanoSec,
)
null -> null
}
Expand Down
10 changes: 5 additions & 5 deletions rust/backend/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ pub struct SysSendmsgCall {
pub pid: u32,
pub tid: u32,
pub begin_time_stamp: u64,
pub fd: i32,
pub duration_micro_sec: u64, // in microseconds
pub fd: u64,
pub duration_nano_sec: u64, // in nanoseconds
}

impl SysSendmsgCall {
pub fn new(pid: u32, tid: u32, begin_time_stamp: u64, fd: i32, duration_micro_sec: u64) -> Self {
Self { pid, tid, begin_time_stamp, fd, duration_micro_sec }
pub fn new(pid: u32, tid: u32, begin_time_stamp: u64, fd: u64, duration_nano_sec: u64) -> Self {
Self { pid, tid, begin_time_stamp, fd, duration_nano_sec }
}
}

Expand All @@ -47,4 +47,4 @@ pub fn generate_id(pid: u32, tgid: u32) -> u64{
let tgid_u64 = tgid as u64;

(pid_u64 << 32) | tgid_u64
}
}
2 changes: 1 addition & 1 deletion rust/backend/daemon/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl CollectFromMap for SysSendmsgCollect {
tid: data.tid,
begin_time_stamp: data.begin_time_stamp,
fd: data.fd,
duration_micro_sec: data.duration_micro_sec
duration_nano_sec: data.duration_nano_sec
}))
})
}
Expand Down
4 changes: 2 additions & 2 deletions rust/backend/daemon/src/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl SysSendmsgFeature {
}

fn update_pids(&mut self, ebpf: &mut Ebpf, pids: &[u32]) -> Result<(), EbpfError> {
let mut pids_to_track: HashMap<_, u32, u32> = ebpf.map_mut("PIDS_TO_TRACK")
let mut pids_to_track: HashMap<_, u32, u64> = ebpf.map_mut("PIDS_TO_TRACK")
.ok_or(EbpfError::MapError(
aya::maps::MapError::InvalidName {
name: "PIDS_TO_TRACK".to_string(),
Expand Down Expand Up @@ -242,4 +242,4 @@ impl VfsFeature {

Ok(())
}
}
}
32 changes: 19 additions & 13 deletions rust/backend/ebpf/src/sys_sendmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,23 @@ use backend_common::{generate_id, SysSendmsgCall};
pub static SYS_SENDMSG_MAP: RingBuf = RingBuf::with_byte_size(1024, 0);

#[map(name = "PIDS_TO_TRACK")]
static PIDS_TO_TRACK: HashMap<u32, u32> = HashMap::with_max_entries(1024, 0);
static PIDS_TO_TRACK: HashMap<u32, u64> = HashMap::with_max_entries(4096, 0);

#[map]
static SYS_SENDMSG_TIMESTAMPS: HashMap<u64, SysSendmsgIntern> = HashMap::with_max_entries(1024, 0);


struct SysSendmsgIntern {
begin_time_stamp: u64,
fd: i32,
fd: u64,
}

#[tracepoint]
pub fn sys_enter_sendmsg(ctx: TracePointContext) -> u32 {
let pid = ctx.pid();

if unsafe { PIDS_TO_TRACK.get(&pid).is_none() } {
return 1;
}

let id = generate_id(pid, ctx.tgid());
der-whity marked this conversation as resolved.
Show resolved Hide resolved
let id = generate_id(ctx.pid(), ctx.tgid());

let begin_time_stamp;
let fd: i32;
let fd: u64;
unsafe {
fd = match ctx.read_at(16) {
Ok(arg) => arg,
Expand All @@ -54,6 +48,13 @@ pub fn sys_enter_sendmsg(ctx: TracePointContext) -> u32 {
pub fn sys_exit_sendmsg(ctx: TracePointContext) -> u32 {
let end_time = unsafe { bpf_ktime_get_ns() };
let pid = ctx.pid();

let duration_threshold_nano_sec = match unsafe { PIDS_TO_TRACK.get(&pid) } {
None => return 0, // pid should not be tracked
Some(duration) => duration,
};


let tgid = ctx.tgid();
let call_id = generate_id(pid, tgid);
let data = match unsafe { SYS_SENDMSG_TIMESTAMPS.get(&call_id) } {
Expand All @@ -62,8 +63,13 @@ pub fn sys_exit_sendmsg(ctx: TracePointContext) -> u32 {
};
let _ = SYS_SENDMSG_TIMESTAMPS.remove(&call_id);

let duration_micro_sec = (end_time - data.begin_time_stamp)/1000;
let result_data = SysSendmsgCall::new(pid, tgid, data.begin_time_stamp, data.fd, duration_micro_sec);
let duration_nano_sec = end_time - data.begin_time_stamp;

if duration_nano_sec < *duration_threshold_nano_sec {
return 0;
}

let result_data = SysSendmsgCall::new(pid, tgid, data.begin_time_stamp, data.fd, duration_nano_sec);

let mut entry = match SYS_SENDMSG_MAP.reserve::<SysSendmsgCall>(0) {
Some(entry) => entry,
Expand All @@ -78,4 +84,4 @@ pub fn sys_exit_sendmsg(ctx: TracePointContext) -> u32 {


0
}
}
4 changes: 2 additions & 2 deletions rust/shared/proto/ziofa.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ message SysSendmsgEvent {
uint32 pid = 1;
uint32 tid = 2;
uint64 begin_time_stamp = 3;
int32 fd = 4;
uint64 duration_micro_sec = 5;
uint64 fd = 4;
uint64 duration_nano_sec = 5;
}