Skip to content

Commit

Permalink
runc: add eBPF exit snoop for sandbox recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
abel-von committed Sep 14, 2023
1 parent ad65ca9 commit f04a84c
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 10 deletions.
22 changes: 22 additions & 0 deletions runc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions runc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ prctl = "1.0.0"
os_pipe = "1.1.4"
byteorder = "1.4.3"
unshare = "0.7.0"
lazy_static = "1.4.0"
bcc = "0.0.33"

containerd-sandbox = { git = "https://github.com/kuasar-io/rust-extensions.git", rev = "6ae99540b754cd28c5389d5d6fdeff6ec7290ec5" }
containerd-shim = { git = "https://github.com/kuasar-io/rust-extensions.git", rev = "6ae99540b754cd28c5389d5d6fdeff6ec7290ec5", features = ["async"] }
runc = { git = "https://github.com/kuasar-io/rust-extensions.git", rev = "6ae99540b754cd28c5389d5d6fdeff6ec7290ec5", features = ["async"] }

runc = { git = "https://github.com/kuasar-io/rust-extensions.git", rev = "6ae99540b754cd28c5389d5d6fdeff6ec7290ec5", features = ["async"] }
36 changes: 36 additions & 0 deletions runc/src/exitsnoop.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include<linux/sched.h>

// One record per reported process exit, filled in by
// sched_process_exit_handler and submitted through the `events` perf output.
// The field order and widths must stay in sync with the #[repr(C)] struct of
// the same name in exitsnoop.rs, which reads the raw bytes back.
struct process_exit_data_t {
u64 start_time; // task->start_time of the exiting task
u64 exit_time; // bpf_ktime_get_ns() sampled in the handler
u32 pid; // task->tgid
u32 tid; // task->pid
u32 ppid; // task->real_parent->tgid
int exit_code; // task->exit_code >> 8
u32 sig_info; // task->exit_code & 0xFF
char task[TASK_COMM_LEN]; // filled by bpf_get_current_comm()
};

// Output table named "events": the handler submits one process_exit_data_t
// per reported exit, and exitsnoop.rs looks the table up by this name.
BPF_PERF_OUTPUT(events);

// Handler for the sched:sched_process_exit tracepoint. It is attached by name
// from exitsnoop.rs rather than declared with
// TRACEPOINT_PROBE(sched, sched_process_exit).
//
// Submits one process_exit_data_t on `events` for the current task when its
// pid equals its tgid; always returns 0.
int sched_process_exit_handler(struct tracepoint__sched__sched_process_exit *args)
{
struct task_struct *task = (typeof(task)) bpf_get_current_task();
struct process_exit_data_t data = {};
// Report only the task whose pid equals its tgid; every other task is skipped.
if (task->pid != task->tgid) {
return 0;
}
data.start_time = task->start_time;
data.exit_time = bpf_ktime_get_ns();
data.pid = task-> tgid;
data.tid = task->pid;
data.ppid = task -> real_parent->tgid;
// Split task->exit_code: bits above the low byte go to exit_code, the low
// byte goes to sig_info. The Rust side treats a non-zero sig_info as a
// signal-style termination.
data.exit_code = task->exit_code >> 8;
data.sig_info = task->exit_code & 0xFF;
bpf_get_current_comm(&data.task, sizeof(data.task));

events.perf_submit(args, &data, sizeof(data));
return 0;
}
64 changes: 64 additions & 0 deletions runc/src/exitsnoop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use bcc::{BPF, Tracepoint};
use bcc::perf_event::PerfMapBuilder;
use containerd_shim::monitor::monitor_notify_by_pid;
use log::error;
use containerd_sandbox::error::Result;
use std::sync::{Arc, Mutex};

/// Exit record produced by the BPF program in `exitsnoop.c`.
///
/// Mirrors the C `struct process_exit_data_t` field for field; `#[repr(C)]`
/// keeps the layout identical so the raw perf-buffer bytes can be read
/// directly into this type.
// The name intentionally matches the C struct it mirrors, and several fields
// exist only to keep the layout in step with the C side, so both lints are
// silenced here on purpose.
#[allow(non_camel_case_types, dead_code)]
#[repr(C)]
#[derive(Clone, Copy, Debug)]
struct process_exit_data_t {
    /// `task->start_time` of the exited task.
    start_time: u64,
    /// `bpf_ktime_get_ns()` sampled when the exit was observed.
    exit_time: u64,
    /// `task->tgid` of the exited task; compared against the monitored PIDs.
    pid: u32,
    /// `task->pid` of the exited task.
    tid: u32,
    /// `task->real_parent->tgid`.
    ppid: u32,
    /// `task->exit_code >> 8`.
    exit_code: i32,
    /// `task->exit_code & 0xFF`; non-zero is treated as death by signal.
    sig_info: u32,
    /// Command name as written by `bpf_get_current_comm` (`TASK_COMM_LEN` bytes).
    task: [u8; 16],
}

