Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] systemd lifecycle integration + tests #9

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
/Cargo.lock
testbins/basic/target
11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@ anyhow = "1.0.56"
futures = "0.3"
libc = "0.2.76"
log = "0.4.17"
memfile = "0.3.1"
nix = "0.25"
sd-notify = "0.3"
sd-notify = { version = "0.4.1", features = ["fdstore"] }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.24.1", features = ["full", "test-util"] }
tokio-stream = { version = "0.1", features = ["net", "io-util" ] }
tokio-stream = { version = "0.1", features = ["net", "io-util"] }
tokio-util = { version = "0.7.4", features = ["compat", "time", "codec"] }

[dev-dependencies]
clap = { version = "4.1.8", features = ["derive"] }
command-group = "5.0"
env_logger = "0.10.0"
rand = { version = "0.8", features = ["small_rng"] }
sendfd = "0.4.3"
test-binary = "3.0"

[patch.crates-io]
sd-notify = { git = "https://github.com/cbranch/sd-notify.git", rev = "b80e6eb18cb64a18015f59dcf182284ffe96fdf8" }
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,30 @@ The main struct of interest is `RestartConfig` which has methods for detecting o
restart. For shutting down a restarted process, the `ShutdownCoordinator` provides the means for
both signalling a shutdown event to spawned tasks, and awaiting their completion.

## Using shellflip with systemd

If you are running your process as a systemd service, you may use shellflip to fork a new instance
of the process. This allows you to upgrade the binary or apply new configuration, but you cannot
change the environment or any systemd-set configuration without stopping the service completely.

If this is insufficient for your needs, shellflip has an alternative runtime model that works with
systemd's process management model. When issuing a `systemctl restart` or using the restart
coordination socket, shellflip can pass data intended for the new process to systemd, then receive
that data in the newly started instance. This requires systemd's file descriptor store to be
enabled; an example `.service` file can be found in the `examples/` directory, or you may test it as
a one-off command using `systemd-run`:

`systemd-run -p FileDescriptorStoreMax=4096 target/debug/examples/restarter --systemd`

The limitations of systemd's process management remain; the old process must terminate before the
new process can start, so all tasks must end promptly and any child processes must terminate or
accept being ungracefully killed when the parent terminates.

If you need to prevent restarting the service if the service cannot successfully serialize its
state, use ExecReload and the restart coordination socket like the non-systemd-aware shellflip mode.
Make sure that systemd restarts your service on successful exit and/or force your service to
terminate with a non-zero error code on restart.

## License

