Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into problame/batching-met…
Browse files Browse the repository at this point in the history
…rics-improvements
  • Loading branch information
problame committed Nov 30, 2024
2 parents 42daa4f + aa4ec11 commit 69b878f
Show file tree
Hide file tree
Showing 36 changed files with 526 additions and 233 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::{thread, time::Duration};
Expand Down Expand Up @@ -322,8 +323,15 @@ fn wait_spec(
} else {
spec_set = false;
}
let connstr = Url::parse(connstr).context("cannot parse connstr as a URL")?;
let conn_conf = postgres::config::Config::from_str(connstr.as_str())
.context("cannot build postgres config from connstr")?;
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
.context("cannot build tokio postgres config from connstr")?;
let compute_node = ComputeNode {
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
connstr,
conn_conf,
tokio_conn_conf,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
pgversion: get_pg_version_string(pgbin),
Expand Down
7 changes: 4 additions & 3 deletions compute_tools/src/bin/fast_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! - Build the image with the following command:
//!
//! ```bash
//! docker buildx build --build-arg DEBIAN_FLAVOR=bullseye-slim --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/Dockerfile.com
//! docker buildx build --platform linux/amd64 --build-arg DEBIAN_VERSION=bullseye --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/compute-node.Dockerfile .
//! docker push localhost:3030/localregistry/compute-node-v14:latest
//! ```
Expand Down Expand Up @@ -132,7 +132,8 @@ pub(crate) async fn main() -> anyhow::Result<()> {
//
// Initialize pgdata
//
let pg_version = match get_pg_version(pg_bin_dir.as_str()) {
let pgbin = pg_bin_dir.join("postgres");
let pg_version = match get_pg_version(pgbin.as_ref()) {
PostgresMajorVersion::V14 => 14,
PostgresMajorVersion::V15 => 15,
PostgresMajorVersion::V16 => 16,
Expand All @@ -155,7 +156,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
//
// Launch postgres process
//
let mut postgres_proc = tokio::process::Command::new(pg_bin_dir.join("postgres"))
let mut postgres_proc = tokio::process::Command::new(pgbin)
.arg("-D")
.arg(&pgdata_dir)
.args(["-c", "wal_level=minimal"])
Expand Down
7 changes: 2 additions & 5 deletions compute_tools/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use tokio::{
process::Command,
spawn,
};
use tokio_postgres::connect;
use tokio_stream::{self as stream, StreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::warn;
Expand All @@ -16,10 +15,8 @@ use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async, postgr
use compute_api::responses::CatalogObjects;

pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<CatalogObjects> {
let connstr = compute.connstr.clone();

let (client, connection): (tokio_postgres::Client, _) =
connect(connstr.as_str(), NoTls).await?;
let conf = compute.get_tokio_conn_conf(Some("compute_ctl:get_dbs_and_roles"));
let (client, connection): (tokio_postgres::Client, _) = conf.connect(NoTls).await?;

spawn(async move {
if let Err(e) = connection.await {
Expand Down
3 changes: 2 additions & 1 deletion compute_tools/src/checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::compute::ComputeNode;
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
// Connect to the database.
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
let conf = compute.get_tokio_conn_conf(Some("compute_ctl:availability_checker"));
let (client, connection) = conf.connect(NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}
Expand Down
49 changes: 34 additions & 15 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use nix::unistd::Pid;
use postgres;
use postgres::error::SqlState;
use postgres::{Client, NoTls};
use postgres::NoTls;
use tracing::{debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
Expand Down Expand Up @@ -58,6 +59,10 @@ pub static PG_PID: AtomicU32 = AtomicU32::new(0);
pub struct ComputeNode {
// Url type maintains proper escaping
pub connstr: url::Url,
// We connect to Postgres from many different places, so build configs once
// and reuse them where needed.
pub conn_conf: postgres::config::Config,
pub tokio_conn_conf: tokio_postgres::config::Config,
pub pgdata: String,
pub pgbin: String,
pub pgversion: String,
Expand Down Expand Up @@ -800,10 +805,10 @@ impl ComputeNode {
/// version. In the future, it may upgrade all 3rd-party extensions.
#[instrument(skip_all)]
pub fn post_apply_config(&self) -> Result<()> {
let connstr = self.connstr.clone();
let conf = self.get_conn_conf(Some("compute_ctl:post_apply_config"));
thread::spawn(move || {
let func = || {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
let mut client = conf.connect(NoTls)?;
handle_neon_extension_upgrade(&mut client)
.context("handle_neon_extension_upgrade")?;
Ok::<_, anyhow::Error>(())
Expand All @@ -815,12 +820,27 @@ impl ComputeNode {
Ok(())
}

pub fn get_conn_conf(&self, application_name: Option<&str>) -> postgres::Config {
let mut conf = self.conn_conf.clone();
if let Some(application_name) = application_name {
conf.application_name(application_name);
}
conf
}

pub fn get_tokio_conn_conf(&self, application_name: Option<&str>) -> tokio_postgres::Config {
let mut conf = self.tokio_conn_conf.clone();
if let Some(application_name) = application_name {
conf.application_name(application_name);
}
conf
}

async fn get_maintenance_client(
conf: &tokio_postgres::Config,
) -> Result<tokio_postgres::Client> {
let mut conf = conf.clone();

conf.application_name("apply_config");
conf.application_name("compute_ctl:apply_config");

let (client, conn) = match conf.connect(NoTls).await {
// If connection fails, it may be the old node with `zenith_admin` superuser.
Expand All @@ -837,6 +857,7 @@ impl ComputeNode {
e
);
let mut zenith_admin_conf = postgres::config::Config::from(conf.clone());
zenith_admin_conf.application_name("compute_ctl:apply_config");
zenith_admin_conf.user("zenith_admin");

let mut client =
Expand Down Expand Up @@ -1134,8 +1155,7 @@ impl ComputeNode {
/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
let mut conf = tokio_postgres::Config::from_str(self.connstr.as_str()).unwrap();
conf.application_name("apply_config");
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));

let conf = Arc::new(conf);
let spec = Arc::new(
Expand All @@ -1161,7 +1181,7 @@ impl ComputeNode {
thread::spawn(move || {
let conf = conf.as_ref().clone();
let mut conf = postgres::config::Config::from(conf);
conf.application_name("migrations");
conf.application_name("compute_ctl:migrations");

let mut client = conf.connect(NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
Expand Down Expand Up @@ -1369,9 +1389,9 @@ impl ComputeNode {
}
self.post_apply_config()?;

let connstr = self.connstr.clone();
let conf = self.get_conn_conf(None);
thread::spawn(move || {
let res = get_installed_extensions(&connstr);
let res = get_installed_extensions(conf);
match res {
Ok(extensions) => {
info!(
Expand Down Expand Up @@ -1510,7 +1530,8 @@ impl ComputeNode {
/// Select `pg_stat_statements` data and return it as a stringified JSON
pub async fn collect_insights(&self) -> String {
let mut result_rows: Vec<String> = Vec::new();
let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
let conf = self.get_tokio_conn_conf(Some("compute_ctl:collect_insights"));
let connect_result = conf.connect(NoTls).await;
let (client, connection) = connect_result.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
Expand Down Expand Up @@ -1636,10 +1657,9 @@ LIMIT 100",
privileges: &[Privilege],
role_name: &PgIdent,
) -> Result<()> {
use tokio_postgres::config::Config;
use tokio_postgres::NoTls;

let mut conf = Config::from_str(self.connstr.as_str()).unwrap();
let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:set_role_grants"));
conf.dbname(db_name);

let (db_client, conn) = conf
Expand Down Expand Up @@ -1676,10 +1696,9 @@ LIMIT 100",
db_name: &PgIdent,
ext_version: ExtVersion,
) -> Result<ExtVersion> {
use tokio_postgres::config::Config;
use tokio_postgres::NoTls;

let mut conf = Config::from_str(self.connstr.as_str()).unwrap();
let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:install_extension"));
conf.dbname(db_name);

let (db_client, conn) = conf
Expand Down
11 changes: 5 additions & 6 deletions compute_tools/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,11 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
return Response::new(Body::from(msg));
}

let connstr = compute.connstr.clone();
let res = task::spawn_blocking(move || {
installed_extensions::get_installed_extensions(&connstr)
})
.await
.unwrap();
let conf = compute.get_conn_conf(None);
let res =
task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf))
.await
.unwrap();

match res {
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
Expand Down
14 changes: 7 additions & 7 deletions compute_tools/src/installed_extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use metrics::core::Collector;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;

use crate::pg_helpers::postgres_conf_for_db;

/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
///
Expand Down Expand Up @@ -41,14 +39,16 @@ fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
///
/// Same extension can be installed in multiple databases with different versions,
/// we only keep the highest and lowest version across all databases.
pub fn get_installed_extensions(connstr: &url::Url) -> Result<InstalledExtensions> {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<InstalledExtensions> {
conf.application_name("compute_ctl:get_installed_extensions");
let mut client = conf.connect(NoTls)?;

let databases: Vec<String> = list_dbs(&mut client)?;

let mut extensions_map: HashMap<String, InstalledExtension> = HashMap::new();
for db in databases.iter() {
let config = postgres_conf_for_db(connstr, db)?;
let mut db_client = config.connect(NoTls)?;
conf.dbname(db);
let mut db_client = conf.connect(NoTls)?;
let extensions: Vec<(String, String)> = db_client
.query(
"SELECT extname, extversion FROM pg_catalog.pg_extension;",
Expand Down Expand Up @@ -82,7 +82,7 @@ pub fn get_installed_extensions(connstr: &url::Url) -> Result<InstalledExtension
}

let res = InstalledExtensions {
extensions: extensions_map.values().cloned().collect(),
extensions: extensions_map.into_values().collect(),
};

Ok(res)
Expand Down
13 changes: 5 additions & 8 deletions compute_tools/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@ const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
// should be handled gracefully.
fn watch_compute_activity(compute: &ComputeNode) {
// Suppose that `connstr` doesn't change
let mut connstr = compute.connstr.clone();
connstr
.query_pairs_mut()
.append_pair("application_name", "compute_activity_monitor");
let connstr = connstr.as_str();
let connstr = compute.connstr.clone();
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));

// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
// the compute fully started before monitoring activity.
wait_for_postgres_start(compute);

// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = Client::connect(connstr, NoTls);
let mut client = conf.connect(NoTls);

let mut sleep = false;
let mut prev_active_time: Option<f64> = None;
Expand Down Expand Up @@ -57,7 +54,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
info!("connection to Postgres is closed, trying to reconnect");

// Connection is closed, reconnect and try again.
client = Client::connect(connstr, NoTls);
client = conf.connect(NoTls);
continue;
}

Expand Down Expand Up @@ -196,7 +193,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
debug!("could not connect to Postgres: {}, retrying", e);

// Establish a new connection and try again.
client = Client::connect(connstr, NoTls);
client = conf.connect(NoTls);
}
}
}
Expand Down
7 changes: 1 addition & 6 deletions libs/pageserver_api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,7 @@ impl Default for ConfigToml {
tenant_config: TenantConfigToml::default(),
no_sync: None,
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
page_service_pipelining: PageServicePipeliningConfig::Pipelined(
PageServicePipeliningConfigPipelined {
max_batch_size: NonZeroUsize::new(32).unwrap(),
execution: PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures,
},
),
page_service_pipelining: PageServicePipeliningConfig::Serial,
}
}
}
Expand Down
Loading

0 comments on commit 69b878f

Please sign in to comment.