Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Api server thread clean shutdown #3348

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/api_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ serde_json = "1.0.78"
thiserror = "1.0.32"

logger = { path = "../logger" }
micro_http = { git = "https://github.com/firecracker-microvm/micro-http", rev = "4b18a04" }
micro_http = { git = "https://github.com/firecracker-microvm/micro-http" }
mmds = { path = "../mmds" }
seccompiler = { path = "../seccompiler" }
utils = { path = "../utils" }
Expand Down
54 changes: 39 additions & 15 deletions src/api_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ pub struct ApiServer {
/// FD on which we notify the VMM that we have sent at least one
/// `VmmRequest`.
to_vmm_fd: EventFd,
/// If this flag is set, the API thread will go down.
shutdown_flag: bool,
}

impl ApiServer {
Expand All @@ -55,7 +53,6 @@ impl ApiServer {
api_request_sender,
vmm_response_receiver,
to_vmm_fd,
shutdown_flag: false,
}
}

Expand Down Expand Up @@ -97,6 +94,11 @@ impl ApiServer {
loop {
let request_vec = match server.requests() {
Ok(vec) => vec,
Err(ServerError::ShutdownEvent) => {
server.flush_outgoing_writes();
debug!("shutdown request received, API server thread ending.");
return;
}
Err(err) => {
// print request error, but keep server running
error!("API Server error on retrieving incoming request: {}", err);
Expand All @@ -116,14 +118,6 @@ impl ApiServer {
let delta_us = utils::time::get_time_us(utils::time::ClockType::Monotonic)
- request_processing_start_us;
debug!("Total previous API call duration: {} us.", delta_us);

if self.shutdown_flag {
server.flush_outgoing_writes();
debug!(
"/shutdown-internal request received, API server thread now ending itself"
);
return;
}
}
}
}
Expand All @@ -140,10 +134,6 @@ impl ApiServer {
RequestAction::Sync(vmm_action) => {
self.serve_vmm_action_request(vmm_action, request_processing_start_us)
}
RequestAction::ShutdownInternal => {
self.shutdown_flag = true;
Response::new(Version::Http11, StatusCode::NoContent)
}
};
if let Some(message) = parsing_info.take_deprecation_message() {
warn!("{}", message);
Expand Down Expand Up @@ -478,4 +468,38 @@ mod tests {
unanswered requests will be dropped.\" }";
assert_eq!(&buf[..], &error_message[..]);
}

#[test]
fn test_kill_switch() {
let mut tmp_socket = TempFile::new().unwrap();
tmp_socket.remove().unwrap();
let path_to_socket = tmp_socket.as_path().to_str().unwrap().to_owned();

let to_vmm_fd = EventFd::new(libc::EFD_NONBLOCK).unwrap();
let (api_request_sender, _from_api) = channel();
let (_to_api, vmm_response_receiver) = channel();
let seccomp_filters = get_empty_filters();

let api_kill_switch = EventFd::new(libc::EFD_NONBLOCK).unwrap();
let kill_switch = api_kill_switch.try_clone().unwrap();

let mut server = HttpServer::new(PathBuf::from(path_to_socket)).unwrap();
server.add_kill_switch(kill_switch).unwrap();

let api_thread = thread::Builder::new()
.name("fc_api_test".to_owned())
.spawn(move || {
ApiServer::new(api_request_sender, vmm_response_receiver, to_vmm_fd).run(
server,
ProcessTimeReporter::new(Some(1), Some(1), Some(1)),
seccomp_filters.get("api").unwrap(),
vmm::HTTP_MAX_PAYLOAD_SIZE,
)
})
.unwrap();
// Signal the API thread it should shut down.
api_kill_switch.write(1).unwrap();
// Verify API thread was brought down.
api_thread.join().unwrap();
}
}
22 changes: 0 additions & 22 deletions src/api_server/src/parsed_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use crate::ApiServer;
#[derive(Debug)]
pub(crate) enum RequestAction {
Sync(Box<VmmAction>),
ShutdownInternal, // !!! not an API, used by shutdown to thread::join the API thread
}

#[derive(Debug, Default, PartialEq)]
Expand Down Expand Up @@ -97,9 +96,6 @@ impl TryFrom<&Request> for ParsedRequest {
(Method::Put, "network-interfaces", Some(body)) => {
parse_put_net(body, path_tokens.next())
}
(Method::Put, "shutdown-internal", None) => {
Ok(ParsedRequest::new(RequestAction::ShutdownInternal))
}
(Method::Put, "snapshot", Some(body)) => parse_put_snapshot(body, path_tokens.next()),
(Method::Put, "vsock", Some(body)) => parse_put_vsock(body),
(Method::Put, "entropy", Some(body)) => parse_put_entropy(body),
Expand Down Expand Up @@ -350,15 +346,13 @@ pub mod tests {
(RequestAction::Sync(ref sync_req), RequestAction::Sync(ref other_sync_req)) => {
sync_req == other_sync_req
}
_ => false,
}
}
}

pub(crate) fn vmm_action_from_request(req: ParsedRequest) -> VmmAction {
match req.action {
RequestAction::Sync(vmm_action) => *vmm_action,
_ => panic!("Invalid request"),
}
}

Expand All @@ -371,7 +365,6 @@ pub mod tests {
assert_eq!(req_msg, msg);
*vmm_action
}
_ => panic!("Invalid request"),
}
}

