Skip to content

Commit

Permalink
Add /terminate API (#6745) (#6853)
Browse files Browse the repository at this point in the history
this is to speed up suspends, see
neondatabase/cloud#10284


Cherry-pick to release branch to build new compute images
  • Loading branch information
nikitakalyanov authored Feb 22, 2024
1 parent 0118066 commit 96a4e8d
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 13 deletions.
25 changes: 13 additions & 12 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use nix::sys::signal::{kill, Signal};
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info};
use url::Url;

use compute_api::responses::ComputeStatus;

use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec, PG_PID, SYNC_SAFEKEEPERS_PID};
use compute_tools::compute::{
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
Expand Down Expand Up @@ -394,6 +395,15 @@ fn main() -> Result<()> {
info!("synced safekeepers at lsn {lsn}");
}

let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::TerminationPending {
state.status = ComputeStatus::Terminated;
compute.state_changed.notify_all();
// we were asked to terminate gracefully, don't exit to avoid restart
delay_exit = true
}
drop(state);

if let Err(err) = compute.check_for_core_dumps() {
error!("error while checking for core dumps: {err:?}");
}
Expand Down Expand Up @@ -523,16 +533,7 @@ fn cli() -> clap::Command {
/// wait for termination which would be easy then.
fn handle_exit_signal(sig: i32) {
info!("received {sig} termination signal");
let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
if ss_pid != 0 {
let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
kill(ss_pid, Signal::SIGTERM).ok();
}
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
kill(pg_pid, Signal::SIGTERM).ok();
}
forward_termination_signal();
exit(1);
}

Expand Down
16 changes: 16 additions & 0 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;

use nix::sys::signal::{kill, Signal};

use remote_storage::{DownloadError, RemotePath};

use crate::checker::create_availability_check_data;
Expand Down Expand Up @@ -1322,3 +1324,17 @@ LIMIT 100",
Ok(remote_ext_metrics)
}
}

pub fn forward_termination_signal() {
let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
if ss_pid != 0 {
let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
kill(ss_pid, Signal::SIGTERM).ok();
}
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
// use 'immediate' shutdown (SIGQUIT): https://www.postgresql.org/docs/current/server-shutdown.html
kill(pg_pid, Signal::SIGQUIT).ok();
}
}
55 changes: 55 additions & 0 deletions compute_tools/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;

use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
Expand Down Expand Up @@ -123,6 +124,17 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}

(&Method::POST, "/terminate") => {
info!("serving /terminate POST request");
match handle_terminate_request(compute).await {
Ok(()) => Response::new(Body::empty()),
Err((msg, code)) => {
error!("error handling /terminate request: {msg}");
render_json_error(&msg, code)
}
}
}

// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
Expand Down Expand Up @@ -297,6 +309,49 @@ fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
.unwrap()
}

async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
{
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::Terminated {
return Ok(());
}
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for termination request: {:?}",
state.status.clone()
);
return Err((msg, StatusCode::PRECONDITION_FAILED));
}
state.status = ComputeStatus::TerminationPending;
compute.state_changed.notify_all();
drop(state);
}
forward_termination_signal();
info!("sent signal and notified waiters");

// Spawn a blocking thread to wait for compute to become Terminated.
// This is needed to do not block the main pool of workers and
// be able to serve other requests while some particular request
// is waiting for compute to finish configuration.
let c = compute.clone();
task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
while state.status != ComputeStatus::Terminated {
state = c.state_changed.wait(state).unwrap();
info!(
"waiting for compute to become Terminated, current status: {:?}",
state.status
);
}

Ok(())
})
.await
.unwrap()?;
info!("terminated Postgres");
Ok(())
}

// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(port: u16, state: Arc<ComputeNode>) {
Expand Down
23 changes: 23 additions & 0 deletions compute_tools/src/http/openapi_spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,29 @@ paths:
schema:
$ref: "#/components/schemas/GenericError"

/terminate:
post:
tags:
- Terminate
summary: Terminate Postgres and wait for it to exit
description: ""
operationId: terminate
responses:
200:
description: Result
412:
description: "wrong state"
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
500:
description: "Unexpected error"
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"

components:
securitySchemes:
JWT:
Expand Down
4 changes: 3 additions & 1 deletion control_plane/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,9 @@ impl Endpoint {
}
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration => {
| ComputeStatus::Configuration
| ComputeStatus::TerminationPending
| ComputeStatus::Terminated => {
bail!("unexpected compute status: {:?}", state.status)
}
}
Expand Down
4 changes: 4 additions & 0 deletions libs/compute_api/src/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub enum ComputeStatus {
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
// Termination requested
TerminationPending,
// Terminated Postgres
Terminated,
}

fn rfc3339_serialize<S>(x: &Option<DateTime<Utc>>, s: S) -> Result<S::Ok, S::Error>
Expand Down

1 comment on commit 96a4e8d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2442 tests run: 2325 passed, 0 failed, 117 skipped (full report)


Flaky tests (3)

Postgres 15

  • test_sharding_split_smoke: release

Postgres 14

  • test_statvfs_pressure_usage: release
  • test_sharding_split_smoke: release

Code coverage (full report)

  • functions: 55.8% (12933 of 23173 functions)
  • lines: 82.5% (69963 of 84785 lines)

The comment gets automatically updated with the latest test results
96a4e8d at 2024-02-22T10:47:32.623Z :recycle:

Please sign in to comment.