Skip to content

Commit

Permalink
runc: use a unique task server to serve all task api
Browse files Browse the repository at this point in the history
  • Loading branch information
abel-von committed Sep 6, 2023
1 parent 71ee111 commit 79640ed
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 65 deletions.
27 changes: 20 additions & 7 deletions runc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::ffi::CString;
use std::fs::File;
use std::io::{Read, Write};
use std::os::fd::RawFd;
use std::path::Path;
use std::process::{exit, id};

use anyhow::anyhow;
Expand All @@ -15,18 +16,21 @@ use nix::{errno::Errno, libc, NixPath, sys::{
}, unistd::Pid};
use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
use nix::sched::{CloneFlags, setns, unshare};
use nix::sys::signal::{SaFlags, SigAction, SigHandler, SigSet, sigaction, SIGCHLD};
use nix::sys::signal::{SaFlags, SigAction, sigaction, SIGCHLD, SigHandler, SigSet};
use nix::sys::stat::Mode;
use nix::unistd::{close, fork, ForkResult, pause, pipe, read, write};
use prctl::PrctlMM;
use signal_hook_tokio::Signals;
use tokio::fs::create_dir_all;

use crate::sandbox::{RuncSandboxer, SandboxParent};

mod sandbox;
mod runc;
mod common;

pub const TASK_ADDRESS_SOCK: &str = "/run/kuasar/task.sock";