Expand Down Expand Up @@ -883,21 +876,6 @@ pub mod tests {
assert!(ParsedRequest::try_from(&req).is_ok());
}

#[test]
fn test_try_from_put_shutdown() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
let mut connection = HttpConnection::new(receiver);
sender
.write_all(http_request("PUT", "/shutdown-internal", None).as_bytes())
.unwrap();
assert!(connection.try_read().is_ok());
let req = connection.pop_parsed_request().unwrap();
match ParsedRequest::try_from(&req).unwrap().into_parts() {
(RequestAction::ShutdownInternal, _) => (),
_ => panic!("wrong parsed request"),
};
}

#[test]
fn test_try_from_patch_vm() {
let (mut sender, receiver) = UnixStream::pair().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/dumbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ bitflags = "1.3.2"
derive_more = { version = "0.99.17", default-features = false, features = ["from"] }

logger = { path = "../logger" }
micro_http = { git = "https://github.com/firecracker-microvm/micro-http", rev = "4b18a04" }
micro_http = { git = "https://github.com/firecracker-microvm/micro-http" }
utils = { path = "../utils" }

[dev-dependencies]
Expand Down
28 changes: 12 additions & 16 deletions src/firecracker/src/api_server_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use std::io::Write;
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -147,6 +145,8 @@
// It is used in the config/pre-boot loop which is a simple blocking loop
// which only consumes API events.
let api_event_fd = EventFd::new(libc::EFD_SEMAPHORE).expect("Cannot create API Eventfd.");
// FD used to signal API thread to stop/shutdown.
let api_kill_switch = EventFd::new(libc::EFD_NONBLOCK).expect("Cannot create API kill switch.");

Check warning on line 149 in src/firecracker/src/api_server_adapter.rs

View check run for this annotation

Codecov / codecov/patch

src/firecracker/src/api_server_adapter.rs#L148-L149

Added lines #L148 - L149 were not covered by tests

// Channels for both directions between Vmm and Api threads.
let (to_vmm, from_api) = channel();
Expand All @@ -159,7 +159,7 @@
.remove("api")
.expect("Missing seccomp filter for API thread.");

let server = match HttpServer::new(&bind_path) {
let mut server = match HttpServer::new(&bind_path) {

Check warning on line 162 in src/firecracker/src/api_server_adapter.rs

View check run for this annotation

Codecov / codecov/patch

src/firecracker/src/api_server_adapter.rs#L162

Added line #L162 was not covered by tests
Ok(s) => s,
Err(ServerError::IOError(inner)) if inner.kind() == std::io::ErrorKind::AddrInUse => {
let sock_path = bind_path.display().to_string();
Expand All @@ -170,6 +170,14 @@
}
};

let api_kill_switch_clone = api_kill_switch
.try_clone()
.expect("Failed to clone API kill switch");

server
.add_kill_switch(api_kill_switch_clone)
.expect("Cannot add HTTP server kill switch");

Check warning on line 180 in src/firecracker/src/api_server_adapter.rs

View check run for this annotation

Codecov / codecov/patch

src/firecracker/src/api_server_adapter.rs#L173-L180

Added lines #L173 - L180 were not covered by tests
// Start the separate API thread.
let api_thread = thread::Builder::new()
.name("fc_api".to_owned())
Expand Down Expand Up @@ -231,19 +239,7 @@
)
});

// We want to tell the API thread to shut down for a clean exit. But this is after
// the Vmm.stop() has been called, so it's a moment of internal finalization (as
// opposed to be something the client might call to shut the Vm down). Since it's
// an internal signal implementing it with an HTTP request is probably not the ideal
// way to do it...but having another way would involve multiplexing micro-http server
// with some other communication mechanism, or enhancing micro-http with exit
// conditions.

// We also need to make sure the socket path is ready.
let mut sock = UnixStream::connect(bind_path).unwrap();
sock.write_all(b"PUT /shutdown-internal HTTP/1.1\r\n\r\n")
.unwrap();

api_kill_switch.write(1).unwrap();

Check warning on line 242 in src/firecracker/src/api_server_adapter.rs

View check run for this annotation

Codecov / codecov/patch

src/firecracker/src/api_server_adapter.rs#L242

Added line #L242 was not covered by tests
// This call to thread::join() should block until the API thread has processed the
// shutdown-internal and returns from its function.
api_thread.join().expect("Api thread should join");
Expand Down
4 changes: 2 additions & 2 deletions src/mmds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ versionize_derive = "0.1.5"

dumbo = { path = "../dumbo" }
logger = { path = "../logger" }
micro_http = { git = "https://github.com/firecracker-microvm/micro-http", rev = "4b18a04" }
micro_http = { git = "https://github.com/firecracker-microvm/micro-http" }
snapshot = { path = "../snapshot" }
utils = { path = "../utils" }
utils = { path = "../utils" }
Loading