BSD licensed. See the [LICENSE](LICENSE) file for details.
Expand Down
24 changes: 12 additions & 12 deletions examples/restarter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct AppData {

#[async_trait]
impl LifecycleHandler for AppData {
async fn send_to_new_process(&mut self, mut write_pipe: PipeWriter) -> std::io::Result<()> {
async fn send_to_new_process(&mut self, mut write_pipe: PipeWriter<'_>) -> std::io::Result<()> {
if self.restart_generation > 4 {
log::info!("Four restarts is more than anybody needs, surely?");
return Err(std::io::Error::new(
Expand All @@ -57,21 +57,11 @@ impl LifecycleHandler for AppData {
async fn main() -> Result<(), Error> {
env_logger::init();
let args = Args::parse();
let mut app_data = AppData {
restart_generation: 0,
};

if let Some(mut handover_pipe) = receive_from_old_process() {
app_data.restart_generation = handover_pipe.read_u32().await? + 1;
}

let restart_generation = app_data.restart_generation;

// Configure the essential requirements for implementing graceful restart.
let restart_conf = RestartConfig {
enabled: true,
coordination_socket_path: args.socket.into(),
lifecycle_handler: Box::new(app_data),
..Default::default()
};

Expand All @@ -94,8 +84,18 @@ async fn main() -> Result<(), Error> {
None => {}
}

let mut app_data = AppData {
restart_generation: 0,
};

if let Some(mut handover_pipe) = receive_from_old_process() {
app_data.restart_generation = handover_pipe.read_u32().await? + 1;
}

let restart_generation = app_data.restart_generation;

// Start the restart thread and get a task that will complete when a restart completes.
let restart_task = restart_conf.try_into_restart_task()?;
let restart_task = restart_conf.try_into_restart_task(app_data)?;
// (need to pin this because of the loop below!)
pin!(restart_task);
// Create a shutdown coordinator so that we can wait for all client connections to complete.
Expand Down
22 changes: 22 additions & 0 deletions src/fdstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use memfile::MemFile;
use std::io;
use std::pin::Pin;
use tokio::fs::File;

const SHELLFLIP_FD_PREFIX: &str = "sf_";
const SYSTEMD_MEMFD_NAME: &str = "mem_handover";

pub(crate) fn create_handover_memfd() -> io::Result<Pin<Box<File>>> {
let memfd = MemFile::create_default("shellflip_restart")?;

sd_notify::notify_with_fds(
false,
&[
sd_notify::NotifyState::FdStore,
sd_notify::NotifyState::FdName(&format!("{SHELLFLIP_FD_PREFIX}{SYSTEMD_MEMFD_NAME}")),
],
&[memfd.as_fd()],
)?;

Ok(Box::pin(File::from_std(memfd.into_file())))
}
99 changes: 89 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
//! `LifecycleHandler::new_process_failed` is called and you can undo any changes you made in
//! preparation for handover. If the new process succeeds, however, the restart task will resolve
//! and you may terminate the process as usual.
pub mod fdstore;
pub mod lifecycle;
mod pipes;
pub mod restart_coordination_socket;
Expand All @@ -67,6 +68,7 @@ use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use std::os::unix::net::UnixListener as StdUnixListener;
use std::os::unix::process::CommandExt;
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::process;
use std::thread;
use thiserror::Error;
Expand All @@ -93,19 +95,33 @@ pub struct RestartConfig {
pub coordination_socket_path: PathBuf,
/// Sets environment variables on the newly-started process
pub environment: Vec<(OsString, OsString)>,
/// Receive fine-grained events on the lifecycle of the new process and support data transfer.
pub lifecycle_handler: Box<dyn LifecycleHandler>,
/// Exits early when child process fail to start
pub exit_on_error: bool,
}

impl RestartConfig {
/// Prepare the current process to handle restarts, if enabled.
pub fn try_into_restart_task(
pub fn try_into_restart_task<L: LifecycleHandler + 'static>(
self,
lifecycle_handler: L,
) -> io::Result<(impl Future<Output = RestartResult<process::Child>> + Send)> {
fixup_systemd_env();
spawn_restart_task(self)
spawn_restart_task(self, Box::new(lifecycle_handler))
}

/// Prepare the current process to handle restarts through systemd and its file descriptor store.
///
/// When SIGUSR1 is received, instead of forking like `try_into_restart_task`, the process shall store state in the
/// file descriptor store before terminating.
///
/// Proper use of this option requires a different restart strategy for your service; instead of letting tasks
/// run to completion, the shutdown signal indicates that you need to suspend task state.
pub fn try_into_systemd_restart_task<L: LifecycleHandler + 'static>(
self,
lifecycle_handler: L,
) -> io::Result<(impl Future<Output = RestartResult<()>> + Send)> {
fixup_systemd_env();
spawn_systemd_restart_task(self, Box::new(lifecycle_handler))
}

/// Request an already-running service to restart.
Expand Down Expand Up @@ -137,7 +153,6 @@ impl Default for RestartConfig {
enabled: false,
coordination_socket_path: Default::default(),
environment: vec![],
lifecycle_handler: Box::new(lifecycle::NullLifecycleHandler),
exit_on_error: true,
}
}
Expand Down Expand Up @@ -197,6 +212,11 @@ impl RestartResponder {
}
}
}

/// True if responding to a client, false if restart was triggered by signal.
fn client_initiated(&self) -> bool {
self.rpc.is_some()
}
}

/// Spawns a thread that can be used to restart the process.
Expand All @@ -205,6 +225,7 @@ impl RestartResponder {
/// The child spawner thread needs to be created before seccomp locks down fork/exec.
pub fn spawn_restart_task(
settings: RestartConfig,
lifecycle_handler: Box<dyn LifecycleHandler>,
) -> io::Result<impl Future<Output = RestartResult<process::Child>> + Send> {
let socket = match settings.enabled {
true => Some(settings.coordination_socket_path.as_ref()),
Expand All @@ -213,8 +234,7 @@ pub fn spawn_restart_task(

let mut signal_stream = signal(SignalKind::user_defined1())?;
let (restart_fd, mut socket_stream) = new_restart_coordination_socket_stream(socket)?;
let mut child_spawner =
ChildSpawner::new(restart_fd, settings.environment, settings.lifecycle_handler);
let mut child_spawner = ChildSpawner::new(restart_fd, settings.environment, lifecycle_handler);

Ok(async move {
startup_complete()?;
Expand Down Expand Up @@ -255,6 +275,66 @@ pub fn spawn_restart_task(
})
}

/// Spawns a thread that handles state serialization during systemd restart.
/// Returns a future that resolves when a restart succeeds.
pub fn spawn_systemd_restart_task(
settings: RestartConfig,
mut lifecycle_handler: Box<dyn LifecycleHandler>,
) -> io::Result<impl Future<Output = RestartResult<()>> + Send> {
let socket = match settings.enabled {
true => Some(settings.coordination_socket_path.as_ref()),
false => None,
};

let mut signal_stream = signal(SignalKind::user_defined1())?;
// No child process, so drop the spare inheritable socket.
let (_, mut socket_stream) = new_restart_coordination_socket_stream(socket)?;

// Ensure that we can store at least the serialized state into systemd. Without this, no graceful restart can happen!
let mut memfd = match fdstore::create_handover_memfd() {
Ok(memfd) => memfd,
Err(e) => {
log::error!(
"Failed to write memfd to systemd file descriptor store: {}",
e
);
return Err(e);
}
};

Ok(async move {
startup_complete()?;
loop {
let responder = next_restart_request(&mut signal_stream, &mut socket_stream).await?;
log::debug!("Received restart signal, serializing state");

if !responder.client_initiated() {
if let Err(e) = sd_notify::notify(false, &[sd_notify::NotifyState::Stopping]) {
log::error!("Failed to notify systemd: {}", e);
}
}

let res = lifecycle_handler.send_to_new_process(memfd.as_mut()).await;

let retry_on_err = responder.client_initiated();

responder
.respond(res.as_ref().map(|_| 0).map_err(|e| e.to_string()))
.await;

if res.is_err() && retry_on_err {
// If a call to `send_to_new_process` fails, this will clean up
// the memfd to avoid invalid data read on restart.
memfd.set_len(0).await.inspect_err(|e| {
log::error!("Failed to truncate handover memfd: {}", e);
})?;
} else {
return res.map_err(|e| e.into());
}
}
})
}

/// Handles forking a new client in a more privileged thread.
struct ChildSpawner {
signal_sender: Sender<()>,
Expand Down Expand Up @@ -461,9 +541,8 @@ async fn send_parent_state(
notif_w: CompletionSender,
handover_w: StdFile,
) -> io::Result<()> {
lifecycle_handler
.send_to_new_process(Box::pin(File::from(handover_w)))
.await?;
let handover_w = pin!(File::from(handover_w));
lifecycle_handler.send_to_new_process(handover_w).await?;

// only the child needs the write end
drop(notif_w);
Expand Down
4 changes: 2 additions & 2 deletions src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWrite};

pub type PipeReader = Pin<Box<dyn AsyncRead + Send>>;
pub type PipeWriter = Pin<Box<dyn AsyncWrite + Send>>;
pub type PipeWriter<'a> = Pin<&'a mut (dyn AsyncWrite + Send)>;

#[async_trait]
pub trait LifecycleHandler: Send {
/// Called after the child process has been spawned, allowing the current process to send state
/// to the child process. The child process can receive this data by calling
/// `receive_from_old_process`.
async fn send_to_new_process(&mut self, _write_pipe: PipeWriter) -> io::Result<()> {
async fn send_to_new_process(&mut self, _write_pipe: PipeWriter<'_>) -> io::Result<()> {
Ok(())
}

Expand Down
Loading
Loading