pub fn monit_process_exit(monitor_pid_list: Vec<u32>) -> Result<()> {
if monitor_pid_list.is_empty() {
return Ok(());
}
let pids = Arc::new(Mutex::new(monitor_pid_list));
std::thread::spawn(move || {
let code = include_str!("exitsnoop.c");
let mut module = BPF::new(code).unwrap();
Tracepoint::new().handler("sched_process_exit_handler").subsystem("sched").tracepoint("sched_process_exit").attach(&mut module).unwrap();
let table = module.table("events").unwrap();
let mut perf_map = PerfMapBuilder::new(table, || { data_callback(pids.clone()) }).build().unwrap();

while !pids.lock().unwrap().is_empty() {
perf_map.poll(200);
}
});
Ok(())
}

fn data_callback(monitor_pids: Arc<Mutex<Vec<u32>>>) -> Box<dyn FnMut(&[u8]) + Send> {
Box::new(move |x| {
let data = parse_struct(x);
let exit_code = if data.sig_info == 0 {
data.exit_code
} else {
(data.sig_info & 0x7F) as i32 + 128
};
monitor_pids.lock().unwrap().retain(|&pid| {
if data.pid == pid {
crate::RT.spawn(async move {
monitor_notify_by_pid(pid as i32, exit_code)
.await
.unwrap_or_else(|e| error!("failed to send exit event {}", e));
});
false
} else {
true
}
});
})
}

/// Reads a `process_exit_data_t` from the start of a raw perf-buffer record.
///
/// Bytes beyond the size of the struct are ignored.
///
/// # Panics
///
/// Panics if `x` holds fewer bytes than `size_of::<process_exit_data_t>()`,
/// instead of reading past the end of the slice.
fn parse_struct(x: &[u8]) -> process_exit_data_t {
    let needed = std::mem::size_of::<process_exit_data_t>();
    assert!(
        x.len() >= needed,
        "exit event too short: got {} bytes, need {}",
        x.len(),
        needed
    );
    // SAFETY: the assertion above guarantees `needed` readable bytes behind the
    // pointer, `read_unaligned` has no alignment requirement, and the struct is
    // made only of integers and a byte array, so every bit pattern is valid.
    unsafe { std::ptr::read_unaligned(x.as_ptr().cast::<process_exit_data_t>()) }
}
17 changes: 13 additions & 4 deletions runc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,24 @@ use nix::{
unistd::Pid,
};
use signal_hook_tokio::Signals;
use lazy_static::lazy_static;
use tokio::runtime::Runtime;

use crate::sandbox::RuncSandboxer;

mod sandbox;
mod exitsnoop;

pub const TASK_ADDRESS_SOCK: &str = "/run/kuasar/task.sock";
const DEFAULT_CONTAINERD_STATE_DIR: &str = "/run/containerd/";

lazy_static! {
    /// Process-wide tokio runtime: `main` drives the sandboxer on it and the
    /// exit snoop callback spawns its notification tasks onto it.
    pub static ref RT: Runtime = Runtime::new().unwrap();
}

fn main() {
env_logger::builder().format_timestamp_micros().init();
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async move {
RT.block_on(async move {
start_sandboxer().await.unwrap();
});
}
Expand All @@ -37,6 +43,7 @@ async fn start_sandboxer() -> anyhow::Result<()> {
.expect("new signal failed");
handle_signals(signals).await;
});

prctl::set_child_subreaper(true).unwrap();
let containerd_grpc_address = std::env::var("CONTAINERD_GRPC_ADDRESS").unwrap_or_default();
let containerd_ttrpc_address = std::env::var("CONTAINERD_TTRPC_ADDRESS").unwrap_or_default();
Expand All @@ -51,6 +58,8 @@ async fn start_sandboxer() -> anyhow::Result<()> {
}
}
}

