Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
Merge #361
Browse files Browse the repository at this point in the history
361: Server heartbeats r=MarinPostma a=penberg

This adds support for server heartbeats.

The patch adds three new command line options:

  `--heartbeat-url URL` is the URL to send a HTTP POST request to
  periodically to indicate server heatbeat.

  `--heartbeat-auth AUTH` is the HTTP "Authorization" header to send
  as part of the HTTP POST request.

  `--heartbeat-period SECS` is the heartbeat period in seconds.

The payload of the HTTP POST request is pretty simple for now:

```json
{"rows_read":1234,"rows_written":8765}
```

Fixes #343

Co-authored-by: Pekka Enberg <[email protected]>
  • Loading branch information
bors[bot] and penberg authored Apr 26, 2023
2 parents 6d7ffb3 + d96ea35 commit 2c39268
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ tempfile = "3.3.0"
memmap = "0.7.0"
mimalloc = "0.1.36"
sha256 = "1.1.3"
reqwest = { version = "0.11.16", features = ["json"] }

[dev-dependencies]
proptest = "1.0.0"
Expand Down
31 changes: 31 additions & 0 deletions sqld/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::time::Duration;
use tokio::time::sleep;

use crate::http::stats::StatsResponse;
use crate::stats::Stats;

pub async fn server_heartbeat(
url: String,
auth: Option<String>,
update_period: Duration,
stats: Stats,
) {
let client = reqwest::Client::new();
loop {
sleep(update_period).await;
let body = StatsResponse {
rows_read_count: stats.rows_read(),
rows_written_count: stats.rows_written(),
};
let request = client.post(&url);
let request = if let Some(ref auth) = auth {
request.header("Authorization", auth.clone())
} else {
request
};
let request = request.json(&body);
if let Err(err) = request.send().await {
tracing::warn!("Error sending heartbeat: {}", err);
}
}
}
2 changes: 1 addition & 1 deletion sqld/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod hrana_over_http;
mod stats;
pub mod stats;
mod types;

use std::net::SocketAddr;
Expand Down
6 changes: 3 additions & 3 deletions sqld/src/http/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use serde::Serialize;
use crate::stats::Stats;

#[derive(Serialize)]
struct StatsResponse {
rows_read_count: usize,
rows_written_count: usize,
pub struct StatsResponse {
pub rows_read_count: usize,
pub rows_written_count: usize,
}

pub fn handle_stats(stats: &Stats) -> Response<Body> {
Expand Down
32 changes: 31 additions & 1 deletion sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub use sqld_libsql_bindings as libsql;
mod auth;
pub mod database;
mod error;
mod heartbeat;
mod hrana;
mod http;
mod postgres;
Expand Down Expand Up @@ -90,6 +91,9 @@ pub struct Config {
pub idle_shutdown_timeout: Option<Duration>,
pub load_from_dump: Option<PathBuf>,
pub max_log_size: u64,
pub heartbeat_url: Option<String>,
pub heartbeat_auth: Option<String>,
pub heartbeat_period: Duration,
}

async fn run_service(
Expand Down Expand Up @@ -133,7 +137,7 @@ async fn run_service(
hrana_upgrade_tx,
config.enable_http_console,
idle_shutdown_layer,
stats,
stats.clone(),
));
}

Expand All @@ -145,6 +149,32 @@ async fn run_service(
});
}

match &config.heartbeat_url {
Some(heartbeat_url) => {
let heartbeat_period = config.heartbeat_period;
tracing::info!(
"Server sending heartbeat to URL {} every {:?}",
heartbeat_url,
heartbeat_period,
);
let heartbeat_url = heartbeat_url.clone();
let heartbeat_auth = config.heartbeat_auth.clone();
join_set.spawn(async move {
heartbeat::server_heartbeat(
heartbeat_url,
heartbeat_auth,
heartbeat_period,
stats.clone(),
)
.await;
Ok(())
});
}
None => {
tracing::warn!("No server heartbeat configured")
}
}

Ok(())
}

Expand Down
19 changes: 19 additions & 0 deletions sqld/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ struct Cli {

#[clap(subcommand)]
utils: Option<UtilsSubcommands>,

/// The URL to send a server heartbeat `POST` request to.
/// By default, the server doesn't send a heartbeat.
#[clap(long, env = "SQLD_HEARTBEAT_URL")]
heartbeat_url: Option<String>,

/// The HTTP "Authornization" header to include in the a server heartbeat
/// `POST` request.
/// By default, the server doesn't send a heartbeat.
#[clap(long, env = "SQLD_HEARTBEAT_AUTH")]
heartbeat_auth: Option<String>,

/// The heartbeat time period in seconds.
/// By default, the the period is 30 seconds.
#[clap(long, env = "SQLD_HEARTBEAT_PERIOD_S", default_value = "30")]
heartbeat_period_s: u64,
}

#[derive(clap::Subcommand, Debug)]
Expand Down Expand Up @@ -231,6 +247,9 @@ fn config_from_args(args: Cli) -> Result<Config> {
idle_shutdown_timeout: args.idle_shutdown_timeout_s.map(Duration::from_secs),
load_from_dump: args.load_from_dump,
max_log_size: args.max_log_size,
heartbeat_url: args.heartbeat_url,
heartbeat_auth: args.heartbeat_auth,
heartbeat_period: Duration::from_secs(args.heartbeat_period_s),
})
}

Expand Down

0 comments on commit 2c39268

Please sign in to comment.