Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Mr-Kanister <[email protected]>
  • Loading branch information
Mr-Kanister committed Dec 4, 2024
1 parent 7deabd2 commit 3f7a856
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 9 deletions.
41 changes: 36 additions & 5 deletions rust/backend/daemon/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// SPDX-License-Identifier: MIT

use crate::collector::MultiCollector;
use crate::symbols::{self, get_symbol_offset_for_function_of_process};
use crate::symbols::{self, get_symbol_offset_for_function_of_process, SymbolHandler};
use crate::{
configuration, constants,
counter::Counter,
Expand All @@ -18,7 +18,7 @@ use aya::Ebpf;
use aya_log::EbpfLogger;
use shared::ziofa::{
Event, GetAddressOfSymbolRequest, GetAddressOfSymbolResponse, GetSymbolsOfProcessRequest,
SymbolList,
PidMessage, StringResponse, Symbol,
};
use shared::{
config::Configuration,
Expand All @@ -30,26 +30,30 @@ use shared::{
};
use std::{ops::DerefMut, sync::Arc};
use tokio::join;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};

pub struct ZiofaImpl {
// tx: Option<Sender<Result<EbpfStreamObject, Status>>>,
ebpf: Arc<Mutex<Ebpf>>,
state: Arc<Mutex<State>>,
channel: Arc<Channel>,
symbol_handler: Arc<Mutex<SymbolHandler>>,
}

impl ZiofaImpl {
pub fn new(
ebpf: Arc<Mutex<Ebpf>>,
state: Arc<Mutex<State>>,
channel: Arc<Channel>,
symbol_handler: Arc<Mutex<SymbolHandler>>,
) -> ZiofaImpl {
ZiofaImpl {
ebpf,
state,
channel,
symbol_handler,
}
}
}
Expand Down Expand Up @@ -116,10 +120,34 @@ impl Ziofa for ZiofaImpl {
Ok(Response::new(self.channel.rx.clone()))
}

type GetOdexFilesStream = ReceiverStream<Result<StringResponse, Status>>;
async fn get_odex_files(
&self,
request: Request<PidMessage>,
) -> Result<Response<Self::GetOdexFilesStream>, Status> {
let pid = request.into_inner().pid;
let symbol_handler_guard = self.symbol_handler.lock().await;
let odex_paths = symbol_handler_guard.get_odex_paths(pid)?.clone();

let (tx, rx) = mpsc::channel(4);

tokio::spawn(async move {
for path in odex_paths {
tx.send(Ok(StringResponse {
name: path.to_str().unwrap().to_string(),
}));
}
});

Ok(Response::new(ReceiverStream::new(rx)))
}

type GetSymbolsOfProcessStream = ReceiverStream<Result<Symbol, Status>>;

async fn get_symbols_of_process(
&self,
request: Request<GetSymbolsOfProcessRequest>,
) -> Result<Response<SymbolList>, Status> {
) -> Result<Response<Self::GetSymbolsOfProcessStream>, Status> {
let process = request.into_inner();
let pid = process.pid;
let package_name = process.package_name;
Expand Down Expand Up @@ -163,9 +191,12 @@ pub async fn serve_forever() {
let mut state = State::new();
state.init(&mut ebpf).expect("should work");

let mut symbol_handler = Arc::new(Mutex::new(SymbolHandler::new()));

let ebpf = Arc::new(Mutex::new(ebpf));
let state = Arc::new(Mutex::new(state));
let ziofa_server = ZiofaServer::new(ZiofaImpl::new(ebpf.clone(), state, channel));
let ziofa_server =
ZiofaServer::new(ZiofaImpl::new(ebpf.clone(), state, channel, symbol_handler));
let counter_server = CounterServer::new(Counter::new(ebpf).await);

let serve = async move {
Expand Down
122 changes: 121 additions & 1 deletion rust/backend/daemon/src/symbols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,128 @@

use crate::constants::OATDUMP_PATH;
use crate::symbols_helpers::{self, get_odex_files_for_pid};
use object::Symbol;
use procfs::process::{MMapPath, Process};
use procfs::ProcError;
use serde::{Deserialize, Serialize};
use serde_json::de::IoRead;
use serde_json::StreamDeserializer;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::hash::Hash;
use std::io::BufReader;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use symbols_helpers::{generate_json_oatdump, get_section_address};
use thiserror::Error;
use tokio::io::AsyncBufReadExt;

#[derive(Serialize, Deserialize, Debug)]
struct JsonSymbol {
method: String,
offset: String,
}

pub struct SymbolHandler {
/// maps pid to odex paths supplied by /proc/pid/maps
odex_paths: HashMap<i32, HashSet<PathBuf>>,
/// maps pid, odex file path and symbol name to offset
symbols: HashMap<i32, HashMap<PathBuf, HashMap<String, u64>>>,
}

impl SymbolHandler {
pub fn new() -> Self {
SymbolHandler {
odex_paths: HashMap::new(),
symbols: HashMap::new(),
}
}

/// loads the paths to all odex files
// TODO: blocking?
pub fn load_odex_paths(&mut self, pid: i32) -> Result<(), ProcError> {
let process = Process::new(pid)?;
let maps = process.maps()?;

// TODO: Check for old/potentially outdated entries and reload them
if self.odex_paths.contains_key(&pid) {
return Ok(());
}

self.odex_paths.insert(
pid,
maps.iter()
.filter_map(|mm_map| match mm_map.clone().pathname {
MMapPath::Path(path) => Some(path),
_ => None,
})
.filter(|path: &PathBuf| path.to_str().unwrap().contains(".odex"))
.collect(),
);

Ok(())
// TODO: Remove old/long unused paths from cache
}

pub fn get_odex_paths(&self, pid: i32) -> Result<&HashSet<PathBuf>, SymbolError> {
self.odex_paths
.get(&pid)
.ok_or(SymbolError::OdexPathsNotLoaded { pid })
}

pub async fn load_symbols(&mut self, pid: i32, odex_path: &PathBuf) -> Result<(), SymbolError> {
generate_json_oatdump(odex_path).await;

let json_file = tokio::fs::File::open(OATDUMP_PATH).await?;
let json_reader = tokio::io::BufReader::new(json_file);
let mut lines = json_reader.lines();

// check if pid already has a hash map, insert one otherwise
if !self.symbols.contains_key(&pid) {
self.symbols.insert(pid, HashMap::new());
}

// get map from odex file path to symbol map
let odex_to_symbol_map = self.symbols.get_mut(&pid).unwrap();

// if the wished odex file path already contains a symbol map, nothing needs to be done
// TODO: Check for old/potentially outdated entries and reload them
if !odex_to_symbol_map.contains_key(odex_path) {
odex_to_symbol_map.insert(odex_path.to_path_buf(), HashMap::new());
} else {
return Ok(());
}
let symbol_to_offset = odex_to_symbol_map.get_mut(odex_path).unwrap();

// store all symbols with their offsets in the map
for line in lines.next_line().await {
if line.is_none() {
break;
}
let symbol: JsonSymbol = serde_json::from_str(&line.unwrap())?;
let offset =
u64::from_str_radix(symbol.offset.strip_prefix("0x").unwrap(), 16).unwrap();
symbol_to_offset.insert(symbol.method, offset);
}

Ok(())
}

pub fn get_symbols(
&self,
pid: i32,
odex_path: &PathBuf,
) -> Result<&HashMap<String, u64>, SymbolError> {
self.symbols
.get(&pid)
.ok_or(SymbolError::OdexPathsNotLoaded { pid })?
.get(odex_path)
.ok_or(SymbolError::SymbolsNotLoaded {
pid,
odex_path: odex_path.to_path_buf(),
})
}
}

#[derive(Debug, Error)]
pub enum SymbolError {
#[error("Symbol doesn't exist")]
Expand All @@ -35,6 +141,12 @@ pub enum SymbolError {
Other { text: String },
#[error(transparent)]
ProcError(#[from] ProcError),
#[error("Odex paths are not loaded for specified pid")]
OdexPathsNotLoaded { pid: i32 },
#[error("Symbols are not loaded for specified pid and odex path")]
SymbolsNotLoaded { pid: i32, odex_path: PathBuf },
#[error(transparent)]
SerdeError(#[from] serde_json::Error),
}

impl From<SymbolError> for tonic::Status {
Expand Down Expand Up @@ -120,6 +232,14 @@ fn get_symbol_address_from_json(
})
}

pub async fn stream_symbols(
) -> Result<StreamDeserializer<'static, IoRead<BufReader<File>>, JsonSymbol>, SymbolError> {
let json_file = tokio::fs::File::open(OATDUMP_PATH).await?;
let json_reader = tokio::io::BufReader::new(json_file);
let lines = json_reader.lines();
lines.Ok(serde_json::Deserializer::from_reader(json_reader).into_iter::<JsonSymbol>())
}

fn get_oatdump_contents(
) -> Result<StreamDeserializer<'static, IoRead<BufReader<File>>, JsonSymbol>, SymbolError> {
let json_file = File::open(OATDUMP_PATH)?;
Expand Down
12 changes: 9 additions & 3 deletions rust/shared/proto/ziofa.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@ service Ziofa {

rpc InitStream(google.protobuf.Empty) returns (stream Event) {} // all Responses genereated by the ebpf-programms are send via this stream

rpc GetSymbolsOfProcess(GetSymbolsOfProcessRequest) returns (SymbolList){}
rpc GetOdexFiles(PidMessage) returns (stream StringResponse) {}
rpc GetSymbolsOfProcess(GetSymbolsOfProcessRequest) returns (stream Symbol){}
rpc GetAddressOfSymbol(GetAddressOfSymbolRequest) returns (GetAddressOfSymbolResponse){}

}

message StringResponse {
string name = 1;
}

message GetSymbolsOfProcessRequest {
int32 pid = 1;
string package_name = 2;
}

message SymbolList {
repeated string names = 1;
message Symbol {
string method = 1;
uint64 offset = 2;
}

message GetAddressOfSymbolRequest {
Expand Down

0 comments on commit 3f7a856

Please sign in to comment.