fn main() {
env_logger::builder().format_timestamp_micros().init();
let sandbox_parent = fork_sandbox_parent().unwrap();
Expand Down Expand Up @@ -60,9 +64,9 @@ fn fork_sandbox_parent() -> Result<SandboxParent, anyhow::Error> {
let sig_action = SigAction::new(
SigHandler::Handler(sandbox_parent_handle_signals),
SaFlags::empty(),
SigSet::empty()
SigSet::empty(),
);
unsafe {sigaction(SIGCHLD, &sig_action).unwrap();}
unsafe { sigaction(SIGCHLD, &sig_action).unwrap(); }
loop {
let buffer = read_count(reqr, 512).unwrap();
let id = String::from_utf8_lossy(&buffer[0..64]).to_string();
Expand Down Expand Up @@ -151,11 +155,11 @@ fn fork_sandbox(id: &str, netns: &str) -> Result<i32, anyhow::Error> {
}

fn set_process_comm(addr: u64, len: u64) {
if let Err(_) = prctl::set_mm(PrctlMM::PR_SET_MM_ARG_START, addr) {
if let Err(_) = prctl::set_mm(PrctlMM::PR_SET_MM_ARG_START, addr) {
prctl::set_mm(PrctlMM::PR_SET_MM_ARG_END, addr + len).unwrap();
prctl::set_mm(PrctlMM::PR_SET_MM_ARG_START, addr).unwrap()
} else {
prctl::set_mm(PrctlMM::PR_SET_MM_ARG_END, addr + len).unwrap();
prctl::set_mm(PrctlMM::PR_SET_MM_ARG_END, addr + len).unwrap();
}
}

Expand All @@ -166,8 +170,17 @@ async fn start_sandboxer(sandbox_parent: SandboxParent) -> anyhow::Result<()> {
handle_signals(signals).await;
});
prctl::set_child_subreaper(true).unwrap();

let sandboxer = RuncSandboxer::new(sandbox_parent);
let sock_path = Path::new(TASK_ADDRESS_SOCK);
if sock_path.exists() {
tokio::fs::remove_file(sock_path).await?;
}
if let Some(sock_parent) = sock_path.parent() {
create_dir_all(sock_parent)
.await
.map_err(|e| anyhow!("failed to create {}, {}", sock_parent.display(), e))?;
}
let sock_addr = format!("unix://{}", TASK_ADDRESS_SOCK);
let sandboxer = RuncSandboxer::new(sandbox_parent, &sock_addr).await?;
containerd_sandbox::run("runc-sandboxer", sandboxer)
.await
.unwrap();
Expand Down
12 changes: 1 addition & 11 deletions runc/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,7 @@ pub type InitProcess = ProcessTemplate<RuncInitLifecycle>;
pub type RuncContainer = ContainerTemplate<InitProcess, ExecProcess, RuncExecFactory>;

#[derive(Clone, Default)]
pub(crate) struct RuncFactory {
netns: String,
}

impl RuncFactory {
pub fn new(netns: &str) -> Self {
Self {
netns: netns.to_string()
}
}
}
pub(crate) struct RuncFactory;

#[async_trait]
impl ContainerFactory<RuncContainer> for RuncFactory {
Expand Down
90 changes: 43 additions & 47 deletions runc/src/sandbox.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashMap;
use std::ffi::CString;
use std::io::{Read, Write};
use std::os::fd::RawFd;
use std::sync::Arc;

use anyhow::anyhow;
Expand Down Expand Up @@ -28,14 +30,14 @@ use tokio::fs::create_dir_all;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::mpsc::channel;

use crate::{read_count, write_all};
use crate::runc::{RuncContainer, RuncFactory};
use std::io::{Write, Read};
use std::os::fd::RawFd;
use crate::{write_all, read_count};

pub struct RuncSandboxer {
#[allow(clippy::type_complexity)]
pub(crate) sandboxes: Arc<RwLock<HashMap<String, Arc<Mutex<RuncSandbox>>>>>,
task_address: String,
server: Server,
sandbox_parent: Arc<Mutex<SandboxParent>>,
}

Expand Down Expand Up @@ -79,7 +81,7 @@ impl SandboxParent {
(&mut req[64..]).write_all(netns.as_bytes())?;
}
write_all(self.req, &req)?;
let mut resp = [0u8;4];
let mut resp = [0u8; 4];
let mut r = read_count(self.resp, 4)?;
resp[..].copy_from_slice(r.as_slice());
let pid = i32::from_le_bytes(resp);
Expand All @@ -95,11 +97,23 @@ impl Drop for SandboxParent {
}

impl RuncSandboxer {
pub fn new(sandbox_parent: SandboxParent) -> Self {
Self {
pub async fn new(sandbox_parent: SandboxParent, task_address: &str) -> Result<Self> {
let task = start_task_service().await?;
let task_service = create_task(Arc::new(Box::new(task)));
let mut server = Server::new().register_service(task_service);
server = server
.bind(&task_address)
.map_err(|e| anyhow!("failed to bind socket {}, {}", task_address, e))?;
server
.start()
.await
.map_err(|e| anyhow!("failed to start task server, {}", e))?;
Ok(Self {
task_address: task_address.to_string(),
server,
sandboxes: Default::default(),
sandbox_parent: Arc::new(Mutex::new(sandbox_parent)),
}
})
}
}

Expand Down Expand Up @@ -131,6 +145,7 @@ impl Sandboxer for RuncSandboxer {
let mut sandbox_parent = self.sandbox_parent.lock().await;
let sandbox_pid = sandbox_parent.fork_sandbox_process(id, &sandbox.data.netns)?;
sandbox.status = SandboxStatus::Running(sandbox_pid as u32);
sandbox.data.task_address = self.task_address.clone();
Ok(())
}

Expand Down Expand Up @@ -181,46 +196,6 @@ impl RuncSandbox {
self.exit_signal.signal();
Ok(())
}

async fn start(&mut self) -> Result<()> {
let task = self.start_task_service().await?;
let task_address = format!("unix://{}/task.sock", self.base_dir);
self.data.task_address = task_address.clone();
let task_service = create_task(Arc::new(Box::new(task)));
let mut server = Server::new().register_service(task_service);
server = server
.bind(&task_address)
.map_err(|e| anyhow!("failed to bind socket {}, {}", task_address, e))?;
server
.start()
.await
.map_err(|e| anyhow!("failed to start task server, {}", e))?;
self.server = Some(server);
Ok(())
}

async fn start_task_service(
&self,
) -> Result<TaskService<RuncFactory, RuncContainer>> {
let (tx, mut rx) = channel(128);
let factory = RuncFactory::new(&self.data.netns);
let task = TaskService {
factory,
containers: Arc::new(Default::default()),
namespace: "k8s.io".to_string(),
exit: Arc::new(Default::default()),
tx: tx.clone(),
};

process_exits(&task).await;

tokio::spawn(async move {
while let Some((_topic, e)) = rx.recv().await {
debug!("received event {:?}", e);
}
});
Ok(task)
}
}

#[async_trait]
Expand Down Expand Up @@ -312,4 +287,25 @@ pub async fn process_exits<F>(task: &TaskService<F, RuncContainer>) {
}
}
});
}

async fn start_task_service() -> Result<TaskService<RuncFactory, RuncContainer>> {
let (tx, mut rx) = channel(128);
let factory = RuncFactory::default();
let task = TaskService {
factory,
containers: Arc::new(Default::default()),
namespace: "k8s.io".to_string(),
exit: Arc::new(Default::default()),
tx: tx.clone(),
};

process_exits(&task).await;

tokio::spawn(async move {
while let Some((_topic, e)) = rx.recv().await {
debug!("received event {:?}", e);
}
});
Ok(task)
}

0 comments on commit 79640ed

Please sign in to comment.