Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compute_ctl: Streamline and Pipeline startup SQL #9717

Merged
merged 11 commits into from
Nov 20, 2024
374 changes: 301 additions & 73 deletions compute_tools/src/compute.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions compute_tools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod spec;
mod spec_apply;
pub mod swap;
pub mod sync_sk;
64 changes: 64 additions & 0 deletions compute_tools/src/pg_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::thread::JoinHandle;
use std::time::{Duration, Instant};

use anyhow::{bail, Result};
use futures::StreamExt;
use ini::Ini;
use notify::{RecursiveMode, Watcher};
use postgres::{Client, Transaction};
Expand Down Expand Up @@ -196,6 +197,13 @@ impl Escaping for PgIdent {
}
}

impl Escaping for &'static str {
fn pg_quote(&self) -> String {
let result = format!("\"{}\"", self.replace('"', "\"\""));
result
}
}

MMeent marked this conversation as resolved.
Show resolved Hide resolved
/// Build a list of existing Postgres roles
pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result<Vec<Role>> {
let postgres_roles = xact
Expand All @@ -210,6 +218,27 @@ pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result<Vec<Role>> {

Ok(postgres_roles)
}
/// Build a list of existing Postgres roles
pub async fn get_existing_roles_async(
xact: &mut tokio_postgres::Transaction<'_>,
) -> Result<Vec<Role>> {
let postgres_roles = xact
.query_raw::<str, &String, &[String; 0]>(
"SELECT rolname, rolpassword FROM pg_catalog.pg_authid",
&[],
)
.await?
.filter_map(|row| async { row.ok() })
.map(|row| Role {
name: row.get("rolname"),
encrypted_password: row.get("rolpassword"),
options: None,
})
.collect()
.await;

Ok(postgres_roles)
}

/// Build a list of existing Postgres databases
pub fn get_existing_dbs(client: &mut Client) -> Result<HashMap<String, Database>> {
Expand Down Expand Up @@ -244,6 +273,41 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<HashMap<String, Database>

Ok(dbs_map)
}
/// Build a list of existing Postgres databases
pub async fn get_existing_dbs_async(
MMeent marked this conversation as resolved.
Show resolved Hide resolved
client: &mut tokio_postgres::Client,
) -> Result<HashMap<String, Database>> {
// `pg_database.datconnlimit = -2` means that the database is in the
// invalid state. See:
// https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
let rowstream = client
.query_raw::<str, &String, &[String; 0]>(
"SELECT
datname AS name,
datdba::regrole::text AS owner,
NOT datallowconn AS restrict_conn,
datconnlimit = - 2 AS invalid
FROM
pg_catalog.pg_database;",
&[],
)
.await?;

let dbs_map = rowstream
.filter_map(|r| async { r.ok() })
.map(|row| Database {
name: row.get("name"),
owner: row.get("owner"),
restrict_conn: row.get("restrict_conn"),
invalid: row.get("invalid"),
options: None,
})
.map(|db| (db.name.clone(), db.clone()))
.collect::<HashMap<_, _>>()
.await;

Ok(dbs_map)
}

/// Wait for Postgres to become ready to accept connections. It's ready to
/// accept connections when the state-field in `pgdata/postmaster.pid` says
Expand Down
9 changes: 4 additions & 5 deletions compute_tools/src/spec.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::HashSet;
use std::fs::File;
use std::path::Path;
use std::str::FromStr;

use anyhow::{anyhow, bail, Context, Result};
use postgres::config::Config;
use postgres::{Client, NoTls};
use reqwest::StatusCode;
use std::collections::HashSet;
use std::fs::File;
use std::path::Path;
use std::str::FromStr;
use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};

use crate::config;
Expand Down
Loading
Loading