process_exits(&sandboxer).await;
containerd_sandbox::run("runc-sandboxer", sandboxer)
.await
.unwrap();
Expand Down Expand Up @@ -100,7 +109,7 @@ async fn handle_signals(signals: Signals) {
}
}

pub async fn process_exits<F>(sandboxer: &RuncSandboxer) {
pub async fn process_exits(sandboxer: &RuncSandboxer) {
let sandboxes = sandboxer.sandboxes.clone();
let mut s = monitor_subscribe(Topic::Pid)
.await
Expand All @@ -117,7 +126,7 @@ pub async fn process_exits<F>(sandboxer: &RuncSandboxer) {
if sb_pid == pid as u32 {
let ts = time::OffsetDateTime::now_utc().unix_timestamp_nanos();
sb.status = SandboxStatus::Stopped(exit_code as u32, ts);
sb.stop().await.unwrap_or_default();
sb.exit_signal.signal();
}
}
}
Expand Down
22 changes: 18 additions & 4 deletions runc/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use containerd_sandbox::data::{ContainerData, SandboxData};
use containerd_sandbox::error::{Error, Result};
use containerd_sandbox::signal::ExitSignal;
use containerd_shim::monitor::{ExitEvent, monitor_subscribe, monitor_unsubscribe, Subject, Subscription, Topic};
use log::{debug, warn, info};
use log::{debug, info, warn};
use nix::errno::Errno;
use nix::fcntl::OFlag;
use nix::mount::{mount, MsFlags};
Expand All @@ -27,6 +27,8 @@ use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{Mutex, RwLock};
use unshare::{Fd, Namespace};

use crate::exitsnoop::monit_process_exit;

const SHIM_COMMAND_PATH: &str = "/usr/local/bin/runc-task";

pub struct RuncSandboxer {
Expand Down Expand Up @@ -75,12 +77,17 @@ impl RuncSandboxer {

pub async fn recover(&self, dir: &str) -> Result<()> {
let mut subs = tokio::fs::read_dir(dir).await.map_err(Error::IO)?;
let mut pids = Vec::new();
while let Some(entry) = subs.next_entry().await.unwrap() {
if let Ok(t) = entry.file_type().await {
if t.is_dir() {
let path = Path::new(dir).join(entry.file_name());
match RuncSandbox::recover(&path).await {
Ok(sb) => {
if let SandboxStatus::Running(pid) = sb.status {
// TODO need to check if the sandbox process is still running.
pids.push(pid);
}
let sb_mutex = Arc::new(Mutex::new(sb));
self.sandboxes
.write()
Expand All @@ -95,6 +102,8 @@ impl RuncSandboxer {
}
}
}

monit_process_exit(pids)?;
Ok(())
}
}
Expand Down Expand Up @@ -256,7 +265,6 @@ impl RuncSandbox {
}
}
}
self.exit_signal.signal();
Ok(())
}

Expand Down Expand Up @@ -310,6 +318,12 @@ impl RuncSandbox {
.map_err(Error::IO)?;
let sb = serde_json::from_slice::<RuncSandbox>(content.as_slice())
.map_err(|e| anyhow!("failed to deserialize sandbox, {}", e))?;
let log_pipe = base_dir.as_ref().to_path_buf().join("log");
let mut sandbox_id = sb.id.to_string();
sandbox_id.truncate(8);
tokio::spawn(async move {
read_shim_log(&log_pipe, sandbox_id).await
});
Ok(sb)
}

Expand Down Expand Up @@ -400,7 +414,7 @@ fn parse_sockaddr(addr: &str) -> &str {
addr
}

async fn read_shim_log(log_pipe: &str, sandbox_id: String) -> Result<()> {
async fn read_shim_log<T: AsRef<Path>>(log_pipe: T, sandbox_id: String) -> Result<()> {
let f = File::open(&log_pipe).await?;
let mut reader = BufReader::new(f);
loop {
Expand All @@ -412,7 +426,7 @@ async fn read_shim_log(log_pipe: &str, sandbox_id: String) -> Result<()> {
// read_line keeps the trailing "\n" in the buffer, and the logger
// appends its own "\n" when it prints a line,
// so drop the one we read to avoid a blank line in the output.
b.truncate(len-1);
b.truncate(len - 1);
debug!("shim-{}: {}", sandbox_id, b);
}
}
Expand Down

0 comments on commit f04a84c

Please sign in to comment.