Skip to content

Commit

Permalink
runc: add restart recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
abel-von committed Sep 7, 2023
1 parent 79640ed commit 36de2c8
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 33 deletions.
20 changes: 10 additions & 10 deletions runc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions runc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ futures = { version = "0.3.21" }
log = { version = "0.4.17", features = ["std"] }
oci-spec = "0.5.4"
time = "0.3.5"
serde_json = "1.0.74"
serde = { version = "1.0.133", features = ["derive"] }
serde_json = "1.0.96"
serde = { version = "1.0.163", features = ["derive"] }
procfs = "0.15.1"
prctl = "1.0.0"
os_pipe = "1.1.4"
Expand Down
111 changes: 110 additions & 1 deletion runc/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
process::ExitStatus,
sync::Arc,
};
use std::collections::HashMap;

use async_trait::async_trait;
use containerd_shim::{
Expand All @@ -46,11 +47,13 @@ use containerd_shim::{
protobuf::{CodedInputStream, Message},
}, Result, util::{asyncify, mkdir, mount_rootfs, read_file_to_str, write_options, write_runtime},
};
use containerd_shim::asynchronous::util::{read_options, write_str_to_file};
use log::{debug, error};
use nix::{sys::signal::kill, unistd::Pid};
use oci_spec::runtime::{LinuxResources, Process};
use runc::{Command, Runc, Spawner};
use serde::Deserialize;
use runc::options::DeleteOpts;
use serde::{Deserialize, Serialize};
use tokio::{
fs::{File, OpenOptions},
io::{AsyncRead, AsyncReadExt, AsyncWrite},
Expand All @@ -70,6 +73,36 @@ pub type RuncContainer = ContainerTemplate<InitProcess, ExecProcess, RuncExecFac
#[derive(Clone, Default)]
pub(crate) struct RuncFactory;

/// Serde-friendly mirror of [`Stdio`], used to persist a container's stdio
/// settings as JSON (see `write_stdio` / `read_stdio`).
///
/// It carries the same four fields as [`Stdio`] and converts to and from it
/// through the `From` implementations below.
#[derive(Serialize, Deserialize)]
pub struct SerializableStdio {
    /// Value of the `stdin` field of the mirrored [`Stdio`].
    pub stdin: String,
    /// Value of the `stdout` field of the mirrored [`Stdio`].
    pub stdout: String,
    /// Value of the `stderr` field of the mirrored [`Stdio`].
    pub stderr: String,
    /// Value of the `terminal` flag of the mirrored [`Stdio`].
    pub terminal: bool,
}

impl From<SerializableStdio> for Stdio {
fn from(value: SerializableStdio) -> Self {
Self {
stdin: value.stdin,
stdout: value.stdout,
stderr: value.stderr,
terminal: value.terminal,
}
}
}

impl From<Stdio> for SerializableStdio {
fn from(value: Stdio) -> Self {
Self {
stdin: value.stdin,
stdout: value.stdout,
stderr: value.stderr,
terminal: value.terminal,
}
}
}

#[async_trait]
impl ContainerFactory<RuncContainer> for RuncFactory {
async fn create(
Expand Down Expand Up @@ -113,6 +146,7 @@ impl ContainerFactory<RuncContainer> for RuncFactory {

let id = req.id();
let stdio = Stdio::new(req.stdin(), req.stdout(), req.stderr(), req.terminal());
write_stdio(bundle, &stdio).await?;

let mut init = InitProcess::new(
id,
Expand Down Expand Up @@ -652,6 +686,67 @@ impl Spawner for ShimExecutor {
}
}

pub async fn recover() -> Result<HashMap<String, RuncContainer>> {
let mut opts = Options::new();
let mut runc_containers = HashMap::new();
for ns in ["k8s.io", "moby", "default"] {
let runc = create_runc("", ns, Path::new("/tmp"), &opts, Some(Arc::new(ShimExecutor::default())))?;
let containers = runc.list().await
.map_err(other_error!(e, "failed to call runc list"))?;
for c in containers {
if !Path::new(&c.bundle).exists() {
runc.delete(&c.id, Some(&DeleteOpts { force: true })).await.unwrap_or_default();
continue;
}
let runc_container = recover_one(&c, &runc).await;
runc_containers.insert(c.id.to_string(), runc_container);
}
}
Ok(runc_containers)
}

/// Rebuilds a single [`RuncContainer`] from the state runc reports for it.
///
/// * `c` - the container entry returned by `runc list`.
/// * `runtime` - the runc handle to attach to the recovered container.
///
/// The stdio settings and runtime options are read back from the bundle
/// directory. If either cannot be read, empty stdio (no terminal) or
/// `Options::new()` is used instead. The init process state is derived from
/// runc's status string; any status other than `created`, `running` or
/// `stopped` is treated as `CREATED`.
///
/// The returned container carries the id and bundle path reported by runc.
pub async fn recover_one(c: &runc::container::Container, runtime: &Runc) -> RuncContainer {
    let stdio = read_stdio(&c.bundle).await.unwrap_or_else(|_| Stdio {
        stdin: String::new(),
        stdout: String::new(),
        stderr: String::new(),
        terminal: false,
    });
    let opts = read_options(&c.bundle)
        .await
        .unwrap_or_else(|_| Options::new());
    let mut init = InitProcess::new(
        &c.id,
        stdio,
        RuncInitLifecycle::new(runtime.clone(), opts.clone(), &c.bundle),
    );
    init.state = match c.status.as_str() {
        "created" => Status::CREATED,
        "running" => Status::RUNNING,
        "stopped" => Status::STOPPED,
        _ => Status::CREATED,
    };
    init.pid = c.pid as i32;
    RuncContainer {
        // Previously both fields were left as empty strings, so a recovered
        // container no longer knew its own id or bundle path.
        id: c.id.to_string(),
        bundle: c.bundle.to_string(),
        init,
        process_factory: RuncExecFactory {
            runtime: runtime.clone(),
            bundle: c.bundle.to_string(),
            io_uid: opts.io_uid,
            io_gid: opts.io_gid,
        },
        processes: Default::default(),
    }
}

async fn read_std<T>(std: Option<T>) -> String
where
T: AsyncRead + Unpin,
Expand Down Expand Up @@ -711,3 +806,17 @@ async fn runtime_error(e: runc::error::Error, bundle: &str) -> Error {
}
}

async fn write_stdio(bundle: &str, stdio: &Stdio) -> Result<()> {
let serializable_stdio: SerializableStdio = stdio.clone().into();
let stdio_str = serde_json::to_string(&serializable_stdio)
.map_err(other_error!(e, "failed to serialize stdio"))?;
write_str_to_file(Path::new(bundle).join("stdio"), stdio_str).await
}

async fn read_stdio(bundle: &str) -> Result<Stdio> {
let stdio_str = read_file_to_str(Path::new(bundle).join("stdio")).await?;
let serializable_stdio: SerializableStdio = serde_json::from_str(&stdio_str)
.map_err(other_error!(e, "failed to deserialize stdio"))?;
Ok(serializable_stdio.into())
}

72 changes: 52 additions & 20 deletions runc/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::ffi::CString;
use std::io::{Read, Write};
use std::os::fd::RawFd;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
Expand All @@ -10,6 +11,7 @@ use containerd_sandbox::{Container, ContainerOption, Sandbox, Sandboxer, Sandbox
use containerd_sandbox::data::{ContainerData, SandboxData};
use containerd_sandbox::error::{Error, Result};
use containerd_sandbox::signal::ExitSignal;
use containerd_shim::api::Options;
use containerd_shim::asynchronous::monitor::{monitor_subscribe, monitor_unsubscribe};
use containerd_shim::asynchronous::task::TaskService;
use containerd_shim::monitor::{Subject, Topic};
Expand All @@ -26,12 +28,16 @@ use nix::sys::stat::Mode;
use nix::unistd::{close, fork, ForkResult, pause, Pid};
use os_pipe::{PipeReader, PipeWriter};
use prctl::PrctlMM;
use tokio::fs::create_dir_all;
use runc::options::DeleteOpts;
use serde::{Deserialize, Serialize};
use tokio::fs::{create_dir_all, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{Mutex, RwLock};
use tokio::sync::mpsc::channel;

use crate::{read_count, write_all};
use crate::runc::{RuncContainer, RuncFactory};
use crate::common::{create_runc, ShimExecutor};
use crate::runc::{recover, RuncContainer, RuncFactory};

pub struct RuncSandboxer {
#[allow(clippy::type_complexity)]
Expand All @@ -41,16 +47,18 @@ pub struct RuncSandboxer {
sandbox_parent: Arc<Mutex<SandboxParent>>,
}

/// State of a single runc-backed sandbox.
///
/// The struct derives `Serialize`/`Deserialize` so that it can be written to
/// and read back from `sandbox.json` in `base_dir` (see `dump` / `recover`).
#[derive(Serialize, Deserialize)]
pub struct RuncSandbox {
    /// Identifier of the sandbox.
    pub(crate) id: String,
    /// Directory of this sandbox; it holds the `sandbox.json` snapshot.
    pub(crate) base_dir: String,
    /// Sandbox data; `netns` and `task_address` are read and written by the sandboxer.
    pub(crate) data: SandboxData,
    /// Current status, e.g. `Created` or `Running(pid)`.
    pub(crate) status: SandboxStatus,
    /// Signalled when the sandbox is stopped. It is skipped by serde and
    /// re-created with its default value on deserialization.
    #[serde(skip, default)]
    pub(crate) exit_signal: Arc<ExitSignal>,
    /// Containers that belong to this sandbox, keyed by container id.
    pub(crate) containers: HashMap<String, RuncContainerData>,
    // NOTE(review): confirm that `Server` can be (de)serialized, or that this
    // field still belongs here, given the derive above.
    pub(crate) server: Option<Server>,
}

/// Per-container record stored in [`RuncSandbox`]'s `containers` map and
/// serialized together with the sandbox.
#[derive(Serialize, Deserialize)]
pub struct RuncContainerData {
    /// The container data supplied when the container was appended to the sandbox.
    data: ContainerData,
}
Expand Down Expand Up @@ -129,11 +137,11 @@ impl Sandboxer for RuncSandboxer {
status: SandboxStatus::Created,
exit_signal: Arc::new(Default::default()),
containers: Default::default(),
server: None,
};
create_dir_all(&sandbox.base_dir)
.await
.map_err(|e| anyhow!("failed to create {}, {}", sandbox.base_dir, e))?;
sandbox.dump().await?;
let mut sandboxes = self.sandboxes.write().await;
sandboxes.insert(id.to_string(), Arc::new(Mutex::new(sandbox)));
Ok(())
Expand All @@ -146,6 +154,7 @@ impl Sandboxer for RuncSandboxer {
let sandbox_pid = sandbox_parent.fork_sandbox_process(id, &sandbox.data.netns)?;
sandbox.status = SandboxStatus::Running(sandbox_pid as u32);
sandbox.data.task_address = self.task_address.clone();
sandbox.dump().await?;
Ok(())
}

Expand All @@ -166,27 +175,13 @@ impl Sandboxer for RuncSandboxer {
}

/// Removes the sandbox registered under `id` from the sandbox table.
///
/// If an entry existed and still owns a task server, that server is shut
/// down first; a shutdown failure is returned as an error.
async fn delete(&self, id: &str) -> Result<()> {
    // Take the entry out of the table, then stop its task server if it has one.
    if let Some(sandbox) = self.sandboxes.write().await.remove(id) {
        let mut sandbox = sandbox.lock().await;
        if let Some(mut server) = sandbox.server.take() {
            server
                .shutdown()
                .await
                .map_err(|e| anyhow!("failed to shutdown task server, {}", e))?;
        }
    }
    // NOTE(review): the entry was already removed above when it existed, so
    // this second removal looks redundant — confirm which of the two is intended.
    self.sandboxes.write().await.remove(id);
    Ok(())
}
}

impl RuncSandbox {
async fn stop(&mut self) -> Result<()> {
if let Some(mut server) = self.server.take() {
server
.shutdown()
.await
.map_err(|e| anyhow!("failed to shutdown task server, {}", e))?;
}
if let SandboxStatus::Running(pid) = self.status {
kill(Pid::from_raw(pid as i32), Signal::SIGKILL)
.map_err(|e| anyhow!("failed to kill sandbox process {}", e))?;
Expand All @@ -196,6 +191,40 @@ impl RuncSandbox {
self.exit_signal.signal();
Ok(())
}

async fn recover<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
let dump_path = base_dir.as_ref().join("sandbox.json");
let mut dump_file = OpenOptions::new()
.read(true)
.open(&dump_path)
.await
.map_err(Error::IO)?;
let mut content = vec![];
dump_file
.read_to_end(&mut content)
.await
.map_err(Error::IO)?;
let mut sb = serde_json::from_slice::<RuncSandbox>(content.as_slice())
.map_err(|e| anyhow!("failed to deserialize sandbox, {}", e))?;
Ok(sb)
}

async fn dump(&self) -> Result<()> {
let dump_data =
serde_json::to_vec(&self).map_err(|e| anyhow!("failed to serialize sandbox, {}", e))?;
let dump_path = format!("{}/sandbox.json", self.base_dir);
let mut dump_file = OpenOptions::new()
.write(true)
.create(true)
.open(&dump_path)
.await
.map_err(Error::IO)?;
dump_file
.write_all(dump_data.as_slice())
.await
.map_err(Error::IO)?;
Ok(())
}
}

#[async_trait]
Expand All @@ -220,6 +249,7 @@ impl Sandbox for RuncSandbox {
self.containers.insert(id.to_string(), RuncContainerData {
data: option.container
});
self.dump().await?;
Ok(())
}

Expand All @@ -229,6 +259,7 @@ impl Sandbox for RuncSandbox {

async fn remove_container(&mut self, id: &str) -> Result<()> {
self.containers.remove(id);
self.dump().await?;
Ok(())
}

Expand Down Expand Up @@ -292,9 +323,10 @@ pub async fn process_exits<F>(task: &TaskService<F, RuncContainer>) {
async fn start_task_service() -> Result<TaskService<RuncFactory, RuncContainer>> {
let (tx, mut rx) = channel(128);
let factory = RuncFactory::default();
let containers = recover().await.map_err(|e| anyhow!("failed to recover containers {}", e))?;
let task = TaskService {
factory,
containers: Arc::new(Default::default()),
containers: Arc::new(Mutex::new(containers)),
namespace: "k8s.io".to_string(),
exit: Arc::new(Default::default()),
tx: tx.clone(),
Expand Down

0 comments on commit 36de2c8

Please sign in to comment.