Skip to content

Commit

Permalink
Implemented buffered writing of symbols to the client
Browse files Browse the repository at this point in the history
Co-Authored-By: Mr-Kanister <[email protected]>
Signed-off-by: Benedikt Zinn <[email protected]>
  • Loading branch information
BenediktZinn and Mr-Kanister committed Dec 4, 2024
1 parent ebf12aa commit 0f468a4
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 332 deletions.
92 changes: 39 additions & 53 deletions rust/backend/daemon/src/bin/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@
// SPDX-License-Identifier: MIT

use clap::Parser;
use shared::ziofa::{
GetAddressOfSymbolRequest, GetSymbolsOfProcessRequest,
};
use shared::{
config::{Configuration, SysSendmsgConfig, VfsWriteConfig},
ziofa::ziofa_client::ZiofaClient,
};
use tonic::transport::Channel;
use tonic::Request;

#[derive(Parser, Debug)]
struct Args {
Expand All @@ -30,59 +26,49 @@ struct Args {
pid: i32,
}

async fn test_get_symbols_of_process(
client: &mut ZiofaClient<Channel>,
pid: i32,
package_name: String,
verbose: bool,
async fn test_get_symbols(
_client: &mut ZiofaClient<Channel>,
_pid: i32,
_package_name: String,
_verbose: bool,
) {
println!("TEST get_symbols_of_process");

match client
.get_symbols_of_process(Request::new(GetSymbolsOfProcessRequest {
pid,
package_name,
}))
.await
{
Ok(res) => {
let names = res.into_inner().names;
println!("SUCCESS");
if verbose {
for (i, s) in names.iter().enumerate() {
println!("Symbol {}: {}", i, s);
}
}
}
Err(e) => println!("ERROR: {:?}", e),
};
println!();
todo!("implement");
// println!("TEST get_symbols_of_process");
//
// match () {
// Ok(res) => {
// let names = res.into_inner().names;
// println!("SUCCESS");
// if verbose {
// for (i, s) in names.iter().enumerate() {
// println!("Symbol {}: {}", i, s);
// }
// }
// }
// Err(e) => println!("ERROR: {:?}", e),
// };
// println!();
}

async fn test_get_address_of_symbol(
client: &mut ZiofaClient<Channel>,
name: String,
pid: i32,
package_name: String,
_client: &mut ZiofaClient<Channel>,
_name: String,
_pid: i32,
_package_name: String,
) {
println!("TEST get_address_of_symbol");

match client
.get_address_of_symbol(Request::new(GetAddressOfSymbolRequest {
name,
pid,
package_name,
}))
.await
{
Ok(res) => {
let offset = res.into_inner().offset;
println!("SUCCESS: {}", offset);
}
Err(e) => println!("ERROR: {:?}", e),
};

println!();
todo!("implement");
// println!("TEST get_address_of_symbol");
//
// match Ok(())
// {
// Ok(res) => {
// let offset = res.into_inner().offset;
// println!("SUCCESS: {}", offset);
// }
// Err(e) => println!("ERROR: {:?}", e),
// };
//
// println!();
}

async fn test_check_server(client: &mut ZiofaClient<Channel>) {
Expand Down Expand Up @@ -168,7 +154,7 @@ async fn main() {
let config = test_get_configuration(&mut client, args.verbose).await;
test_set_configuration(&mut client, config).await;
test_list_processes(&mut client, args.verbose).await;
test_get_symbols_of_process(
test_get_symbols(
&mut client,
args.pid,
"de.amosproj3.ziofa".to_string(),
Expand Down
2 changes: 0 additions & 2 deletions rust/backend/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ mod helpers;
mod procfs_utils;
mod server;
mod features;

mod symbols;

mod collector;
mod symbols_helpers;

pub async fn run_server() {
helpers::bump_rlimit();
Expand Down
1 change: 0 additions & 1 deletion rust/backend/daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mod server;
mod features;
mod collector;
mod symbols;
mod symbols_helpers;

#[tokio::main]
async fn main() {
Expand Down
87 changes: 58 additions & 29 deletions rust/backend/daemon/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// SPDX-License-Identifier: MIT

use crate::collector::MultiCollector;
use crate::symbols::{self, get_symbol_offset_for_function_of_process, SymbolHandler};
use crate::symbols::SymbolHandler;
use crate::{
configuration, constants,
counter::Counter,
Expand All @@ -17,7 +17,7 @@ use async_broadcast::{broadcast, Receiver, Sender};
use aya::Ebpf;
use aya_log::EbpfLogger;
use shared::ziofa::{
Event, GetAddressOfSymbolRequest, GetAddressOfSymbolResponse, GetSymbolsOfProcessRequest,
Event, GetSymbolsRequest,
PidMessage, StringResponse, Symbol,
};
use shared::{
Expand All @@ -28,6 +28,7 @@ use shared::{
CheckServerResponse, ProcessList, SetConfigurationResponse,
},
};
use std::path::PathBuf;
use std::{ops::DerefMut, sync::Arc};
use tokio::join;
use tokio::sync::{mpsc, Mutex};
Expand Down Expand Up @@ -121,55 +122,83 @@ impl Ziofa for ZiofaImpl {
}

type GetOdexFilesStream = ReceiverStream<Result<StringResponse, Status>>;

// TODO: What is this function for?
async fn get_odex_files(
&self,
request: Request<PidMessage>,
) -> Result<Response<Self::GetOdexFilesStream>, Status> {
let pid = request.into_inner().pid;
let symbol_handler_guard = self.symbol_handler.lock().await;
let odex_paths = symbol_handler_guard.get_odex_paths(pid)?.clone();

let (tx, rx) = mpsc::channel(4);

let symbol_handler_clone = self.symbol_handler.clone();

// let symbol_handler_guard = self.symbol_handler.lock().await;
// let odex_paths = symbol_handler_guard.get_odex_paths(pid)?;

tokio::spawn(async move {
let mut symbol_handler_guard = symbol_handler_clone.lock().await;
// TODO Error Handling
let odex_paths = match symbol_handler_guard.get_odex_paths(pid) {
Ok(paths) => paths,
Err(e) => {
tx.send(Err(Status::from(e)))
.await
.expect("Error sending Error to client ._.");
return;
}
};

for path in odex_paths {
tx.send(Ok(StringResponse {
name: path.to_str().unwrap().to_string(),
}));
}))
.await
.expect("Error sending odex file to client");
}
});

Ok(Response::new(ReceiverStream::new(rx)))
}

type GetSymbolsOfProcessStream = ReceiverStream<Result<Symbol, Status>>;
type GetSymbolsStream = ReceiverStream<Result<Symbol, Status>>;

async fn get_symbols_of_process(
async fn get_symbols(
&self,
request: Request<GetSymbolsOfProcessRequest>,
) -> Result<Response<Self::GetSymbolsOfProcessStream>, Status> {
let process = request.into_inner();
let pid = process.pid;
let package_name = process.package_name;

Ok(Response::new(SymbolList {
names: symbols::get_symbols_of_pid(pid, &package_name).await?,
}))
}
request: Request<GetSymbolsRequest>,
) -> Result<Response<Self::GetSymbolsStream>, Status> {
let process_request = request.into_inner();
let pid = process_request.pid;
let odex_file_path_string = process_request.odex_file_path;
let odex_file_path = PathBuf::from(odex_file_path_string);

async fn get_address_of_symbol(
&self,
request: Request<GetAddressOfSymbolRequest>,
) -> Result<Response<GetAddressOfSymbolResponse>, Status> {
let request_inner = request.into_inner();
let symbol_name = request_inner.name;
let pid = request_inner.pid;
let package_name = request_inner.package_name;
let symbol_handler_clone = self.symbol_handler.clone();

let offset =
get_symbol_offset_for_function_of_process(pid, &package_name, &symbol_name).await?;
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
let mut symbol_handler_guard = symbol_handler_clone.lock().await;

let symbol = match symbol_handler_guard.get_symbols(pid, &odex_file_path).await{
Ok(symbol) => symbol,
Err(e) => {
tx.send(Err(Status::from(e)))
.await
.expect("Error sending Error to client ._.");
return;
}
};
for (symbol, offset) in symbol.iter() {
tx.send(Ok(Symbol{
method: symbol.to_string(),
offset: *offset,
}))
.await
.expect("Error sending odex file to client");
}
});

Ok(Response::new(GetAddressOfSymbolResponse { offset }))
Ok(Response::new(ReceiverStream::new(rx)))
}
}

Expand All @@ -191,7 +220,7 @@ pub async fn serve_forever() {
let mut state = State::new();
state.init(&mut ebpf).expect("should work");

let mut symbol_handler = Arc::new(Mutex::new(SymbolHandler::new()));
let symbol_handler = Arc::new(Mutex::new(SymbolHandler::new()));

let ebpf = Arc::new(Mutex::new(ebpf));
let state = Arc::new(Mutex::new(state));
Expand Down
Loading

0 comments on commit 0f468a4

Please sign in to comment.