diff --git a/Cargo.lock b/Cargo.lock index a25fa89c77e6..665aa4aeccc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -374,6 +374,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-kms" +version = "1.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "564a597a3c71a957d60a2e4c62c93d78ee5a0d636531e15b760acad983a5c18e" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-s3" version = "1.52.0" @@ -590,9 +612,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87" +checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1235,6 +1257,10 @@ name = "compute_tools" version = "0.1.0" dependencies = [ "anyhow", + "aws-config", + "aws-sdk-kms", + "aws-sdk-s3", + "base64 0.13.1", "bytes", "camino", "cfg-if", @@ -1252,13 +1278,16 @@ dependencies = [ "opentelemetry", "opentelemetry_sdk", "postgres", + "postgres_initdb", "prometheus", "regex", "remote_storage", "reqwest 0.12.4", "rlimit", "rust-ini", + "serde", "serde_json", + "serde_with", "signal-hook", "tar", "thiserror", @@ -3712,6 +3741,7 @@ dependencies = [ "num_cpus", "once_cell", "pageserver_api", + "pageserver_client", "pageserver_compaction", "pin-project-lite", "postgres", @@ -3720,6 +3750,7 @@ dependencies = [ "postgres_backend", "postgres_connection", "postgres_ffi", + "postgres_initdb", "pq_proto", "procfs", "rand 0.8.5", @@ -4195,6 +4226,17 @@ dependencies = [ "utils", ] +[[package]] +name = "postgres_initdb" +version = "0.1.0" +dependencies = [ + "anyhow", + "camino", + "thiserror", + "tokio", + "workspace_hack", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -7504,6 +7546,7 @@ dependencies = [ "anyhow", "axum", "axum-core", + "base64 0.13.1", "base64 0.21.1", "base64ct", "bytes", diff --git a/Cargo.toml b/Cargo.toml index aac19a4122d0..e3dc5b97f8b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "libs/vm_monitor", "libs/walproposer", "libs/wal_decoder", + "libs/postgres_initdb", ] [workspace.package] @@ -57,6 +58,7 @@ async-trait = "0.1" aws-config = { version = "1.5", default-features = false, features=["rustls", "sso"] } aws-sdk-s3 = "1.52" aws-sdk-iam = "1.46.0" +aws-sdk-kms = "1.47.0" aws-smithy-async = { version = "1.2.1", default-features = false, features=["rt-tokio"] } aws-smithy-types = "1.2" aws-credential-types = "1.2.0" @@ -73,7 +75,7 @@ bytes = "1.0" camino = "1.1.6" cfg-if = "1.0.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } -clap = { version = "4.0", features = ["derive"] } +clap = { version = "4.0", features = ["derive", "env"] } comfy-table = "7.1" const_format = "0.2" crc32c = "0.6" @@ -154,7 +156,7 @@ sentry = { version = "0.32", default-features = false, features = ["backtrace", serde = { version = "1.0", features = ["derive"] } serde_json = "1" serde_path_to_error = "0.1" -serde_with = "2.0" +serde_with = { version = "2.0", features = [ "base64" ] } serde_assert = "0.5.0" sha2 = "0.10.2" signal-hook = "0.3" @@ -213,12 +215,14 @@ tokio-postgres = { git = 
"https://github.com/neondatabase/rust-postgres.git", br compute_api = { version = "0.1", path = "./libs/compute_api/" } consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } metrics = { version = "0.1", path = "./libs/metrics/" } +pageserver = { path = "./pageserver" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } pageserver_client = { path = "./pageserver/client" } pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" } postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" } postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" } postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" } +postgres_initdb = { path = "./libs/postgres_initdb" } pq_proto = { version = "0.1", path = "./libs/pq_proto/" } remote_storage = { version = "0.1", path = "./libs/remote_storage/" } safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" } diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile index 32405ece8625..7c21c67a0af9 100644 --- a/compute/compute-node.Dockerfile +++ b/compute/compute-node.Dockerfile @@ -1243,7 +1243,7 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \ ######################################################################################### # -# Compile and run the Neon-specific `compute_ctl` binary +# Compile and run the Neon-specific `compute_ctl` and `fast_import` binaries # ######################################################################################### FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools @@ -1264,6 +1264,7 @@ RUN cd compute_tools && mold -run cargo build --locked --profile release-line-de FROM debian:$DEBIAN_FLAVOR AS compute-tools-image COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl +COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/fast_import /usr/local/bin/fast_import ######################################################################################### # @@ -1458,6 +1459,7 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \ COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl +COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/fast_import /usr/local/bin/fast_import # pgbouncer and its config COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer @@ -1533,6 +1535,25 @@ RUN apt update && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8 +# s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2 +# used by fast_import +ARG TARGETARCH +ADD https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_linux_$TARGETARCH.deb /tmp/s5cmd.deb +RUN set -ex; \ + \ + # Determine the expected checksum based on TARGETARCH + if [ "${TARGETARCH}" = "amd64" ]; then \ + CHECKSUM="392c385320cd5ffa435759a95af77c215553d967e4b1c0fffe52e4f14c29cf85"; \ + elif [ "${TARGETARCH}" = "arm64" ]; then \ + CHECKSUM="939bee3cf4b5604ddb00e67f8c157b91d7c7a5b553d1fbb6890fad32894b7b46"; \ + else \ + echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \ + fi; \ + \ + # Compute and validate the checksum + echo "${CHECKSUM} /tmp/s5cmd.deb" | sha256sum -c - +RUN dpkg -i /tmp/s5cmd.deb && rm /tmp/s5cmd.deb + ENV 
LANG=en_US.utf8 USER postgres ENTRYPOINT ["/usr/local/bin/compute_ctl"] diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 0bf4ed53d669..c0c390caef81 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -10,6 +10,10 @@ default = [] testing = [] [dependencies] +base64.workspace = true +aws-config.workspace = true +aws-sdk-s3.workspace = true +aws-sdk-kms.workspace = true anyhow.workspace = true camino.workspace = true chrono.workspace = true @@ -27,6 +31,8 @@ opentelemetry.workspace = true opentelemetry_sdk.workspace = true postgres.workspace = true regex.workspace = true +serde.workspace = true +serde_with.workspace = true serde_json.workspace = true signal-hook.workspace = true tar.workspace = true @@ -43,6 +49,7 @@ thiserror.workspace = true url.workspace = true prometheus.workspace = true +postgres_initdb.workspace = true compute_api.workspace = true utils.workspace = true workspace_hack.workspace = true diff --git a/compute_tools/src/bin/fast_import.rs b/compute_tools/src/bin/fast_import.rs new file mode 100644 index 000000000000..3b0b990df2aa --- /dev/null +++ b/compute_tools/src/bin/fast_import.rs @@ -0,0 +1,338 @@ +//! This program dumps a remote Postgres database into a local Postgres database +//! and uploads the resulting PGDATA into object storage for import into a Timeline. +//! +//! # Context, Architecture, Design +//! +//! See cloud.git Fast Imports RFC () +//! for the full picture. +//! The RFC describing the storage pieces of importing the PGDATA dump into a Timeline +//! is publicly accessible at . +//! +//! # This is a Prototype! +//! +//! This program is part of a prototype feature and not yet used in production. +//! +//! The cloud.git RFC contains lots of suggestions for improving e2e throughput +//! of this step of the timeline import process. +//! +//! # Local Testing +//! +//! - Comment out most of the pgxns in The Dockerfile.compute-tools to speed up the build. +//! - Build the image with the following command: +//! +//! ```bash +//! docker buildx build --build-arg DEBIAN_FLAVOR=bullseye-slim --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/Dockerfile.com +//! docker push localhost:3030/localregistry/compute-node-v14:latest +//! 
``` + +use anyhow::Context; +use aws_config::BehaviorVersion; +use camino::{Utf8Path, Utf8PathBuf}; +use clap::Parser; +use nix::unistd::Pid; +use tracing::{info, info_span, warn, Instrument}; +use utils::fs_ext::is_directory_empty; + +#[path = "fast_import/child_stdio_to_log.rs"] +mod child_stdio_to_log; +#[path = "fast_import/s3_uri.rs"] +mod s3_uri; +#[path = "fast_import/s5cmd.rs"] +mod s5cmd; + +#[derive(clap::Parser)] +struct Args { + #[clap(long)] + working_directory: Utf8PathBuf, + #[clap(long, env = "NEON_IMPORTER_S3_PREFIX")] + s3_prefix: s3_uri::S3Uri, + #[clap(long)] + pg_bin_dir: Utf8PathBuf, + #[clap(long)] + pg_lib_dir: Utf8PathBuf, +} + +#[serde_with::serde_as] +#[derive(serde::Deserialize)] +struct Spec { + encryption_secret: EncryptionSecret, + #[serde_as(as = "serde_with::base64::Base64")] + source_connstring_ciphertext_base64: Vec, +} + +#[derive(serde::Deserialize)] +enum EncryptionSecret { + #[allow(clippy::upper_case_acronyms)] + KMS { key_id: String }, +} + +#[tokio::main] +pub(crate) async fn main() -> anyhow::Result<()> { + utils::logging::init( + utils::logging::LogFormat::Plain, + utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter, + utils::logging::Output::Stdout, + )?; + + info!("starting"); + + let Args { + working_directory, + s3_prefix, + pg_bin_dir, + pg_lib_dir, + } = Args::parse(); + + let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; + + let spec: Spec = { + let spec_key = s3_prefix.append("/spec.json"); + let s3_client = aws_sdk_s3::Client::new(&aws_config); + let object = s3_client + .get_object() + .bucket(&spec_key.bucket) + .key(spec_key.key) + .send() + .await + .context("get spec from s3")? + .body + .collect() + .await + .context("download spec body")?; + serde_json::from_slice(&object.into_bytes()).context("parse spec as json")? + }; + + match tokio::fs::create_dir(&working_directory).await { + Ok(()) => {} + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { + if !is_directory_empty(&working_directory) + .await + .context("check if working directory is empty")? + { + anyhow::bail!("working directory is not empty"); + } else { + // ok + } + } + Err(e) => return Err(anyhow::Error::new(e).context("create working directory")), + } + + let pgdata_dir = working_directory.join("pgdata"); + tokio::fs::create_dir(&pgdata_dir) + .await + .context("create pgdata directory")?; + + // + // Setup clients + // + let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; + let kms_client = aws_sdk_kms::Client::new(&aws_config); + + // + // Initialize pgdata + // + let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded + postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs { + superuser, + locale: "en_US.UTF-8", // XXX: this shouldn't be hard-coded, + pg_version: 140000, // XXX: this shouldn't be hard-coded but derived from which compute image we're running in + initdb_bin: pg_bin_dir.join("initdb").as_ref(), + library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local. 
+ pgdata: &pgdata_dir, + }) + .await + .context("initdb")?; + + let nproc = num_cpus::get(); + + // + // Launch postgres process + // + let mut postgres_proc = tokio::process::Command::new(pg_bin_dir.join("postgres")) + .arg("-D") + .arg(&pgdata_dir) + .args(["-c", "wal_level=minimal"]) + .args(["-c", "shared_buffers=10GB"]) + .args(["-c", "max_wal_senders=0"]) + .args(["-c", "fsync=off"]) + .args(["-c", "full_page_writes=off"]) + .args(["-c", "synchronous_commit=off"]) + .args(["-c", "maintenance_work_mem=8388608"]) + .args(["-c", &format!("max_parallel_maintenance_workers={nproc}")]) + .args(["-c", &format!("max_parallel_workers={nproc}")]) + .args(["-c", &format!("max_parallel_workers_per_gather={nproc}")]) + .args(["-c", &format!("max_worker_processes={nproc}")]) + .args(["-c", "effective_io_concurrency=100"]) + .env_clear() + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .context("spawn postgres")?; + + info!("spawned postgres, waiting for it to become ready"); + tokio::spawn( + child_stdio_to_log::relay_process_output( + postgres_proc.stdout.take(), + postgres_proc.stderr.take(), + ) + .instrument(info_span!("postgres")), + ); + let restore_pg_connstring = + format!("host=localhost port=5432 user={superuser} dbname=postgres"); + loop { + let res = tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await; + if res.is_ok() { + info!("postgres is ready, could connect to it"); + break; + } + } + + // + // Decrypt connection string + // + let source_connection_string = { + match spec.encryption_secret { + EncryptionSecret::KMS { key_id } => { + let mut output = kms_client + .decrypt() + .key_id(key_id) + .ciphertext_blob(aws_sdk_s3::primitives::Blob::new( + spec.source_connstring_ciphertext_base64, + )) + .send() + .await + .context("decrypt source connection string")?; + let plaintext = output + .plaintext + .take() + .context("get plaintext source connection string")?; + String::from_utf8(plaintext.into_inner()) + .context("parse source connection string as utf8")? 
+ } + } + }; + + // + // Start the work + // + + let dumpdir = working_directory.join("dumpdir"); + + let common_args = [ + // schema mapping (prob suffices to specify them on one side) + "--no-owner".to_string(), + "--no-privileges".to_string(), + "--no-publications".to_string(), + "--no-security-labels".to_string(), + "--no-subscriptions".to_string(), + "--no-tablespaces".to_string(), + // format + "--format".to_string(), + "directory".to_string(), + // concurrency + "--jobs".to_string(), + num_cpus::get().to_string(), + // progress updates + "--verbose".to_string(), + ]; + + info!("dump into the working directory"); + { + let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump")) + .args(&common_args) + .arg("-f") + .arg(&dumpdir) + .arg("--no-sync") + // POSITIONAL args + // source db (db name included in connection string) + .arg(&source_connection_string) + // how we run it + .env_clear() + .kill_on_drop(true) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .context("spawn pg_dump")?; + + info!(pid=%pg_dump.id().unwrap(), "spawned pg_dump"); + + tokio::spawn( + child_stdio_to_log::relay_process_output(pg_dump.stdout.take(), pg_dump.stderr.take()) + .instrument(info_span!("pg_dump")), + ); + + let st = pg_dump.wait().await.context("wait for pg_dump")?; + info!(status=?st, "pg_dump exited"); + if !st.success() { + warn!(status=%st, "pg_dump failed, restore will likely fail as well"); + } + } + + // TODO: do it in a streaming way, plenty of internal research done on this already + // TODO: do the unlogged table trick + + info!("restore from working directory into vanilla postgres"); + { + let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore")) + .args(&common_args) + .arg("-d") + .arg(&restore_pg_connstring) + // POSITIONAL args + .arg(&dumpdir) + // how we run it + .env_clear() + .kill_on_drop(true) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .context("spawn pg_restore")?; + + info!(pid=%pg_restore.id().unwrap(), "spawned pg_restore"); + tokio::spawn( + child_stdio_to_log::relay_process_output( + pg_restore.stdout.take(), + pg_restore.stderr.take(), + ) + .instrument(info_span!("pg_restore")), + ); + let st = pg_restore.wait().await.context("wait for pg_restore")?; + info!(status=?st, "pg_restore exited"); + if !st.success() { + warn!(status=%st, "pg_restore failed, restore will likely fail as well"); + } + } + + info!("shutdown postgres"); + { + nix::sys::signal::kill( + Pid::from_raw( + i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"), + ), + nix::sys::signal::SIGTERM, + ) + .context("signal postgres to shut down")?; + postgres_proc + .wait() + .await + .context("wait for postgres to shut down")?; + } + + info!("upload pgdata"); + s5cmd::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/")) + .await + .context("sync dump directory to destination")?; + + info!("write status"); + { + let status_dir = working_directory.join("status"); + std::fs::create_dir(&status_dir).context("create status directory")?; + let status_file = status_dir.join("status"); + std::fs::write(&status_file, serde_json::json!({"done": true}).to_string()) + .context("write status file")?; + s5cmd::sync(&status_file, &s3_prefix.append("/status/pgdata")) + .await + .context("sync status directory to destination")?; + } + + Ok(()) +} diff --git a/compute_tools/src/bin/fast_import/child_stdio_to_log.rs 
b/compute_tools/src/bin/fast_import/child_stdio_to_log.rs new file mode 100644 index 000000000000..6724ef9bedc8 --- /dev/null +++ b/compute_tools/src/bin/fast_import/child_stdio_to_log.rs @@ -0,0 +1,35 @@ +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::{ChildStderr, ChildStdout}; +use tracing::info; + +/// Asynchronously relays the output from a child process's `stdout` and `stderr` to the tracing log. +/// Each line is read and logged individually, with lossy UTF-8 conversion. +/// +/// # Arguments +/// +/// * `stdout`: An `Option` from the child process. +/// * `stderr`: An `Option` from the child process. +/// +pub(crate) async fn relay_process_output(stdout: Option, stderr: Option) { + let stdout_fut = async { + if let Some(stdout) = stdout { + let reader = BufReader::new(stdout); + let mut lines = reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + info!(fd = "stdout", "{}", line); + } + } + }; + + let stderr_fut = async { + if let Some(stderr) = stderr { + let reader = BufReader::new(stderr); + let mut lines = reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + info!(fd = "stderr", "{}", line); + } + } + }; + + tokio::join!(stdout_fut, stderr_fut); +} diff --git a/compute_tools/src/bin/fast_import/s3_uri.rs b/compute_tools/src/bin/fast_import/s3_uri.rs new file mode 100644 index 000000000000..52bbef420f4b --- /dev/null +++ b/compute_tools/src/bin/fast_import/s3_uri.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use std::str::FromStr; + +/// Struct to hold parsed S3 components +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct S3Uri { + pub bucket: String, + pub key: String, +} + +impl FromStr for S3Uri { + type Err = anyhow::Error; + + /// Parse an S3 URI into a bucket and key + fn from_str(uri: &str) -> Result { + // Ensure the URI starts with "s3://" + if !uri.starts_with("s3://") { + return Err(anyhow::anyhow!("Invalid S3 URI scheme")); + } + + // Remove the "s3://" prefix + let stripped_uri = &uri[5..]; + + // Split the remaining string into bucket and key parts + if let Some((bucket, key)) = stripped_uri.split_once('/') { + Ok(S3Uri { + bucket: bucket.to_string(), + key: key.to_string(), + }) + } else { + Err(anyhow::anyhow!( + "Invalid S3 URI format, missing bucket or key" + )) + } + } +} + +impl S3Uri { + pub fn append(&self, suffix: &str) -> Self { + Self { + bucket: self.bucket.clone(), + key: format!("{}{}", self.key, suffix), + } + } +} + +impl std::fmt::Display for S3Uri { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "s3://{}/{}", self.bucket, self.key) + } +} + +impl clap::builder::TypedValueParser for S3Uri { + type Value = Self; + + fn parse_ref( + &self, + _cmd: &clap::Command, + _arg: Option<&clap::Arg>, + value: &std::ffi::OsStr, + ) -> Result { + let value_str = value.to_str().ok_or_else(|| { + clap::Error::raw( + clap::error::ErrorKind::InvalidUtf8, + "Invalid UTF-8 sequence", + ) + })?; + S3Uri::from_str(value_str).map_err(|e| { + clap::Error::raw( + clap::error::ErrorKind::InvalidValue, + format!("Failed to parse S3 URI: {}", e), + ) + }) + } +} diff --git a/compute_tools/src/bin/fast_import/s5cmd.rs b/compute_tools/src/bin/fast_import/s5cmd.rs new file mode 100644 index 000000000000..d2d9a79736fe --- /dev/null +++ b/compute_tools/src/bin/fast_import/s5cmd.rs @@ -0,0 +1,27 @@ +use anyhow::Context; +use camino::Utf8Path; + +use super::s3_uri::S3Uri; + +pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> { + let mut builder = 
tokio::process::Command::new("s5cmd"); + // s5cmd uses aws-sdk-go v1, hence doesn't support AWS_ENDPOINT_URL + if let Some(val) = std::env::var_os("AWS_ENDPOINT_URL") { + builder.arg("--endpoint-url").arg(val); + } + builder + .arg("sync") + .arg(local.as_str()) + .arg(remote.to_string()); + let st = builder + .spawn() + .context("spawn s5cmd")? + .wait() + .await + .context("wait for s5cmd")?; + if st.success() { + Ok(()) + } else { + Err(anyhow::anyhow!("s5cmd failed")) + } +} diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index c4063bbd1aaf..1ea443b026a8 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -1153,6 +1153,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re timeline_info.timeline_id ); } + // TODO: rename to import-basebackup-plus-wal TimelineCmd::Import(args) => { let tenant_id = get_tenant_id(args.tenant_id, env)?; let timeline_id = args.timeline_id; diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 8710904cec6e..79da05da6ca1 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -33,6 +33,7 @@ remote_storage.workspace = true postgres_backend.workspace = true nix = {workspace = true, optional = true} reqwest.workspace = true +rand.workspace = true [dev-dependencies] bincode.workspace = true diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index ee20613d6db3..766672842718 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -97,6 +97,15 @@ pub struct ConfigToml { pub control_plane_api: Option, pub control_plane_api_token: Option, pub control_plane_emergency_mode: bool, + /// Unstable feature: subject to change or removal without notice. + /// See . + pub import_pgdata_upcall_api: Option, + /// Unstable feature: subject to change or removal without notice. + /// See . + pub import_pgdata_upcall_api_token: Option, + /// Unstable feature: subject to change or removal without notice. + /// See . + pub import_pgdata_aws_endpoint_url: Option, pub heatmap_upload_concurrency: usize, pub secondary_download_concurrency: usize, pub virtual_file_io_engine: Option, @@ -386,6 +395,10 @@ impl Default for ConfigToml { control_plane_api_token: (None), control_plane_emergency_mode: (false), + import_pgdata_upcall_api: (None), + import_pgdata_upcall_api_token: (None), + import_pgdata_aws_endpoint_url: (None), + heatmap_upload_concurrency: (DEFAULT_HEATMAP_UPLOAD_CONCURRENCY), secondary_download_concurrency: (DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY), diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 401887d3629c..c55b9e948468 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -48,7 +48,7 @@ pub struct ShardedRange<'a> { // Calculate the size of a range within the blocks of the same relation, or spanning only the // top page in the previous relation's space. -fn contiguous_range_len(range: &Range) -> u32 { +pub fn contiguous_range_len(range: &Range) -> u32 { debug_assert!(is_contiguous_range(range)); if range.start.field6 == 0xffffffff { range.end.field6 + 1 @@ -67,7 +67,7 @@ fn contiguous_range_len(range: &Range) -> u32 { /// This matters, because: /// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse. /// - Within such ranges, we may calculate distances using simple subtraction of field6. 
-fn is_contiguous_range(range: &Range) -> bool { +pub fn is_contiguous_range(range: &Range) -> bool { range.start.field1 == range.end.field1 && range.start.field2 == range.end.field2 && range.start.field3 == range.end.field3 diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 0dfa1ba817ca..1b86bfd91ad5 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -2,6 +2,8 @@ pub mod detach_ancestor; pub mod partitioning; pub mod utilization; +#[cfg(feature = "testing")] +use camino::Utf8PathBuf; pub use utilization::PageserverUtilization; use std::{ @@ -227,6 +229,9 @@ pub enum TimelineCreateRequestMode { // we continue to accept it by having it here. pg_version: Option, }, + ImportPgdata { + import_pgdata: TimelineCreateRequestModeImportPgdata, + }, // NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap. // (serde picks the first matching enum variant, in declaration order). Bootstrap { @@ -236,6 +241,42 @@ pub enum TimelineCreateRequestMode { }, } +#[derive(Serialize, Deserialize, Clone)] +pub struct TimelineCreateRequestModeImportPgdata { + pub location: ImportPgdataLocation, + pub idempotency_key: ImportPgdataIdempotencyKey, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum ImportPgdataLocation { + #[cfg(feature = "testing")] + LocalFs { path: Utf8PathBuf }, + AwsS3 { + region: String, + bucket: String, + /// A better name for this would be `prefix`; changing requires coordination with cplane. + /// See . + key: String, + }, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(transparent)] +pub struct ImportPgdataIdempotencyKey(pub String); + +impl ImportPgdataIdempotencyKey { + pub fn random() -> Self { + use rand::{distributions::Alphanumeric, Rng}; + Self( + rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(20) + .map(char::from) + .collect(), + ) + } +} + #[derive(Serialize, Deserialize, Clone)] pub struct LsnLeaseRequest { pub lsn: Lsn, diff --git a/libs/postgres_initdb/Cargo.toml b/libs/postgres_initdb/Cargo.toml new file mode 100644 index 000000000000..1605279bce76 --- /dev/null +++ b/libs/postgres_initdb/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "postgres_initdb" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +tokio.workspace = true +camino.workspace = true +thiserror.workspace = true +workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/libs/postgres_initdb/src/lib.rs b/libs/postgres_initdb/src/lib.rs new file mode 100644 index 000000000000..2f072354fb90 --- /dev/null +++ b/libs/postgres_initdb/src/lib.rs @@ -0,0 +1,103 @@ +//! The canonical way we run `initdb` in Neon. +//! +//! initdb has implicit defaults that are dependent on the environment, e.g., locales & collations. +//! +//! This module's job is to eliminate the environment-dependence as much as possible. 
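For orientation, the pageserver's `run_initdb` and the new `fast_import` binary above are the intended callers of this crate. A minimal, hedged sketch of the call shape, using placeholder paths and a placeholder major version rather than values taken from this change:

```rust
use camino::Utf8Path;
use postgres_initdb::{do_run_initdb, RunInitdbArgs};

#[tokio::main]
async fn main() -> Result<(), postgres_initdb::Error> {
    // All concrete values here are placeholders; real callers supply them from their own config.
    do_run_initdb(RunInitdbArgs {
        superuser: "cloud_admin",
        locale: "C.UTF-8",
        initdb_bin: Utf8Path::new("/usr/local/bin/initdb"),
        pg_version: 16, // major version; selects the locale provider below
        library_search_path: Utf8Path::new("/usr/local/lib"),
        pgdata: Utf8Path::new("/tmp/pgdata"),
    })
    .await
}
```

The `env_clear()` call combined with an explicit library search path is what removes the environment-dependence the module doc refers to.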
+ +use std::fmt; + +use camino::Utf8Path; + +pub struct RunInitdbArgs<'a> { + pub superuser: &'a str, + pub locale: &'a str, + pub initdb_bin: &'a Utf8Path, + pub pg_version: u32, + pub library_search_path: &'a Utf8Path, + pub pgdata: &'a Utf8Path, +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + Spawn(std::io::Error), + Failed { + status: std::process::ExitStatus, + stderr: Vec, + }, + WaitOutput(std::io::Error), + Other(anyhow::Error), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Spawn(e) => write!(f, "Error spawning command: {:?}", e), + Error::Failed { status, stderr } => write!( + f, + "Command failed with status {:?}: {}", + status, + String::from_utf8_lossy(stderr) + ), + Error::WaitOutput(e) => write!(f, "Error waiting for command output: {:?}", e), + Error::Other(e) => write!(f, "Error: {:?}", e), + } + } +} + +pub async fn do_run_initdb(args: RunInitdbArgs<'_>) -> Result<(), Error> { + let RunInitdbArgs { + superuser, + locale, + initdb_bin: initdb_bin_path, + pg_version, + library_search_path, + pgdata, + } = args; + let mut initdb_command = tokio::process::Command::new(initdb_bin_path); + initdb_command + .args(["--pgdata", pgdata.as_ref()]) + .args(["--username", superuser]) + .args(["--encoding", "utf8"]) + .args(["--locale", locale]) + .arg("--no-instructions") + .arg("--no-sync") + .env_clear() + .env("LD_LIBRARY_PATH", library_search_path) + .env("DYLD_LIBRARY_PATH", library_search_path) + .stdin(std::process::Stdio::null()) + // stdout invocation produces the same output every time, we don't need it + .stdout(std::process::Stdio::null()) + // we would be interested in the stderr output, if there was any + .stderr(std::process::Stdio::piped()); + + // Before version 14, only the libc provider was available. + if pg_version > 14 { + // Version 17 brought with it a builtin locale provider which only provides + // C and C.UTF-8. While being safer for collation purposes since it is + // guaranteed to be consistent throughout a major release, it is also more + // performant. + let locale_provider = if pg_version >= 17 { "builtin" } else { "libc" }; + + initdb_command.args(["--locale-provider", locale_provider]); + } + + let initdb_proc = initdb_command.spawn().map_err(Error::Spawn)?; + + // Ideally we'd select here with the cancellation token, but the problem is that + // we can't safely terminate initdb: it launches processes of its own, and killing + // initdb doesn't kill them. After we return from this function, we want the target + // directory to be able to be cleaned up.
+ // See https://github.com/neondatabase/neon/issues/6385 + let initdb_output = initdb_proc + .wait_with_output() + .await + .map_err(Error::WaitOutput)?; + if !initdb_output.status.success() { + return Err(Error::Failed { + status: initdb_output.status, + stderr: initdb_output.stderr, + }); + } + + Ok(()) +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 143d8236dff5..140b287ccc18 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -43,6 +43,7 @@ postgres.workspace = true postgres_backend.workspace = true postgres-protocol.workspace = true postgres-types.workspace = true +postgres_initdb.workspace = true rand.workspace = true range-set-blaze = { version = "0.1.16", features = ["alloc"] } regex.workspace = true @@ -68,6 +69,7 @@ url.workspace = true walkdir.workspace = true metrics.workspace = true pageserver_api.workspace = true +pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that pageserver_compaction.workspace = true postgres_connection.workspace = true postgres_ffi.workspace = true diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f7be6ecaabd4..59ea6fb9416b 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -144,6 +144,10 @@ pub struct PageServerConf { /// JWT token for use with the control plane API. pub control_plane_api_token: Option, + pub import_pgdata_upcall_api: Option, + pub import_pgdata_upcall_api_token: Option, + pub import_pgdata_aws_endpoint_url: Option, + /// If true, pageserver will make best-effort to operate without a control plane: only /// for use in major incidents. pub control_plane_emergency_mode: bool, @@ -328,6 +332,9 @@ impl PageServerConf { control_plane_api, control_plane_api_token, control_plane_emergency_mode, + import_pgdata_upcall_api, + import_pgdata_upcall_api_token, + import_pgdata_aws_endpoint_url, heatmap_upload_concurrency, secondary_download_concurrency, ingest_batch_size, @@ -383,6 +390,9 @@ impl PageServerConf { timeline_offloading, ephemeral_bytes_per_memory_kb, server_side_batch_timeout, + import_pgdata_upcall_api, + import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from), + import_pgdata_aws_endpoint_url, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 2bc7f5ad3965..7fb9247feb20 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -623,6 +623,8 @@ paths: existing_initdb_timeline_id: type: string format: hex + import_pgdata: + $ref: "#/components/schemas/TimelineCreateRequestImportPgdata" responses: "201": description: Timeline was created, or already existed with matching parameters @@ -979,6 +981,34 @@ components: $ref: "#/components/schemas/TenantConfig" effective_config: $ref: "#/components/schemas/TenantConfig" + TimelineCreateRequestImportPgdata: + type: object + required: + - location + - idempotency_key + properties: + idempotency_key: + type: string + location: + $ref: "#/components/schemas/TimelineCreateRequestImportPgdataLocation" + TimelineCreateRequestImportPgdataLocation: + type: object + properties: + AwsS3: + $ref: "#/components/schemas/TimelineCreateRequestImportPgdataLocationAwsS3" + TimelineCreateRequestImportPgdataLocationAwsS3: + type: object + properties: + region: + type: string + bucket: + type: string + key: + type: string + required: + - region + - bucket + - key 
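To make the new mode concrete, here is a hedged sketch of a timeline-creation request body carrying `import_pgdata`, following the `TimelineCreateRequestModeImportPgdata`/`ImportPgdataLocation` models and the schema above; every concrete value (timeline id, region, bucket, key, idempotency key) is an invented placeholder:

```rust
fn main() {
    // Sketch only: field names follow the models added in pageserver_api above;
    // "AwsS3" is the externally tagged form of ImportPgdataLocation::AwsS3.
    let body = serde_json::json!({
        "new_timeline_id": "de200bd42b49cc1814412c7e592dd6e9",
        "import_pgdata": {
            "idempotency_key": "import-job-0001",
            "location": {
                "AwsS3": {
                    "region": "us-east-1",
                    "bucket": "example-import-bucket",
                    "key": "imports/example-tenant/pgdata"
                }
            }
        }
    });
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}
```

The `key` here is presumably the same prefix that `fast_import` populates with `spec.json`, the uploaded pgdata, and the `status/` marker.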
TimelineInfo: type: object required: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7168850ed691..ceb1c3b012f5 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -40,6 +40,7 @@ use pageserver_api::models::TenantSorting; use pageserver_api::models::TenantState; use pageserver_api::models::TimelineArchivalConfigRequest; use pageserver_api::models::TimelineCreateRequestMode; +use pageserver_api::models::TimelineCreateRequestModeImportPgdata; use pageserver_api::models::TimelinesInfoAndOffloaded; use pageserver_api::models::TopTenantShardItem; use pageserver_api::models::TopTenantShardsRequest; @@ -81,6 +82,7 @@ use crate::tenant::secondary::SecondaryController; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; use crate::tenant::storage_layer::LayerName; +use crate::tenant::timeline::import_pgdata; use crate::tenant::timeline::offload::offload_timeline; use crate::tenant::timeline::offload::OffloadError; use crate::tenant::timeline::CompactFlags; @@ -580,6 +582,35 @@ async fn timeline_create_handler( ancestor_timeline_id, ancestor_start_lsn, }), + TimelineCreateRequestMode::ImportPgdata { + import_pgdata: + TimelineCreateRequestModeImportPgdata { + location, + idempotency_key, + }, + } => tenant::CreateTimelineParams::ImportPgdata(tenant::CreateTimelineParamsImportPgdata { + idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new( + idempotency_key.0, + ), + new_timeline_id, + location: { + use import_pgdata::index_part_format::Location; + use pageserver_api::models::ImportPgdataLocation; + match location { + #[cfg(feature = "testing")] + ImportPgdataLocation::LocalFs { path } => Location::LocalFs { path }, + ImportPgdataLocation::AwsS3 { + region, + bucket, + key, + } => Location::AwsS3 { + region, + bucket, + key, + }, + } + }, + }), }; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 5995d1cc5726..f4f184be5a72 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -2276,9 +2276,9 @@ impl<'a> Version<'a> { //--- Metadata structs stored in key-value pairs in the repository. #[derive(Debug, Serialize, Deserialize)] -struct DbDirectory { +pub(crate) struct DbDirectory { // (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist) - dbdirs: HashMap<(Oid, Oid), bool>, + pub(crate) dbdirs: HashMap<(Oid, Oid), bool>, } // The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of @@ -2287,8 +2287,8 @@ struct DbDirectory { // "pg_twophsae/0000000A000002E4". #[derive(Debug, Serialize, Deserialize)] -struct TwoPhaseDirectory { - xids: HashSet, +pub(crate) struct TwoPhaseDirectory { + pub(crate) xids: HashSet, } #[derive(Debug, Serialize, Deserialize)] @@ -2297,12 +2297,12 @@ struct TwoPhaseDirectoryV17 { } #[derive(Debug, Serialize, Deserialize, Default)] -struct RelDirectory { +pub(crate) struct RelDirectory { // Set of relations that exist. 
(relfilenode, forknum) // // TODO: Store it as a btree or radix tree or something else that spans multiple // key-value pairs, if you have a lot of relations - rels: HashSet<(Oid, u8)>, + pub(crate) rels: HashSet<(Oid, u8)>, } #[derive(Debug, Serialize, Deserialize)] @@ -2311,9 +2311,9 @@ struct RelSizeEntry { } #[derive(Debug, Serialize, Deserialize, Default)] -struct SlruSegmentDirectory { +pub(crate) struct SlruSegmentDirectory { // Set of SLRU segments that exist. - segments: HashSet, + pub(crate) segments: HashSet, } #[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)] diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 6a4e90dd558a..622738022a3a 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -381,6 +381,8 @@ pub enum TaskKind { UnitTest, DetachAncestor, + + ImportPgdata, } #[derive(Default)] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2e5f69e3c942..0214ee68fa08 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -43,7 +43,9 @@ use std::sync::atomic::AtomicBool; use std::sync::Weak; use std::time::SystemTime; use storage_broker::BrokerClientChannel; +use timeline::import_pgdata; use timeline::offload::offload_timeline; +use timeline::ShutdownMode; use tokio::io::BufReader; use tokio::sync::watch; use tokio::task::JoinSet; @@ -373,7 +375,6 @@ pub struct Tenant { l0_flush_global_state: L0FlushGlobalState, } - impl std::fmt::Debug for Tenant { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{} ({})", self.tenant_shard_id, self.current_state()) @@ -860,6 +861,7 @@ impl Debug for SetStoppingError { pub(crate) enum CreateTimelineParams { Bootstrap(CreateTimelineParamsBootstrap), Branch(CreateTimelineParamsBranch), + ImportPgdata(CreateTimelineParamsImportPgdata), } #[derive(Debug)] @@ -877,7 +879,14 @@ pub(crate) struct CreateTimelineParamsBranch { pub(crate) ancestor_start_lsn: Option, } -/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`]. +#[derive(Debug)] +pub(crate) struct CreateTimelineParamsImportPgdata { + pub(crate) new_timeline_id: TimelineId, + pub(crate) location: import_pgdata::index_part_format::Location, + pub(crate) idempotency_key: import_pgdata::index_part_format::IdempotencyKey, +} + +/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`] in [`Tenant::start_creating_timeline`]. /// /// Each [`Timeline`] object holds [`Self`] as an immutable property in [`Timeline::create_idempotency`]. /// @@ -907,19 +916,50 @@ pub(crate) enum CreateTimelineIdempotency { ancestor_timeline_id: TimelineId, ancestor_start_lsn: Lsn, }, + ImportPgdata(CreatingTimelineIdempotencyImportPgdata), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct CreatingTimelineIdempotencyImportPgdata { + idempotency_key: import_pgdata::index_part_format::IdempotencyKey, } /// What is returned by [`Tenant::start_creating_timeline`]. 
#[must_use] -enum StartCreatingTimelineResult<'t> { - CreateGuard(TimelineCreateGuard<'t>), +enum StartCreatingTimelineResult { + CreateGuard(TimelineCreateGuard), Idempotent(Arc), } +enum TimelineInitAndSyncResult { + ReadyToActivate(Arc), + NeedsSpawnImportPgdata(TimelineInitAndSyncNeedsSpawnImportPgdata), +} + +impl TimelineInitAndSyncResult { + fn ready_to_activate(self) -> Option> { + match self { + Self::ReadyToActivate(timeline) => Some(timeline), + _ => None, + } + } +} + +#[must_use] +struct TimelineInitAndSyncNeedsSpawnImportPgdata { + timeline: Arc, + import_pgdata: import_pgdata::index_part_format::Root, + guard: TimelineCreateGuard, +} + /// What is returned by [`Tenant::create_timeline`]. enum CreateTimelineResult { Created(Arc), Idempotent(Arc), + /// IMPORTANT: This [`Arc`] object is not in [`Tenant::timelines`] when + /// we return this result, nor will this concrete object ever be added there. + /// Cf method comment on [`Tenant::create_timeline_import_pgdata`]. + ImportSpawned(Arc), } impl CreateTimelineResult { @@ -927,18 +967,19 @@ impl CreateTimelineResult { match self { Self::Created(_) => "Created", Self::Idempotent(_) => "Idempotent", + Self::ImportSpawned(_) => "ImportSpawned", } } fn timeline(&self) -> &Arc { match self { - Self::Created(t) | Self::Idempotent(t) => t, + Self::Created(t) | Self::Idempotent(t) | Self::ImportSpawned(t) => t, } } /// Unit test timelines aren't activated, test has to do it if it needs to. #[cfg(test)] fn into_timeline_for_test(self) -> Arc { match self { - Self::Created(t) | Self::Idempotent(t) => t, + Self::Created(t) | Self::Idempotent(t) | Self::ImportSpawned(t) => t, } } } @@ -962,33 +1003,13 @@ pub enum CreateTimelineError { } #[derive(thiserror::Error, Debug)] -enum InitdbError { - Other(anyhow::Error), +pub enum InitdbError { + #[error("Operation was cancelled")] Cancelled, - Spawn(std::io::Result<()>), - Failed(std::process::ExitStatus, Vec), -} - -impl fmt::Display for InitdbError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - InitdbError::Cancelled => write!(f, "Operation was cancelled"), - InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e), - InitdbError::Failed(status, stderr) => write!( - f, - "Command failed with status {:?}: {}", - status, - String::from_utf8_lossy(stderr) - ), - InitdbError::Other(e) => write!(f, "Error: {:?}", e), - } - } -} - -impl From for InitdbError { - fn from(error: std::io::Error) -> Self { - InitdbError::Spawn(Err(error)) - } + #[error(transparent)] + Other(anyhow::Error), + #[error(transparent)] + Inner(postgres_initdb::Error), } enum CreateTimelineCause { @@ -996,6 +1017,15 @@ enum CreateTimelineCause { Delete, } +enum LoadTimelineCause { + Attach, + Unoffload, + ImportPgdata { + create_guard: TimelineCreateGuard, + activate: ActivateTimelineArgs, + }, +} + #[derive(thiserror::Error, Debug)] pub(crate) enum GcError { // The tenant is shutting down @@ -1072,24 +1102,35 @@ impl Tenant { /// it is marked as Active. 
#[allow(clippy::too_many_arguments)] async fn timeline_init_and_sync( - &self, + self: &Arc, timeline_id: TimelineId, resources: TimelineResources, - index_part: IndexPart, + mut index_part: IndexPart, metadata: TimelineMetadata, ancestor: Option>, - _ctx: &RequestContext, - ) -> anyhow::Result<()> { + cause: LoadTimelineCause, + ctx: &RequestContext, + ) -> anyhow::Result { let tenant_id = self.tenant_shard_id; - let idempotency = if metadata.ancestor_timeline().is_none() { - CreateTimelineIdempotency::Bootstrap { - pg_version: metadata.pg_version(), + let import_pgdata = index_part.import_pgdata.take(); + let idempotency = match &import_pgdata { + Some(import_pgdata) => { + CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata { + idempotency_key: import_pgdata.idempotency_key().clone(), + }) } - } else { - CreateTimelineIdempotency::Branch { - ancestor_timeline_id: metadata.ancestor_timeline().unwrap(), - ancestor_start_lsn: metadata.ancestor_lsn(), + None => { + if metadata.ancestor_timeline().is_none() { + CreateTimelineIdempotency::Bootstrap { + pg_version: metadata.pg_version(), + } + } else { + CreateTimelineIdempotency::Branch { + ancestor_timeline_id: metadata.ancestor_timeline().unwrap(), + ancestor_start_lsn: metadata.ancestor_lsn(), + } + } } } }; @@ -1121,39 +1162,91 @@ impl Tenant { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") })?; - { - // avoiding holding it across awaits - let mut timelines_accessor = self.timelines.lock().unwrap(); - match timelines_accessor.entry(timeline_id) { - // We should never try and load the same timeline twice during startup - Entry::Occupied(_) => { - unreachable!( - "Timeline {tenant_id}/{timeline_id} already exists in the tenant map" - ); + match import_pgdata { + Some(import_pgdata) if !import_pgdata.is_done() => { + match cause { + LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (), + LoadTimelineCause::ImportPgdata { .. } => { + unreachable!("ImportPgdata should not be reloading the timeline: import is done and persisted as such in s3") + } } - Entry::Vacant(v) => { - v.insert(Arc::clone(&timeline)); - timeline.maybe_spawn_flush_loop(); + let mut guard = self.timelines_creating.lock().unwrap(); + if !guard.insert(timeline_id) { + // We should never try and load the same timeline twice during startup + unreachable!("Timeline {tenant_id}/{timeline_id} is already being created") } + let timeline_create_guard = TimelineCreateGuard { + _tenant_gate_guard: self.gate.enter()?, + owning_tenant: self.clone(), + timeline_id, + idempotency, + // The users of this specific return value don't need the timeline_path in there. + timeline_path: timeline + .conf + .timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id), + }; + Ok(TimelineInitAndSyncResult::NeedsSpawnImportPgdata( + TimelineInitAndSyncNeedsSpawnImportPgdata { + timeline, + import_pgdata, + guard: timeline_create_guard, + }, + )) } - }; + Some(_) | None => { + { + let mut timelines_accessor = self.timelines.lock().unwrap(); + match timelines_accessor.entry(timeline_id) { + // We should never try and load the same timeline twice during startup + Entry::Occupied(_) => { + unreachable!( + "Timeline {tenant_id}/{timeline_id} already exists in the tenant map" + ); + } + Entry::Vacant(v) => { + v.insert(Arc::clone(&timeline)); + timeline.maybe_spawn_flush_loop(); + } + } + } - // Sanity check: a timeline should have some content.
- anyhow::ensure!( - ancestor.is_some() - || timeline - .layers - .read() - .await - .layer_map() - .expect("currently loading, layer manager cannot be shutdown already") - .iter_historic_layers() - .next() - .is_some(), - "Timeline has no ancestor and no layer files" - ); + // Sanity check: a timeline should have some content. + anyhow::ensure!( + ancestor.is_some() + || timeline + .layers + .read() + .await + .layer_map() + .expect("currently loading, layer manager cannot be shutdown already") + .iter_historic_layers() + .next() + .is_some(), + "Timeline has no ancestor and no layer files" + ); - Ok(()) + match cause { + LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (), + LoadTimelineCause::ImportPgdata { + create_guard, + activate, + } => { + // TODO: see the comment in the task code above how I'm not so certain + // it is safe to activate here because of concurrent shutdowns. + match activate { + ActivateTimelineArgs::Yes { broker_client } => { + info!("activating timeline after reload from pgdata import task"); + timeline.activate(self.clone(), broker_client, None, ctx); + } + ActivateTimelineArgs::No => (), + } + drop(create_guard); + } + } + + Ok(TimelineInitAndSyncResult::ReadyToActivate(timeline)) + } + } } /// Attach a tenant that's available in cloud storage. @@ -1578,24 +1671,46 @@ impl Tenant { } // TODO again handle early failure - self.load_remote_timeline( - timeline_id, - index_part, - remote_metadata, - TimelineResources { - remote_client, - timeline_get_throttle: self.timeline_get_throttle.clone(), - l0_flush_global_state: self.l0_flush_global_state.clone(), - }, - ctx, - ) - .await - .with_context(|| { - format!( - "failed to load remote timeline {} for tenant {}", - timeline_id, self.tenant_shard_id + let effect = self + .load_remote_timeline( + timeline_id, + index_part, + remote_metadata, + TimelineResources { + remote_client, + timeline_get_throttle: self.timeline_get_throttle.clone(), + l0_flush_global_state: self.l0_flush_global_state.clone(), + }, + LoadTimelineCause::Attach, + ctx, ) - })?; + .await + .with_context(|| { + format!( + "failed to load remote timeline {} for tenant {}", + timeline_id, self.tenant_shard_id + ) + })?; + + match effect { + TimelineInitAndSyncResult::ReadyToActivate(_) => { + // activation happens later, on Tenant::activate + } + TimelineInitAndSyncResult::NeedsSpawnImportPgdata( + TimelineInitAndSyncNeedsSpawnImportPgdata { + timeline, + import_pgdata, + guard, + }, + ) => { + tokio::task::spawn(self.clone().create_timeline_import_pgdata_task( + timeline, + import_pgdata, + ActivateTimelineArgs::No, + guard, + )); + } + } } // Walk through deleted timelines, resume deletion @@ -1719,13 +1834,14 @@ impl Tenant { #[instrument(skip_all, fields(timeline_id=%timeline_id))] async fn load_remote_timeline( - &self, + self: &Arc, timeline_id: TimelineId, index_part: IndexPart, remote_metadata: TimelineMetadata, resources: TimelineResources, + cause: LoadTimelineCause, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { span::debug_assert_current_span_has_tenant_id(); info!("downloading index file for timeline {}", timeline_id); @@ -1752,6 +1868,7 @@ impl Tenant { index_part, remote_metadata, ancestor, + cause, ctx, ) .await @@ -1938,6 +2055,7 @@ impl Tenant { TimelineArchivalError::Other(anyhow::anyhow!("Timeline already exists")) } TimelineExclusionError::Other(e) => TimelineArchivalError::Other(e), + TimelineExclusionError::ShuttingDown => TimelineArchivalError::Cancelled, })?; let timeline_preload = self 
@@ -1976,6 +2094,7 @@ impl Tenant { index_part, remote_metadata, timeline_resources, + LoadTimelineCause::Unoffload, &ctx, ) .await @@ -2213,7 +2332,7 @@ impl Tenant { /// /// Tests should use `Tenant::create_test_timeline` to set up the minimum required metadata keys. pub(crate) async fn create_empty_timeline( - &self, + self: &Arc, new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, @@ -2263,7 +2382,7 @@ impl Tenant { // Our current tests don't need the background loops. #[cfg(test)] pub async fn create_test_timeline( - &self, + self: &Arc, new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, @@ -2302,7 +2421,7 @@ impl Tenant { #[cfg(test)] #[allow(clippy::too_many_arguments)] pub async fn create_test_timeline_with_layers( - &self, + self: &Arc, new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, @@ -2439,6 +2558,16 @@ impl Tenant { self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx) .await? } + CreateTimelineParams::ImportPgdata(params) => { + self.create_timeline_import_pgdata( + params, + ActivateTimelineArgs::Yes { + broker_client: broker_client.clone(), + }, + ctx, + ) + .await? + } }; // At this point we have dropped our guard on [`Self::timelines_creating`], and @@ -2481,11 +2610,202 @@ impl Tenant { ); timeline } + CreateTimelineResult::ImportSpawned(timeline) => { + info!("import task spawned, timeline will become visible and activated once the import is done"); + timeline + } }; Ok(activated_timeline) } + /// The returned [`Arc`] is NOT in the [`Tenant::timelines`] map until the import + /// completes in the background. A DIFFERENT [`Arc`] will be inserted into the + /// [`Tenant::timelines`] map when the import completes. + /// We only return an [`Arc`] here so the API handler can create a [`pageserver_api::models::TimelineInfo`] + /// for the response. + async fn create_timeline_import_pgdata( + self: &Arc, + params: CreateTimelineParamsImportPgdata, + activate: ActivateTimelineArgs, + ctx: &RequestContext, + ) -> Result { + let CreateTimelineParamsImportPgdata { + new_timeline_id, + location, + idempotency_key, + } = params; + + let started_at = chrono::Utc::now().naive_utc(); + + // + // There's probably a simpler way to upload an index part, but, remote_timeline_client + // is the canonical way we do it. + // - create an empty timeline in-memory + // - use its remote_timeline_client to do the upload + // - dispose of the uninit timeline + // - keep the creation guard alive + + let timeline_create_guard = match self + .start_creating_timeline( + new_timeline_id, + CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata { + idempotency_key: idempotency_key.clone(), + }), + ) + .await? 
+ { + StartCreatingTimelineResult::CreateGuard(guard) => guard, + StartCreatingTimelineResult::Idempotent(timeline) => { + return Ok(CreateTimelineResult::Idempotent(timeline)) + } + }; + + let mut uninit_timeline = { + let this = &self; + let initdb_lsn = Lsn(0); + let _ctx = ctx; + async move { + let new_metadata = TimelineMetadata::new( + // Initialize disk_consistent LSN to 0, The caller must import some data to + // make it valid, before calling finish_creation() + Lsn(0), + None, + None, + Lsn(0), + initdb_lsn, + initdb_lsn, + 15, + ); + this.prepare_new_timeline( + new_timeline_id, + &new_metadata, + timeline_create_guard, + initdb_lsn, + None, + ) + .await + } + } + .await?; + + let in_progress = import_pgdata::index_part_format::InProgress { + idempotency_key, + location, + started_at, + }; + let index_part = import_pgdata::index_part_format::Root::V1( + import_pgdata::index_part_format::V1::InProgress(in_progress), + ); + uninit_timeline + .raw_timeline() + .unwrap() + .remote_client + .schedule_index_upload_for_import_pgdata_state_update(Some(index_part.clone()))?; + + // wait_completion happens in caller + + let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself(); + + tokio::spawn(self.clone().create_timeline_import_pgdata_task( + timeline.clone(), + index_part, + activate, + timeline_create_guard, + )); + + // NB: the timeline doesn't exist in self.timelines at this point + Ok(CreateTimelineResult::ImportSpawned(timeline)) + } + + #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))] + async fn create_timeline_import_pgdata_task( + self: Arc, + timeline: Arc, + index_part: import_pgdata::index_part_format::Root, + activate: ActivateTimelineArgs, + timeline_create_guard: TimelineCreateGuard, + ) { + debug_assert_current_span_has_tenant_and_timeline_id(); + info!("starting"); + scopeguard::defer! {info!("exiting")}; + + let res = self + .create_timeline_import_pgdata_task_impl( + timeline, + index_part, + activate, + timeline_create_guard, + ) + .await; + if let Err(err) = &res { + error!(?err, "task failed"); + // TODO sleep & retry, sensitive to tenant shutdown + // TODO: allow timeline deletion requests => should cancel the task + } + } + + async fn create_timeline_import_pgdata_task_impl( + self: Arc, + timeline: Arc, + index_part: import_pgdata::index_part_format::Root, + activate: ActivateTimelineArgs, + timeline_create_guard: TimelineCreateGuard, + ) -> Result<(), anyhow::Error> { + let ctx = RequestContext::new(TaskKind::ImportPgdata, DownloadBehavior::Warn); + + info!("importing pgdata"); + import_pgdata::doit(&timeline, index_part, &ctx, self.cancel.clone()) + .await + .context("import")?; + info!("import done"); + + // + // Reload timeline from remote. + // This proves that the remote state is attachable, and it reuses the code. + // + // TODO: think about whether this is safe to do with concurrent Tenant::shutdown. + // timeline_create_guard hols the tenant gate open, so, shutdown cannot _complete_ until we exit. + // But our activate() call might launch new background tasks after Tenant::shutdown + // already went past shutting down the Tenant::timelines, which this timeline here is no part of. 
+ // I think the same problem exists with the bootstrap & branch mgmt API tasks (tenant shutting + // down while bootstrapping/branching + activating), but, the race condition is much more likely + // to manifest because of the long runtime of this import task. + + // in theory this shouldn't even .await anything except for coop yield + info!("shutting down timeline"); + timeline.shutdown(ShutdownMode::Hard).await; + info!("timeline shut down, reloading from remote"); + // TODO: we can't do the following check because create_timeline_import_pgdata must return an Arc + // let Some(timeline) = Arc::into_inner(timeline) else { + // anyhow::bail!("implementation error: timeline that we shut down was still referenced from somewhere"); + // }; + let timeline_id = timeline.timeline_id; + + // load from object storage like Tenant::attach does + let resources = self.build_timeline_resources(timeline_id); + let index_part = resources + .remote_client + .download_index_file(&self.cancel) + .await?; + let index_part = match index_part { + MaybeDeletedIndexPart::Deleted(_) => { + // likely concurrent delete call, cplane should prevent this + anyhow::bail!("index part says deleted but we are not done creating yet, this should not happen but") + } + MaybeDeletedIndexPart::IndexPart(p) => p, + }; + let metadata = index_part.metadata.clone(); + self + .load_remote_timeline(timeline_id, index_part, metadata, resources, LoadTimelineCause::ImportPgdata{ + create_guard: timeline_create_guard, activate, }, &ctx) + .await? + .ready_to_activate() + .context("implementation error: reloaded timeline still needs import after import reported success")?; + + anyhow::Ok(()) + } + pub(crate) async fn delete_timeline( self: Arc, timeline_id: TimelineId, @@ -3337,6 +3657,13 @@ where Ok(result) } +enum ActivateTimelineArgs { + Yes { + broker_client: storage_broker::BrokerClientChannel, + }, + No, +} + impl Tenant { pub fn tenant_specific_overrides(&self) -> TenantConfOpt { self.tenant_conf.load().tenant_conf.clone() @@ -3520,6 +3847,7 @@ impl Tenant { /// `validate_ancestor == false` is used when a timeline is created for deletion /// and we might not have the ancestor present anymore which is fine for to be /// deleted timelines. + #[allow(clippy::too_many_arguments)] fn create_timeline_struct( &self, new_timeline_id: TimelineId, @@ -4283,16 +4611,17 @@ impl Tenant { /// If the timeline was already created in the meantime, we check whether this /// request conflicts or is idempotent , based on `state`. 
async fn start_creating_timeline( - &self, + self: &Arc, new_timeline_id: TimelineId, idempotency: CreateTimelineIdempotency, - ) -> Result, CreateTimelineError> { + ) -> Result { let allow_offloaded = false; match self.create_timeline_create_guard(new_timeline_id, idempotency, allow_offloaded) { Ok(create_guard) => { pausable_failpoint!("timeline-creation-after-uninit"); Ok(StartCreatingTimelineResult::CreateGuard(create_guard)) } + Err(TimelineExclusionError::ShuttingDown) => Err(CreateTimelineError::ShuttingDown), Err(TimelineExclusionError::AlreadyCreating) => { // Creation is in progress, we cannot create it again, and we cannot // check if this request matches the existing one, so caller must try @@ -4582,7 +4911,7 @@ impl Tenant { &'a self, new_timeline_id: TimelineId, new_metadata: &TimelineMetadata, - create_guard: TimelineCreateGuard<'a>, + create_guard: TimelineCreateGuard, start_lsn: Lsn, ancestor: Option>, ) -> anyhow::Result> { @@ -4642,7 +4971,7 @@ impl Tenant { /// The `allow_offloaded` parameter controls whether to tolerate the existence of /// offloaded timelines or not. fn create_timeline_create_guard( - &self, + self: &Arc, timeline_id: TimelineId, idempotency: CreateTimelineIdempotency, allow_offloaded: bool, @@ -4902,48 +5231,16 @@ async fn run_initdb( let _permit = INIT_DB_SEMAPHORE.acquire().await; - let mut initdb_command = tokio::process::Command::new(&initdb_bin_path); - initdb_command - .args(["--pgdata", initdb_target_dir.as_ref()]) - .args(["--username", &conf.superuser]) - .args(["--encoding", "utf8"]) - .args(["--locale", &conf.locale]) - .arg("--no-instructions") - .arg("--no-sync") - .env_clear() - .env("LD_LIBRARY_PATH", &initdb_lib_dir) - .env("DYLD_LIBRARY_PATH", &initdb_lib_dir) - .stdin(std::process::Stdio::null()) - // stdout invocation produces the same output every time, we don't need it - .stdout(std::process::Stdio::null()) - // we would be interested in the stderr output, if there was any - .stderr(std::process::Stdio::piped()); - - // Before version 14, only the libc provide was available. - if pg_version > 14 { - // Version 17 brought with it a builtin locale provider which only provides - // C and C.UTF-8. While being safer for collation purposes since it is - // guaranteed to be consistent throughout a major release, it is also more - // performant. - let locale_provider = if pg_version >= 17 { "builtin" } else { "libc" }; - - initdb_command.args(["--locale-provider", locale_provider]); - } - - let initdb_proc = initdb_command.spawn()?; - - // Ideally we'd select here with the cancellation token, but the problem is that - // we can't safely terminate initdb: it launches processes of its own, and killing - // initdb doesn't kill them. After we return from this function, we want the target - // directory to be able to be cleaned up. - // See https://github.com/neondatabase/neon/issues/6385 - let initdb_output = initdb_proc.wait_with_output().await?; - if !initdb_output.status.success() { - return Err(InitdbError::Failed( - initdb_output.status, - initdb_output.stderr, - )); - } + let res = postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs { + superuser: &conf.superuser, + locale: &conf.locale, + initdb_bin: &initdb_bin_path, + pg_version, + library_search_path: &initdb_lib_dir, + pgdata: initdb_target_dir, + }) + .await + .map_err(InitdbError::Inner); // This isn't true cancellation support, see above. Still return an error to // excercise the cancellation code path. 
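+    // The `postgres_initdb` helper crate added by this change is not shown in this hunk;
+    // the following is only a sketch of the surface its call site above implies (field
+    // names are taken from the call site; the types and the error type are assumptions):
+    //
+    //     pub struct RunInitdbArgs<'a> {
+    //         pub superuser: &'a str,
+    //         pub locale: &'a str,
+    //         pub initdb_bin: &'a Utf8Path,
+    //         pub pg_version: u32,
+    //         pub library_search_path: &'a Utf8Path,
+    //         pub pgdata: &'a Utf8Path,
+    //     }
+    //
+    //     pub async fn do_run_initdb(args: RunInitdbArgs<'_>) -> Result<(), Error>;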
@@ -4951,7 +5248,7 @@ async fn run_initdb( return Err(InitdbError::Cancelled); } - Ok(()) + res } /// Dump contents of a layer file to stdout. diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 4c8828221416..007bd3eef083 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -244,6 +244,7 @@ use self::index::IndexPart; use super::config::AttachedLocationConfig; use super::metadata::MetadataUpdate; use super::storage_layer::{Layer, LayerName, ResidentLayer}; +use super::timeline::import_pgdata; use super::upload_queue::{NotInitialized, SetDeletedFlagProgress}; use super::{DeleteTimelineError, Generation}; @@ -813,6 +814,18 @@ impl RemoteTimelineClient { Ok(need_wait) } + /// Launch an index-file upload operation in the background, setting `import_pgdata` field. + pub(crate) fn schedule_index_upload_for_import_pgdata_state_update( + self: &Arc, + state: Option, + ) -> anyhow::Result<()> { + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut()?; + upload_queue.dirty.import_pgdata = state; + self.schedule_index_upload(upload_queue)?; + Ok(()) + } + /// /// Launch an index-file upload operation in the background, if necessary. /// diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index efcd20d1bf5c..d632e595ada0 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -706,7 +706,7 @@ where .and_then(|x| x) } -async fn download_retry_forever( +pub(crate) async fn download_retry_forever( op: O, description: &str, cancel: &CancellationToken, diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index d8a881a2c443..506990fb2fa4 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -12,6 +12,7 @@ use utils::id::TimelineId; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::storage_layer::LayerName; +use crate::tenant::timeline::import_pgdata; use crate::tenant::Generation; use pageserver_api::shard::ShardIndex; @@ -37,6 +38,13 @@ pub struct IndexPart { #[serde(skip_serializing_if = "Option::is_none")] pub archived_at: Option, + /// This field supports import-from-pgdata ("fast imports" platform feature). + /// We don't currently use fast imports, so, this field is None for all production timelines. + /// See for more information. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub import_pgdata: Option, + /// Per layer file name metadata, which can be present for a present or missing layer file. /// /// Older versions of `IndexPart` will not have this property or have only a part of metadata @@ -90,10 +98,11 @@ impl IndexPart { /// - 7: metadata_bytes is no longer written, but still read /// - 8: added `archived_at` /// - 9: +gc_blocking - const LATEST_VERSION: usize = 9; + /// - 10: +import_pgdata + const LATEST_VERSION: usize = 10; // Versions we may see when reading from a bucket. 
- pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9]; + pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; pub const FILE_NAME: &'static str = "index_part.json"; @@ -108,6 +117,7 @@ impl IndexPart { lineage: Default::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, } } @@ -381,6 +391,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -425,6 +436,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -470,6 +482,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -518,6 +531,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap(); @@ -561,6 +575,7 @@ mod tests { lineage: Lineage::default(), gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -607,6 +622,7 @@ mod tests { }, gc_blocking: None, last_aux_file_policy: None, + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -658,6 +674,7 @@ mod tests { }, gc_blocking: None, last_aux_file_policy: Some(AuxFilePolicy::V2), + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -714,6 +731,7 @@ mod tests { lineage: Default::default(), gc_blocking: None, last_aux_file_policy: Default::default(), + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -771,6 +789,7 @@ mod tests { lineage: Default::default(), gc_blocking: None, last_aux_file_policy: Default::default(), + import_pgdata: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -833,6 +852,83 @@ mod tests { }), last_aux_file_policy: Default::default(), archived_at: None, + import_pgdata: None, + }; + + let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); + assert_eq!(part, expected); + } + + #[test] + fn v10_importpgdata_is_parsed() { + let example = r#"{ + "version": 10, + "layer_metadata":{ + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 }, + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 } + }, + "disk_consistent_lsn":"0/16960E8", + "metadata": { + "disk_consistent_lsn": "0/16960E8", + "prev_record_lsn": "0/1696070", + "ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e", + "ancestor_lsn": "0/0", + "latest_gc_cutoff_lsn": "0/1696070", + "initdb_lsn": "0/1696070", + "pg_version": 14 + }, + "gc_blocking": { + "started_at": "2024-07-19T09:00:00.123", + "reasons": ["DetachAncestor"] + }, + "import_pgdata": { + "V1": { + "Done": { + "idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5", + "started_at": "2024-11-13T09:23:42.123", + "finished_at": "2024-11-13T09:42:23.123" + } + } + } + }"#; + + let expected = IndexPart { + version: 10, + layer_metadata: HashMap::from([ + 
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata { + file_size: 25600000, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata { + file_size: 9007199254741001, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }) + ]), + disk_consistent_lsn: "0/16960E8".parse::().unwrap(), + metadata: TimelineMetadata::new( + Lsn::from_str("0/16960E8").unwrap(), + Some(Lsn::from_str("0/1696070").unwrap()), + Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()), + Lsn::INVALID, + Lsn::from_str("0/1696070").unwrap(), + Lsn::from_str("0/1696070").unwrap(), + 14, + ).with_recalculated_checksum().unwrap(), + deleted_at: None, + lineage: Default::default(), + gc_blocking: Some(GcBlocking { + started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"), + reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]), + }), + last_aux_file_policy: Default::default(), + archived_at: None, + import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{ + started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"), + finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"), + idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()), + }))) }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d1285e7c8ae8..4881be33a605 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4,6 +4,7 @@ pub mod delete; pub(crate) mod detach_ancestor; mod eviction_task; pub(crate) mod handle; +pub(crate) mod import_pgdata; mod init; pub mod layer_manager; pub(crate) mod logical_size; @@ -2708,20 +2709,23 @@ impl Timeline { { Some(cancel) => cancel.cancel(), None => { - let state = self.current_state(); - if matches!( - state, - TimelineState::Broken { .. } | TimelineState::Stopping - ) { - - // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken). - // Don't make noise. - } else { - warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work"); - debug_assert!(false); + match self.current_state() { + TimelineState::Broken { .. } | TimelineState::Stopping => { + // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken). + // Don't make noise. + } + TimelineState::Loading => { + // Import does not return an activated timeline. 
+ info!("discarding priority boost for logical size calculation because timeline is not yet active"); + } + TimelineState::Active => { + // activation should be setting the once cell + warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work"); + debug_assert!(false); + } } } - }; + } } } diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs new file mode 100644 index 000000000000..de564685805f --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -0,0 +1,218 @@ +use std::sync::Arc; + +use anyhow::{bail, Context}; +use remote_storage::RemotePath; +use tokio_util::sync::CancellationToken; +use tracing::{info, info_span, Instrument}; +use utils::lsn::Lsn; + +use crate::{context::RequestContext, tenant::metadata::TimelineMetadata}; + +use super::Timeline; + +mod flow; +mod importbucket_client; +mod importbucket_format; +pub(crate) mod index_part_format; +pub(crate) mod upcall_api; + +pub async fn doit( + timeline: &Arc, + index_part: index_part_format::Root, + ctx: &RequestContext, + cancel: CancellationToken, +) -> anyhow::Result<()> { + let index_part_format::Root::V1(v1) = index_part; + let index_part_format::InProgress { + location, + idempotency_key, + started_at, + } = match v1 { + index_part_format::V1::Done(_) => return Ok(()), + index_part_format::V1::InProgress(in_progress) => in_progress, + }; + + let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; + + info!("get spec early so we know we'll be able to upcall when done"); + let Some(spec) = storage.get_spec().await? else { + bail!("spec not found") + }; + + let upcall_client = + upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?; + + // + // send an early progress update to clean up k8s job early and generate potentially useful logs + // + info!("send early progress update"); + upcall_client + .send_progress_until_success(&spec) + .instrument(info_span!("early_progress_update")) + .await?; + + let status_prefix = RemotePath::from_string("status").unwrap(); + + // + // See if shard is done. + // TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing. + // + let shard_status_key = + status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug())); + let shard_status: Option = + storage.get_json(&shard_status_key).await?; + info!(?shard_status, "peeking shard status"); + if shard_status.map(|st| st.done).unwrap_or(false) { + info!("shard status indicates that the shard is done, skipping import"); + } else { + // TODO: checkpoint the progress into the IndexPart instead of restarting + // from the beginning. + + // + // Wipe the slate clean - the flow does not allow resuming. + // We can implement resuming in the future by checkpointing the progress into the IndexPart. + // + info!("wipe the slate clean"); + { + // TODO: do we need to hold GC lock for this? 
+ let mut guard = timeline.layers.write().await; + assert!( + guard.layer_map()?.open_layer.is_none(), + "while importing, there should be no in-memory layer" // this just seems like a good place to assert it + ); + let all_layers_keys = guard.all_persistent_layers(); + let all_layers: Vec<_> = all_layers_keys + .iter() + .map(|key| guard.get_from_key(key)) + .collect(); + let open = guard.open_mut().context("open_mut")?; + + timeline.remote_client.schedule_gc_update(&all_layers)?; + open.finish_gc_timeline(&all_layers); + } + + // + // Wait for pgdata to finish uploading + // + info!("wait for pgdata to reach status 'done'"); + let pgdata_status_key = status_prefix.join("pgdata"); + loop { + let res = async { + let pgdata_status: Option = storage + .get_json(&pgdata_status_key) + .await + .context("get pgdata status")?; + info!(?pgdata_status, "peeking pgdata status"); + if pgdata_status.map(|st| st.done).unwrap_or(false) { + Ok(()) + } else { + Err(anyhow::anyhow!("pgdata not done yet")) + } + } + .await; + match res { + Ok(_) => break, + Err(err) => { + info!(?err, "indefintely waiting for pgdata to finish"); + if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled()) + .await + .is_ok() + { + bail!("cancelled while waiting for pgdata"); + } + } + } + } + + // + // Do the import + // + info!("do the import"); + let control_file = storage.get_control_file().await?; + let base_lsn = control_file.base_lsn(); + + info!("update TimelineMetadata based on LSNs from control file"); + { + let pg_version = control_file.pg_version(); + let _ctx: &RequestContext = ctx; + async move { + // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the + // checkpoint record, and prev_record_lsn should point to its beginning. + // We should read the real end of the record from the WAL, but here we + // just fake it. + let disk_consistent_lsn = Lsn(base_lsn.0 + 8); + let prev_record_lsn = base_lsn; + let metadata = TimelineMetadata::new( + disk_consistent_lsn, + Some(prev_record_lsn), + None, // no ancestor + Lsn(0), // no ancestor lsn + base_lsn, // latest_gc_cutoff_lsn + base_lsn, // initdb_lsn + pg_version, + ); + + let _start_lsn = disk_consistent_lsn + 1; + + timeline + .remote_client + .schedule_index_upload_for_full_metadata_update(&metadata)?; + + timeline.remote_client.wait_completion().await?; + + anyhow::Ok(()) + } + } + .await?; + + flow::run( + timeline.clone(), + base_lsn, + control_file, + storage.clone(), + ctx, + ) + .await?; + + // + // Communicate that shard is done. + // + storage + .put_json( + &shard_status_key, + &importbucket_format::ShardStatus { done: true }, + ) + .await + .context("put shard status")?; + } + + // + // Ensure at-least-once deliver of the upcall to cplane + // before we mark the task as done and never come here again. + // + info!("send final progress update"); + upcall_client + .send_progress_until_success(&spec) + .instrument(info_span!("final_progress_update")) + .await?; + + // + // Mark as done in index_part. + // This makes subsequent timeline loads enter the normal load code path + // instead of spawning the import task and calling this here function. 
+    //
+    info!("mark import as complete in index part");
+    timeline
+        .remote_client
+        .schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1(
+            index_part_format::V1::Done(index_part_format::Done {
+                idempotency_key,
+                started_at,
+                finished_at: chrono::Utc::now().naive_utc(),
+            }),
+        )))?;
+
+    timeline.remote_client.wait_completion().await?;
+
+    Ok(())
+}
diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
new file mode 100644
index 000000000000..cbd4168c06e5
--- /dev/null
+++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
@@ -0,0 +1,798 @@
+//! Import a PGDATA directory into an empty root timeline.
+//!
+//! This module is adapted hackathon code by Heikki and Stas.
+//! Other code in the parent module was written by Christian as part of a customer PoC.
+//!
+//! The hackathon code was producing image layer files as a free-standing program.
+//!
+//! It has been modified to
+//! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
+//! - => sharding-awareness: produce image layers with only the data relevant for this shard
+//! - => S3 as the source for the PGDATA instead of local filesystem
+//!
+//! TODOs before productionization:
+//! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
+//!   => produced image layers likely too small.
+//! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
+//! - asserts / unwraps need to be replaced with errors
+//! - don't trust remote objects will be small (=prevent OOMs in those cases)
+//! - limit all in-memory buffers in size, or download to disk and read from there
+//! - limit task concurrency
+//! - generally play nice with other tenants in the system
+//! - importbucket is a different bucket than main pageserver storage, so, should be fine wrt S3 rate limits
+//! - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc
+//! - integrate with layer eviction system
+//! - audit for Tenant::cancel and Timeline::cancel responsiveness
+//! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
+//!
+//! An incomplete set of TODOs from the Hackathon:
- version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest) + +use std::sync::Arc; + +use anyhow::{bail, ensure}; +use bytes::Bytes; + +use itertools::Itertools; +use pageserver_api::{ + key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY}, + reltag::RelTag, + shard::ShardIdentity, +}; +use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, BLCKSZ}; +use tokio::task::JoinSet; +use tracing::{debug, info_span, instrument, Instrument}; + +use crate::{ + assert_u64_eq_usize::UsizeIsU64, + pgdatadir_mapping::{SlruSegmentDirectory, TwoPhaseDirectory}, +}; +use crate::{ + context::{DownloadBehavior, RequestContext}, + pgdatadir_mapping::{DbDirectory, RelDirectory}, + task_mgr::TaskKind, + tenant::storage_layer::{ImageLayerWriter, Layer}, +}; + +use pageserver_api::key::Key; +use pageserver_api::key::{ + slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, CHECKPOINT_KEY, CONTROLFILE_KEY, + TWOPHASEDIR_KEY, +}; +use pageserver_api::keyspace::singleton_range; +use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range}; +use pageserver_api::reltag::SlruKind; +use utils::bin_ser::BeSer; +use utils::lsn::Lsn; + +use std::collections::HashSet; +use std::ops::Range; + +use super::{ + importbucket_client::{ControlFile, RemoteStorageWrapper}, + Timeline, +}; + +use remote_storage::RemotePath; + +pub async fn run( + timeline: Arc, + pgdata_lsn: Lsn, + control_file: ControlFile, + storage: RemoteStorageWrapper, + ctx: &RequestContext, +) -> anyhow::Result<()> { + Flow { + timeline, + pgdata_lsn, + control_file, + tasks: Vec::new(), + storage, + } + .run(ctx) + .await +} + +struct Flow { + timeline: Arc, + pgdata_lsn: Lsn, + control_file: ControlFile, + tasks: Vec, + storage: RemoteStorageWrapper, +} + +impl Flow { + /// Perform the ingestion into [`Self::timeline`]. + /// Assumes the timeline is empty (= no layers). + pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> { + let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align(); + + self.pgdata_lsn = pgdata_lsn; + + let datadir = PgDataDir::new(&self.storage).await?; + + // Import dbdir (00:00:00 keyspace) + // This is just constructed here, but will be written to the image layer in the first call to import_db() + let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory { + dbdirs: datadir + .dbs + .iter() + .map(|db| ((db.spcnode, db.dboid), true)) + .collect(), + })?); + self.tasks + .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into()); + + // Import databases (00:spcnode:dbnode keyspace for each db) + for db in datadir.dbs { + self.import_db(&db).await?; + } + + // Import SLRUs + + // pg_xact (01:00 keyspace) + self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact")) + .await?; + // pg_multixact/members (01:01 keyspace) + self.import_slru( + SlruKind::MultiXactMembers, + &self.storage.pgdata().join("pg_multixact/members"), + ) + .await?; + // pg_multixact/offsets (01:02 keyspace) + self.import_slru( + SlruKind::MultiXactOffsets, + &self.storage.pgdata().join("pg_multixact/offsets"), + ) + .await?; + + // Import pg_twophase. 
+ // TODO: as empty + let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory { + xids: HashSet::new(), + })?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + TWOPHASEDIR_KEY, + Bytes::from(twophasedir_buf), + ))); + + // Controlfile, checkpoint + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + CONTROLFILE_KEY, + self.control_file.control_file_buf().clone(), + ))); + + let checkpoint_buf = self + .control_file + .control_file_data() + .checkPointCopy + .encode()?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + CHECKPOINT_KEY, + checkpoint_buf, + ))); + + // Assigns parts of key space to later parallel jobs + let mut last_end_key = Key::MIN; + let mut current_chunk = Vec::new(); + let mut current_chunk_size: usize = 0; + let mut parallel_jobs = Vec::new(); + for task in std::mem::take(&mut self.tasks).into_iter() { + if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 { + let key_range = last_end_key..task.key_range().start; + parallel_jobs.push(ChunkProcessingJob::new( + key_range.clone(), + std::mem::take(&mut current_chunk), + &self, + )); + last_end_key = key_range.end; + current_chunk_size = 0; + } + current_chunk_size += task.total_size(); + current_chunk.push(task); + } + parallel_jobs.push(ChunkProcessingJob::new( + last_end_key..Key::MAX, + current_chunk, + &self, + )); + + // Start all jobs simultaneosly + let mut work = JoinSet::new(); + // TODO: semaphore? + for job in parallel_jobs { + let ctx: RequestContext = + ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error); + work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job"))); + } + let mut results = Vec::new(); + while let Some(result) = work.join_next().await { + match result { + Ok(res) => { + results.push(res); + } + Err(_joinset_err) => { + results.push(Err(anyhow::anyhow!( + "parallel job panicked or cancelled, check pageserver logs" + ))); + } + } + } + + if results.iter().all(|r| r.is_ok()) { + Ok(()) + } else { + let mut msg = String::new(); + for result in results { + if let Err(err) = result { + msg.push_str(&format!("{err:?}\n\n")); + } + } + bail!("Some parallel jobs failed:\n\n{msg}"); + } + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))] + async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> { + debug!("start"); + scopeguard::defer! 
{ + debug!("return"); + } + + // Import relmap (00:spcnode:dbnode:00:*:00) + let relmap_key = relmap_file_key(db.spcnode, db.dboid); + debug!("Constructing relmap entry, key {relmap_key}"); + let relmap_path = db.path.join("pg_filenode.map"); + let relmap_buf = self.storage.get(&relmap_path).await?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + relmap_key, relmap_buf, + ))); + + // Import reldir (00:spcnode:dbnode:00:*:01) + let reldir_key = rel_dir_to_key(db.spcnode, db.dboid); + debug!("Constructing reldirs entry, key {reldir_key}"); + let reldir_buf = RelDirectory::ser(&RelDirectory { + rels: db + .files + .iter() + .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum)) + .collect(), + })?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + reldir_key, + Bytes::from(reldir_buf), + ))); + + // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last + // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff) + for file in &db.files { + debug!(%file.path, %file.filesize, "importing file"); + let len = file.filesize; + ensure!(len % 8192 == 0); + let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192); + let start_key = rel_block_to_key(file.rel_tag, start_blk); + let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32); + self.tasks + .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new( + *self.timeline.get_shard_identity(), + start_key..end_key, + &file.path, + self.storage.clone(), + ))); + + // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff) + if let Some(nblocks) = file.nblocks { + let size_key = rel_size_to_key(file.rel_tag); + //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}"); + let buf = nblocks.to_le_bytes(); + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + size_key, + Bytes::from(buf.to_vec()), + ))); + } + } + + Ok(()) + } + + async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> { + let segments = self.storage.listfilesindir(path).await?; + let segments: Vec<(String, u32, usize)> = segments + .into_iter() + .filter_map(|(path, size)| { + let filename = path.object_name()?; + let segno = u32::from_str_radix(filename, 16).ok()?; + Some((filename.to_string(), segno, size)) + }) + .collect(); + + // Write SlruDir + let slrudir_key = slru_dir_to_key(kind); + let segnos: HashSet = segments + .iter() + .map(|(_path, segno, _size)| *segno) + .collect(); + let slrudir = SlruSegmentDirectory { segments: segnos }; + let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?; + self.tasks + .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + slrudir_key, + Bytes::from(slrudir_buf), + ))); + + for (segpath, segno, size) in segments { + // SlruSegBlocks for each segment + let p = path.join(&segpath); + let file_size = size; + ensure!(file_size % 8192 == 0); + let nblocks = u32::try_from(file_size / 8192)?; + let start_key = slru_block_to_key(kind, segno, 0); + let end_key = slru_block_to_key(kind, segno, nblocks); + debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment"); + self.tasks + .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new( + *self.timeline.get_shard_identity(), + start_key..end_key, + &p, + self.storage.clone(), + ))); + + // Followed by SlruSegSize + let segsize_key = slru_segment_size_to_key(kind, segno); + let segsize_buf = nblocks.to_le_bytes(); + self.tasks + 
.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new( + segsize_key, + Bytes::copy_from_slice(&segsize_buf), + ))); + } + Ok(()) + } +} + +// +// dbdir iteration tools +// + +struct PgDataDir { + pub dbs: Vec, // spcnode, dboid, path +} + +struct PgDataDirDb { + pub spcnode: u32, + pub dboid: u32, + pub path: RemotePath, + pub files: Vec, +} + +struct PgDataDirDbFile { + pub path: RemotePath, + pub rel_tag: RelTag, + pub segno: u32, + pub filesize: usize, + // Cummulative size of the given fork, set only for the last segment of that fork + pub nblocks: Option, +} + +impl PgDataDir { + async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result { + let datadir_path = storage.pgdata(); + // Import ordinary databases, DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID, so import them first + // Traverse database in increasing oid order + + let basedir = &datadir_path.join("base"); + let db_oids: Vec<_> = storage + .listdir(basedir) + .await? + .into_iter() + .filter_map(|path| path.object_name().and_then(|name| name.parse::().ok())) + .sorted() + .collect(); + debug!(?db_oids, "found databases"); + let mut databases = Vec::new(); + for dboid in db_oids { + databases.push( + PgDataDirDb::new( + storage, + &basedir.join(dboid.to_string()), + pg_constants::DEFAULTTABLESPACE_OID, + dboid, + &datadir_path, + ) + .await?, + ); + } + + // special case for global catalogs + databases.push( + PgDataDirDb::new( + storage, + &datadir_path.join("global"), + postgres_ffi::pg_constants::GLOBALTABLESPACE_OID, + 0, + &datadir_path, + ) + .await?, + ); + + databases.sort_by_key(|db| (db.spcnode, db.dboid)); + + Ok(Self { dbs: databases }) + } +} + +impl PgDataDirDb { + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))] + async fn new( + storage: &RemoteStorageWrapper, + db_path: &RemotePath, + spcnode: u32, + dboid: u32, + datadir_path: &RemotePath, + ) -> anyhow::Result { + let mut files: Vec = storage + .listfilesindir(db_path) + .await? + .into_iter() + .filter_map(|(path, size)| { + debug!(%path, %size, "found file in dbdir"); + path.object_name().and_then(|name| { + // returns (relnode, forknum, segno) + parse_relfilename(name).ok().map(|x| (size, x)) + }) + }) + .sorted_by_key(|(_, relfilename)| *relfilename) + .map(|(filesize, (relnode, forknum, segno))| { + let rel_tag = RelTag { + spcnode, + dbnode: dboid, + relnode, + forknum, + }; + + let path = datadir_path.join(rel_tag.to_segfile_name(segno)); + assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error + let nblocks = filesize / BLCKSZ as usize; + + PgDataDirDbFile { + path, + filesize, + rel_tag, + segno, + nblocks: Some(nblocks), // first non-cummulative sizes + } + }) + .collect(); + + // Set cummulative sizes. Do all of that math here, so that later we could easier + // parallelize over segments and know with which segments we need to write relsize + // entry. 
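+        // Worked example (illustrative numbers only): a fork stored as three segments
+        // with per-segment block counts [131072, 131072, 100] comes out of the loop
+        // below as nblocks = [None, None, Some(262244)], i.e. only the last segment of
+        // each fork carries that fork's cumulative size.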
+ let mut cumulative_nblocks: usize = 0; + let mut prev_rel_tag: Option = None; + for i in 0..files.len() { + if prev_rel_tag == Some(files[i].rel_tag) { + cumulative_nblocks += files[i].nblocks.unwrap(); + } else { + cumulative_nblocks = files[i].nblocks.unwrap(); + } + + files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag { + Some(cumulative_nblocks) + } else { + None + }; + + prev_rel_tag = Some(files[i].rel_tag); + } + + Ok(PgDataDirDb { + files, + path: db_path.clone(), + spcnode, + dboid, + }) + } +} + +trait ImportTask { + fn key_range(&self) -> Range; + + fn total_size(&self) -> usize { + // TODO: revisit this + if is_contiguous_range(&self.key_range()) { + contiguous_range_len(&self.key_range()) as usize * 8192 + } else { + u32::MAX as usize + } + } + + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result; +} + +struct ImportSingleKeyTask { + key: Key, + buf: Bytes, +} + +impl ImportSingleKeyTask { + fn new(key: Key, buf: Bytes) -> Self { + ImportSingleKeyTask { key, buf } + } +} + +impl ImportTask for ImportSingleKeyTask { + fn key_range(&self) -> Range { + singleton_range(self.key) + } + + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result { + layer_writer.put_image(self.key, self.buf, ctx).await?; + Ok(1) + } +} + +struct ImportRelBlocksTask { + shard_identity: ShardIdentity, + key_range: Range, + path: RemotePath, + storage: RemoteStorageWrapper, +} + +impl ImportRelBlocksTask { + fn new( + shard_identity: ShardIdentity, + key_range: Range, + path: &RemotePath, + storage: RemoteStorageWrapper, + ) -> Self { + ImportRelBlocksTask { + shard_identity, + key_range, + path: path.clone(), + storage, + } + } +} + +impl ImportTask for ImportRelBlocksTask { + fn key_range(&self) -> Range { + self.key_range.clone() + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))] + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result { + debug!("Importing relation file"); + + let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?; + let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?; + assert_eq!(rel_tag, rel_tag_end); + + let ranges = (start_blk..end_blk) + .enumerate() + .filter_map(|(i, blknum)| { + let key = rel_block_to_key(rel_tag, blknum); + if self.shard_identity.is_key_disposable(&key) { + return None; + } + let file_offset = i.checked_mul(8192).unwrap(); + Some(( + vec![key], + file_offset, + file_offset.checked_add(8192).unwrap(), + )) + }) + .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| { + assert_eq!(key.len(), 1); + assert!(!acc.is_empty()); + assert!(acc_end > acc_start); + if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ { + acc.push(key.pop().unwrap()); + Ok((acc, acc_start, end)) + } else { + Err(((acc, acc_start, acc_end), (key, start, end))) + } + }); + + let mut nimages = 0; + for (keys, range_start, range_end) in ranges { + let range_buf = self + .storage + .get_range(&self.path, range_start.into_u64(), range_end.into_u64()) + .await?; + let mut buf = Bytes::from(range_buf); + // TODO: batched writes + for key in keys { + let image = buf.split_to(8192); + layer_writer.put_image(key, image, ctx).await?; + nimages += 1; + } + } + + Ok(nimages) + } +} + +struct ImportSlruBlocksTask { + shard_identity: ShardIdentity, + key_range: Range, + path: RemotePath, + 
storage: RemoteStorageWrapper, +} + +impl ImportSlruBlocksTask { + fn new( + shard_identity: ShardIdentity, + key_range: Range, + path: &RemotePath, + storage: RemoteStorageWrapper, + ) -> Self { + ImportSlruBlocksTask { + shard_identity, + key_range, + path: path.clone(), + storage, + } + } +} + +impl ImportTask for ImportSlruBlocksTask { + fn key_range(&self) -> Range { + self.key_range.clone() + } + + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result { + debug!("Importing SLRU segment file {}", self.path); + let buf = self.storage.get(&self.path).await?; + + let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?; + let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?; + let mut blknum = start_blk; + let mut nimages = 0; + let mut file_offset = 0; + while blknum < end_blk { + let key = slru_block_to_key(kind, segno, blknum); + assert!( + !self.shard_identity.is_key_disposable(&key), + "SLRU keys need to go into every shard" + ); + let buf = &buf[file_offset..(file_offset + 8192)]; + file_offset += 8192; + layer_writer + .put_image(key, Bytes::copy_from_slice(buf), ctx) + .await?; + blknum += 1; + nimages += 1; + } + Ok(nimages) + } +} + +enum AnyImportTask { + SingleKey(ImportSingleKeyTask), + RelBlocks(ImportRelBlocksTask), + SlruBlocks(ImportSlruBlocksTask), +} + +impl ImportTask for AnyImportTask { + fn key_range(&self) -> Range { + match self { + Self::SingleKey(t) => t.key_range(), + Self::RelBlocks(t) => t.key_range(), + Self::SlruBlocks(t) => t.key_range(), + } + } + /// returns the number of images put into the `layer_writer` + async fn doit( + self, + layer_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result { + match self { + Self::SingleKey(t) => t.doit(layer_writer, ctx).await, + Self::RelBlocks(t) => t.doit(layer_writer, ctx).await, + Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await, + } + } +} + +impl From for AnyImportTask { + fn from(t: ImportSingleKeyTask) -> Self { + Self::SingleKey(t) + } +} + +impl From for AnyImportTask { + fn from(t: ImportRelBlocksTask) -> Self { + Self::RelBlocks(t) + } +} + +impl From for AnyImportTask { + fn from(t: ImportSlruBlocksTask) -> Self { + Self::SlruBlocks(t) + } +} + +struct ChunkProcessingJob { + timeline: Arc, + range: Range, + tasks: Vec, + + pgdata_lsn: Lsn, +} + +impl ChunkProcessingJob { + fn new(range: Range, tasks: Vec, env: &Flow) -> Self { + assert!(env.pgdata_lsn.is_valid()); + Self { + timeline: env.timeline.clone(), + range, + tasks, + pgdata_lsn: env.pgdata_lsn, + } + } + + async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> { + let mut writer = ImageLayerWriter::new( + self.timeline.conf, + self.timeline.timeline_id, + self.timeline.tenant_shard_id, + &self.range, + self.pgdata_lsn, + ctx, + ) + .await?; + + let mut nimages = 0; + for task in self.tasks { + nimages += task.doit(&mut writer, ctx).await?; + } + + let resident_layer = if nimages > 0 { + let (desc, path) = writer.finish(ctx).await?; + Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)? + } else { + // dropping the writer cleans up + return Ok(()); + }; + + // this is sharing the same code as create_image_layers + let mut guard = self.timeline.layers.write().await; + guard + .open_mut()? 
+ .track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics); + crate::tenant::timeline::drop_wlock(guard); + + // Schedule the layer for upload but don't add barriers such as + // wait for completion or index upload, so we don't inhibit upload parallelism. + // TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?) + // TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level. + self.timeline + .remote_client + .schedule_layer_file_upload(resident_layer)?; + + Ok(()) + } +} diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs new file mode 100644 index 000000000000..8d5ab1780f70 --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs @@ -0,0 +1,315 @@ +use std::{ops::Bound, sync::Arc}; + +use anyhow::Context; +use bytes::Bytes; +use postgres_ffi::ControlFileData; +use remote_storage::{ + Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingObject, RemotePath, +}; +use serde::de::DeserializeOwned; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, instrument}; +use utils::lsn::Lsn; + +use crate::{assert_u64_eq_usize::U64IsUsize, config::PageServerConf}; + +use super::{importbucket_format, index_part_format}; + +pub async fn new( + conf: &'static PageServerConf, + location: &index_part_format::Location, + cancel: CancellationToken, +) -> Result { + // FIXME: we probably want some timeout, and we might be able to assume the max file + // size on S3 is 1GiB (postgres segment size). But the problem is that the individual + // downloaders don't know enough about concurrent downloads to make a guess on the + // expected bandwidth and resulting best timeout. + let timeout = std::time::Duration::from_secs(24 * 60 * 60); + let location_storage = match location { + #[cfg(feature = "testing")] + index_part_format::Location::LocalFs { path } => { + GenericRemoteStorage::LocalFs(remote_storage::LocalFs::new(path.clone(), timeout)?) + } + index_part_format::Location::AwsS3 { + region, + bucket, + key, + } => { + // TODO: think about security implications of letting the client specify the bucket & prefix. + // It's the most flexible right now, but, possibly we want to move bucket name into PS conf + // and force the timeline_id into the prefix? + GenericRemoteStorage::AwsS3(Arc::new( + remote_storage::S3Bucket::new( + &remote_storage::S3Config { + bucket_name: bucket.clone(), + prefix_in_bucket: Some(key.clone()), + bucket_region: region.clone(), + endpoint: conf + .import_pgdata_aws_endpoint_url + .clone() + .map(|url| url.to_string()), // by specifying None here, remote_storage/aws-sdk-rust will infer from env + concurrency_limit: 100.try_into().unwrap(), // TODO: think about this + max_keys_per_list_response: Some(1000), // TODO: think about this + upload_storage_class: None, // irrelevant + }, + timeout, + ) + .await + .context("setup s3 bucket")?, + )) + } + }; + let storage_wrapper = RemoteStorageWrapper::new(location_storage, cancel); + Ok(storage_wrapper) +} + +/// Wrap [`remote_storage`] APIs to make it look a bit more like a filesystem API +/// such as [`tokio::fs`], which was used in the original implementation of the import code. 
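+///
+/// A rough usage sketch (paths are relative to the import bucket prefix; the
+/// `pgdata/` and `status/` layout is the one used by the parent module, and the
+/// wrapper construction mirrors `importbucket_client::new` above):
+///
+/// ```ignore
+/// let storage: RemoteStorageWrapper = importbucket_client::new(conf, &location, cancel).await?;
+/// // List the files of an SLRU directory, together with their sizes.
+/// let files = storage.listfilesindir(&storage.pgdata().join("pg_xact")).await?;
+/// // Read a small JSON status object; `None` means the object does not exist yet.
+/// let status: Option<importbucket_format::PgdataStatus> = storage
+///     .get_json(&RemotePath::from_string("status/pgdata").unwrap())
+///     .await?;
+/// ```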
+#[derive(Clone)] +pub struct RemoteStorageWrapper { + storage: GenericRemoteStorage, + cancel: CancellationToken, +} + +impl RemoteStorageWrapper { + pub fn new(storage: GenericRemoteStorage, cancel: CancellationToken) -> Self { + Self { storage, cancel } + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn listfilesindir( + &self, + path: &RemotePath, + ) -> Result, DownloadError> { + assert!( + path.object_name().is_some(), + "must specify dirname, without trailing slash" + ); + let path = path.add_trailing_slash(); + + let res = crate::tenant::remote_timeline_client::download::download_retry_forever( + || async { + let Listing { keys, prefixes: _ } = self + .storage + .list( + Some(&path), + remote_storage::ListingMode::WithDelimiter, + None, + &self.cancel, + ) + .await?; + let res = keys + .into_iter() + .map(|ListingObject { key, size, .. }| (key, size.into_usize())) + .collect(); + Ok(res) + }, + &format!("listfilesindir {path:?}"), + &self.cancel, + ) + .await; + debug!(?res, "returning"); + res + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn listdir(&self, path: &RemotePath) -> Result, DownloadError> { + assert!( + path.object_name().is_some(), + "must specify dirname, without trailing slash" + ); + let path = path.add_trailing_slash(); + + let res = crate::tenant::remote_timeline_client::download::download_retry_forever( + || async { + let Listing { keys, prefixes } = self + .storage + .list( + Some(&path), + remote_storage::ListingMode::WithDelimiter, + None, + &self.cancel, + ) + .await?; + let res = keys + .into_iter() + .map(|ListingObject { key, .. }| key) + .chain(prefixes.into_iter()) + .collect(); + Ok(res) + }, + &format!("listdir {path:?}"), + &self.cancel, + ) + .await; + debug!(?res, "returning"); + res + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn get(&self, path: &RemotePath) -> Result { + let res = crate::tenant::remote_timeline_client::download::download_retry_forever( + || async { + let Download { + download_stream, .. + } = self + .storage + .download(path, &DownloadOpts::default(), &self.cancel) + .await?; + let mut reader = tokio_util::io::StreamReader::new(download_stream); + + // XXX optimize this, can we get the capacity hint from somewhere? 
+ let mut buf = Vec::new(); + tokio::io::copy_buf(&mut reader, &mut buf).await?; + Ok(Bytes::from(buf)) + }, + &format!("download {path:?}"), + &self.cancel, + ) + .await; + debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done"); + res + } + + pub async fn get_spec(&self) -> Result, anyhow::Error> { + self.get_json(&RemotePath::from_string("spec.json").unwrap()) + .await + .context("get spec") + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn get_json( + &self, + path: &RemotePath, + ) -> Result, DownloadError> { + let buf = match self.get(path).await { + Ok(buf) => buf, + Err(DownloadError::NotFound) => return Ok(None), + Err(err) => return Err(err), + }; + let res = serde_json::from_slice(&buf) + .context("serialize") + // TODO: own error type + .map_err(DownloadError::Other)?; + Ok(Some(res)) + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn put_json(&self, path: &RemotePath, value: &T) -> anyhow::Result<()> + where + T: serde::Serialize, + { + let buf = serde_json::to_vec(value)?; + let bytes = Bytes::from(buf); + utils::backoff::retry( + || async { + let size = bytes.len(); + let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone()))); + self.storage + .upload_storage_object(bytes, size, path, &self.cancel) + .await + }, + remote_storage::TimeoutOrCancel::caused_by_cancel, + 1, + u32::MAX, + &format!("put json {path}"), + &self.cancel, + ) + .await + .expect("practically infinite retries") + } + + #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] + pub async fn get_range( + &self, + path: &RemotePath, + start_inclusive: u64, + end_exclusive: u64, + ) -> Result, DownloadError> { + let len = end_exclusive + .checked_sub(start_inclusive) + .unwrap() + .into_usize(); + let res = crate::tenant::remote_timeline_client::download::download_retry_forever( + || async { + let Download { + download_stream, .. + } = self + .storage + .download( + path, + &DownloadOpts { + etag: None, + byte_start: Bound::Included(start_inclusive), + byte_end: Bound::Excluded(end_exclusive) + }, + &self.cancel) + .await?; + let mut reader = tokio_util::io::StreamReader::new(download_stream); + + let mut buf = Vec::with_capacity(len); + tokio::io::copy_buf(&mut reader, &mut buf).await?; + Ok(buf) + }, + &format!("download range len=0x{len:x} [0x{start_inclusive:x},0x{end_exclusive:x}) from {path:?}"), + &self.cancel, + ) + .await; + debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done"); + res + } + + pub fn pgdata(&self) -> RemotePath { + RemotePath::from_string("pgdata").unwrap() + } + + pub async fn get_control_file(&self) -> Result { + let control_file_path = self.pgdata().join("global/pg_control"); + info!("get control file from {control_file_path}"); + let control_file_buf = self.get(&control_file_path).await?; + ControlFile::new(control_file_buf) + } +} + +pub struct ControlFile { + control_file_data: ControlFileData, + control_file_buf: Bytes, +} + +impl ControlFile { + pub(crate) fn new(control_file_buf: Bytes) -> Result { + // XXX ControlFileData is version-specific, we're always using v14 here. v17 had changes. 
+ let control_file_data = ControlFileData::decode(&control_file_buf)?; + let control_file = ControlFile { + control_file_data, + control_file_buf, + }; + control_file.try_pg_version()?; // so that we can offer infallible pg_version() + Ok(control_file) + } + pub(crate) fn base_lsn(&self) -> Lsn { + Lsn(self.control_file_data.checkPoint).align() + } + pub(crate) fn pg_version(&self) -> u32 { + self.try_pg_version() + .expect("prepare() checks that try_pg_version doesn't error") + } + pub(crate) fn control_file_data(&self) -> &ControlFileData { + &self.control_file_data + } + pub(crate) fn control_file_buf(&self) -> &Bytes { + &self.control_file_buf + } + fn try_pg_version(&self) -> anyhow::Result { + Ok(match self.control_file_data.catalog_version_no { + // thesea are from catversion.h + 202107181 => 14, + 202209061 => 15, + 202307071 => 16, + /* XXX pg17 */ + catversion => { + anyhow::bail!("unrecognized catalog version {catversion}") + } + }) + } +} diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs new file mode 100644 index 000000000000..04ba3c6f1fda --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs @@ -0,0 +1,20 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] +pub struct PgdataStatus { + pub done: bool, + // TODO: remaining fields +} + +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] +pub struct ShardStatus { + pub done: bool, + // TODO: remaining fields +} + +// TODO: dedupe with fast_import code +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] +pub struct Spec { + pub project_id: String, + pub branch_id: String, +} diff --git a/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs b/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs new file mode 100644 index 000000000000..310d97a6a975 --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs @@ -0,0 +1,68 @@ +use serde::{Deserialize, Serialize}; + +#[cfg(feature = "testing")] +use camino::Utf8PathBuf; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub enum Root { + V1(V1), +} +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub enum V1 { + InProgress(InProgress), + Done(Done), +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(transparent)] +pub struct IdempotencyKey(String); + +impl IdempotencyKey { + pub fn new(s: String) -> Self { + Self(s) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct InProgress { + pub idempotency_key: IdempotencyKey, + pub location: Location, + pub started_at: chrono::NaiveDateTime, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct Done { + pub idempotency_key: IdempotencyKey, + pub started_at: chrono::NaiveDateTime, + pub finished_at: chrono::NaiveDateTime, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub enum Location { + #[cfg(feature = "testing")] + LocalFs { path: Utf8PathBuf }, + AwsS3 { + region: String, + bucket: String, + key: String, + }, +} + +impl Root { + pub fn is_done(&self) -> bool { + match self { + Root::V1(v1) => match v1 { + V1::Done(_) => true, + V1::InProgress(_) => false, + }, + } + } + pub fn idempotency_key(&self) -> &IdempotencyKey { + match self { + Root::V1(v1) => match v1 { + V1::InProgress(in_progress) => &in_progress.idempotency_key, + 
V1::Done(done) => &done.idempotency_key, + }, + } + } +} diff --git a/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs b/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs new file mode 100644 index 000000000000..c5210f9a30a7 --- /dev/null +++ b/pageserver/src/tenant/timeline/import_pgdata/upcall_api.rs @@ -0,0 +1,119 @@ +//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate. +use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt}; +use serde::{Deserialize, Serialize}; +use tokio_util::sync::CancellationToken; +use tracing::error; + +use crate::config::PageServerConf; +use reqwest::Method; + +use super::importbucket_format::Spec; + +pub struct Client { + base_url: String, + authorization_header: Option, + client: reqwest::Client, + cancel: CancellationToken, +} + +pub type Result = std::result::Result; + +#[derive(Serialize, Deserialize, Debug)] +struct ImportProgressRequest { + // no fields yet, not sure if there every will be any +} + +#[derive(Serialize, Deserialize, Debug)] +struct ImportProgressResponse { + // we don't care +} + +impl Client { + pub fn new(conf: &PageServerConf, cancel: CancellationToken) -> anyhow::Result { + let Some(ref base_url) = conf.import_pgdata_upcall_api else { + anyhow::bail!("import_pgdata_upcall_api is not configured") + }; + Ok(Self { + base_url: base_url.to_string(), + client: reqwest::Client::new(), + cancel, + authorization_header: conf + .import_pgdata_upcall_api_token + .as_ref() + .map(|secret_string| secret_string.get_contents()) + .map(|jwt| format!("Bearer {jwt}")), + }) + } + + fn start_request( + &self, + method: Method, + uri: U, + ) -> reqwest::RequestBuilder { + let req = self.client.request(method, uri); + if let Some(value) = &self.authorization_header { + req.header(reqwest::header::AUTHORIZATION, value) + } else { + req + } + } + + async fn request_noerror( + &self, + method: Method, + uri: U, + body: B, + ) -> Result { + self.start_request(method, uri) + .json(&body) + .send() + .await + .map_err(Error::ReceiveBody) + } + + async fn request( + &self, + method: Method, + uri: U, + body: B, + ) -> Result { + let res = self.request_noerror(method, uri, body).await?; + let response = res.error_from_body().await?; + Ok(response) + } + + pub async fn send_progress_once(&self, spec: &Spec) -> Result<()> { + let url = format!( + "{}/projects/{}/branches/{}/import_progress", + self.base_url, spec.project_id, spec.branch_id + ); + let ImportProgressResponse {} = self + .request(Method::POST, url, &ImportProgressRequest {}) + .await? 
+ .json() + .await + .map_err(Error::ReceiveBody)?; + Ok(()) + } + + pub async fn send_progress_until_success(&self, spec: &Spec) -> anyhow::Result<()> { + loop { + match self.send_progress_once(spec).await { + Ok(()) => return Ok(()), + Err(Error::Cancelled) => return Err(anyhow::anyhow!("cancelled")), + Err(err) => { + error!(?err, "error sending progress, retrying"); + if tokio::time::timeout( + std::time::Duration::from_secs(10), + self.cancel.cancelled(), + ) + .await + .is_ok() + { + anyhow::bail!("cancelled while sending early progress update"); + } + } + } + } + } +} diff --git a/pageserver/src/tenant/timeline/uninit.rs b/pageserver/src/tenant/timeline/uninit.rs index a93bdde3f8d3..80a09b4840d0 100644 --- a/pageserver/src/tenant/timeline/uninit.rs +++ b/pageserver/src/tenant/timeline/uninit.rs @@ -3,7 +3,7 @@ use std::{collections::hash_map::Entry, fs, sync::Arc}; use anyhow::Context; use camino::Utf8PathBuf; use tracing::{error, info, info_span}; -use utils::{fs_ext, id::TimelineId, lsn::Lsn}; +use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard}; use crate::{ context::RequestContext, @@ -23,14 +23,14 @@ use super::Timeline; pub struct UninitializedTimeline<'t> { pub(crate) owning_tenant: &'t Tenant, timeline_id: TimelineId, - raw_timeline: Option<(Arc, TimelineCreateGuard<'t>)>, + raw_timeline: Option<(Arc, TimelineCreateGuard)>, } impl<'t> UninitializedTimeline<'t> { pub(crate) fn new( owning_tenant: &'t Tenant, timeline_id: TimelineId, - raw_timeline: Option<(Arc, TimelineCreateGuard<'t>)>, + raw_timeline: Option<(Arc, TimelineCreateGuard)>, ) -> Self { Self { owning_tenant, @@ -87,6 +87,10 @@ impl<'t> UninitializedTimeline<'t> { } } + pub(crate) fn finish_creation_myself(&mut self) -> (Arc, TimelineCreateGuard) { + self.raw_timeline.take().expect("already checked") + } + /// Prepares timeline data by loading it from the basebackup archive. pub(crate) async fn import_basebackup_from_tar( self, @@ -167,9 +171,10 @@ pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) { /// A guard for timeline creations in process: as long as this object exists, the timeline ID /// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline. #[must_use] -pub(crate) struct TimelineCreateGuard<'t> { - owning_tenant: &'t Tenant, - timeline_id: TimelineId, +pub(crate) struct TimelineCreateGuard { + pub(crate) _tenant_gate_guard: GateGuard, + pub(crate) owning_tenant: Arc, + pub(crate) timeline_id: TimelineId, pub(crate) timeline_path: Utf8PathBuf, pub(crate) idempotency: CreateTimelineIdempotency, } @@ -184,20 +189,27 @@ pub(crate) enum TimelineExclusionError { }, #[error("Already creating")] AlreadyCreating, + #[error("Shutting down")] + ShuttingDown, // e.g. I/O errors, or some failure deep in postgres initdb #[error(transparent)] Other(#[from] anyhow::Error), } -impl<'t> TimelineCreateGuard<'t> { +impl TimelineCreateGuard { pub(crate) fn new( - owning_tenant: &'t Tenant, + owning_tenant: &Arc, timeline_id: TimelineId, timeline_path: Utf8PathBuf, idempotency: CreateTimelineIdempotency, allow_offloaded: bool, ) -> Result { + let _tenant_gate_guard = owning_tenant + .gate + .enter() + .map_err(|_| TimelineExclusionError::ShuttingDown)?; + // Lock order: this is the only place we take both locks. 
During drop() we only // lock creating_timelines let timelines = owning_tenant.timelines.lock().unwrap(); @@ -225,8 +237,12 @@ impl<'t> TimelineCreateGuard<'t> { return Err(TimelineExclusionError::AlreadyCreating); } creating_timelines.insert(timeline_id); + drop(creating_timelines); + drop(timelines_offloaded); + drop(timelines); Ok(Self { - owning_tenant, + _tenant_gate_guard, + owning_tenant: Arc::clone(owning_tenant), timeline_id, timeline_path, idempotency, @@ -234,7 +250,7 @@ impl<'t> TimelineCreateGuard<'t> { } } -impl Drop for TimelineCreateGuard<'_> { +impl Drop for TimelineCreateGuard { fn drop(&mut self) { self.owning_tenant .timelines_creating diff --git a/test_runner/fixtures/common_types.py b/test_runner/fixtures/common_types.py index c73d5411fa09..6c22b31e0092 100644 --- a/test_runner/fixtures/common_types.py +++ b/test_runner/fixtures/common_types.py @@ -190,6 +190,25 @@ def from_json(cls, d: dict[str, Any]) -> TenantTimelineId: ) +@dataclass +class ShardIndex: + shard_number: int + shard_count: int + + # cf impl Display for ShardIndex + @override + def __str__(self) -> str: + return f"{self.shard_number:02x}{self.shard_count:02x}" + + @classmethod + def parse(cls: type[ShardIndex], input: str) -> ShardIndex: + assert len(input) == 4 + return cls( + shard_number=int(input[0:2], 16), + shard_count=int(input[2:4], 16), + ) + + class TenantShardId: def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int): self.tenant_id = tenant_id @@ -222,6 +241,10 @@ def __str__(self): # Unsharded case: equivalent of Rust TenantShardId::unsharded(tenant_id) return str(self.tenant_id) + @property + def shard_index(self) -> ShardIndex: + return ShardIndex(self.shard_number, self.shard_count) + @override def __repr__(self): return self.__str__() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d8d2b87b4e3d..78e242217177 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1883,6 +1883,20 @@ def tenant_create( response.raise_for_status() log.info(f"tenant_create success: {response.json()}") + def timeline_create( + self, + tenant_id: TenantId, + body: dict[str, Any], + ): + response = self.request( + "POST", + f"{self.api}/v1/tenant/{tenant_id}/timeline", + json=body, + headers=self.headers(TokenScope.PAGE_SERVER_API), + ) + response.raise_for_status() + log.info(f"timeline_create success: {response.json()}") + def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]: """ :return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int} diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 56386fdd373f..4cf3ece39634 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -1,5 +1,9 @@ from __future__ import annotations +import dataclasses +import json +import random +import string import time from collections import defaultdict from dataclasses import dataclass @@ -10,7 +14,14 @@ from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry -from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchivalState, TimelineId +from fixtures.common_types import ( + Id, + Lsn, + TenantId, + TenantShardId, + TimelineArchivalState, + TimelineId, +) from fixtures.log_helper import log from fixtures.metrics import Metrics, MetricsGetter, parse_metrics from fixtures.pg_version import 
PgVersion @@ -24,6 +35,69 @@ def __init__(self, message, status_code: int): self.status_code = status_code +@dataclass +class ImportPgdataIdemptencyKey: + key: str + + @staticmethod + def random() -> ImportPgdataIdemptencyKey: + return ImportPgdataIdemptencyKey( + "".join(random.choices(string.ascii_letters + string.digits, k=20)) + ) + + +@dataclass +class LocalFs: + path: str + + +@dataclass +class AwsS3: + region: str + bucket: str + key: str + + +@dataclass +class ImportPgdataLocation: + LocalFs: None | LocalFs = None + AwsS3: None | AwsS3 = None + + +@dataclass +class TimelineCreateRequestModeImportPgdata: + location: ImportPgdataLocation + idempotency_key: ImportPgdataIdemptencyKey + + +@dataclass +class TimelineCreateRequestMode: + Branch: None | dict[str, Any] = None + Bootstrap: None | dict[str, Any] = None + ImportPgdata: None | TimelineCreateRequestModeImportPgdata = None + + +@dataclass +class TimelineCreateRequest: + new_timeline_id: TimelineId + mode: TimelineCreateRequestMode + + def to_json(self) -> str: + class EnhancedJSONEncoder(json.JSONEncoder): + def default(self, o): + if dataclasses.is_dataclass(o) and not isinstance(o, type): + return dataclasses.asdict(o) + elif isinstance(o, Id): + return o.id.hex() + return super().default(o) + + # mode is flattened + this = dataclasses.asdict(self) + mode = this.pop("mode") + this.update(mode) + return json.dumps(self, cls=EnhancedJSONEncoder) + + class TimelineCreate406(PageserverApiException): def __init__(self, res: requests.Response): assert res.status_code == 406 diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 010801be6cb5..30720e648d24 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -674,6 +674,13 @@ def run_only_on_default_postgres(reason: str): ) +def run_only_on_postgres(versions: Iterable[PgVersion], reason: str): + return pytest.mark.skipif( + PgVersion(os.getenv("DEFAULT_PG_VERSION", PgVersion.DEFAULT)) not in versions, + reason=reason, + ) + + def skip_in_debug_build(reason: str): return pytest.mark.skipif( os.getenv("BUILD_TYPE", "debug") == "debug", diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py new file mode 100644 index 000000000000..29229b73c156 --- /dev/null +++ b/test_runner/regress/test_import_pgdata.py @@ -0,0 +1,307 @@ +import json +import re +import time +from enum import Enum + +import psycopg2 +import psycopg2.errors +import pytest +from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder, VanillaPostgres +from fixtures.pageserver.http import ( + ImportPgdataIdemptencyKey, + PageserverApiException, +) +from fixtures.pg_version import PgVersion +from fixtures.remote_storage import RemoteStorageKind +from fixtures.utils import run_only_on_postgres +from pytest_httpserver import HTTPServer +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response + +num_rows = 1000 + + +class RelBlockSize(Enum): + ONE_STRIPE_SIZE = 1 + TWO_STRPES_PER_SHARD = 2 + MULTIPLE_RELATION_SEGMENTS = 3 + + +smoke_params = [ + # unsharded (the stripe size needs to be given for rel block size calculations) + *[(None, 1024, s) for s in RelBlockSize], + # many shards, small stripe size to speed up test + *[(8, 1024, s) for s in RelBlockSize], +] + + +@run_only_on_postgres( + [PgVersion.V14, PgVersion.V15, PgVersion.V16], + "newer control file catalog version and struct format 
isn't supported", +) +@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params) +def test_pgdata_import_smoke( + vanilla_pg: VanillaPostgres, + neon_env_builder: NeonEnvBuilder, + shard_count: int | None, + stripe_size: int, + rel_block_size: RelBlockSize, + make_httpserver: HTTPServer, +): + # + # Setup fake control plane for import progress + # + def handler(request: Request) -> Response: + log.info(f"control plane request: {request.json}") + return Response(json.dumps({}), status=200) + + cplane_mgmt_api_server = make_httpserver + cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler) + + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + env = neon_env_builder.init_start() + + env.pageserver.patch_config_toml_nonrecursive( + { + "import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api" + } + ) + env.pageserver.stop() + env.pageserver.start() + + # + # Put data in vanilla pg + # + + vanilla_pg.start() + vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser") + + log.info("create relblock data") + if rel_block_size == RelBlockSize.ONE_STRIPE_SIZE: + target_relblock_size = stripe_size * 8192 + elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD: + target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2 + elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS: + target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192) + else: + raise ValueError + + # fillfactor so we don't need to produce that much data + # 900 byte per row is > 10% => 1 row per page + vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""") + + nrows = 0 + while True: + relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')") + log.info( + f"relblock size: {relblock_size/8192} pages (target: {target_relblock_size//8192}) pages" + ) + if relblock_size >= target_relblock_size: + break + addrows = int((target_relblock_size - relblock_size) // 8192) + assert addrows >= 1, "forward progress" + vanilla_pg.safe_psql(f"insert into t select generate_series({nrows+1}, {nrows + addrows})") + nrows += addrows + expect_nrows = nrows + expect_sum = ( + (nrows) * (nrows + 1) // 2 + ) # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n + + def validate_vanilla_equivalence(ep): + # TODO: would be nicer to just compare pgdump + assert ep.safe_psql("select count(*), sum(data::bigint)::bigint from t") == [ + (expect_nrows, expect_sum) + ] + + validate_vanilla_equivalence(vanilla_pg) + + vanilla_pg.stop() + + # + # We have a Postgres data directory now. + # Make a localfs remote storage that looks like how after `fast_import` ran. 
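+ # Expected layout of importbucket/ as assembled below (illustrative summary):
+ #   spec.json          - what cplane writes before scheduling fast_import (branch_id, project_id)
+ #   pgdata/            - the Postgres data directory, as fast_import would leave it
+ #   status/pgdata      - {"done": true} marker recording that fast_import finished
+ #   status/shard-XXYY  - per-shard {"done": true} markers (XXYY = hex ShardIndex), read back by the polling loop further down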
+ # TODO: actually exercise fast_import here + # TODO: test s3 remote storage + # + importbucket = neon_env_builder.repo_dir / "importbucket" + importbucket.mkdir() + # what cplane writes before scheduling fast_import + specpath = importbucket / "spec.json" + specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"})) + # what fast_import writes + vanilla_pg.pgdatadir.rename(importbucket / "pgdata") + statusdir = importbucket / "status" + statusdir.mkdir() + (statusdir / "pgdata").write_text(json.dumps({"done": True})) + + # + # Do the import + # + + tenant_id = TenantId.generate() + env.storage_controller.tenant_create( + tenant_id, shard_count=shard_count, shard_stripe_size=stripe_size + ) + + timeline_id = TimelineId.generate() + log.info("starting import") + start = time.monotonic() + + idempotency = ImportPgdataIdemptencyKey.random() + log.info(f"idempotency key {idempotency}") + # TODO: teach neon_local CLI about the idempotency & 429 error so we can run inside the loop + # and check for 429 + + import_branch_name = "imported" + env.storage_controller.timeline_create( + tenant_id, + { + "new_timeline_id": str(timeline_id), + "import_pgdata": { + "idempotency_key": str(idempotency), + "location": {"LocalFs": {"path": str(importbucket.absolute())}}, + }, + }, + ) + env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id) + + while True: + locations = env.storage_controller.locate(tenant_id) + active_count = 0 + for location in locations: + shard_id = TenantShardId.parse(location["shard_id"]) + ps = env.get_pageserver(location["node_id"]) + try: + detail = ps.http_client().timeline_detail(shard_id, timeline_id) + state = detail["state"] + log.info(f"shard {shard_id} state: {state}") + if state == "Active": + active_count += 1 + except PageserverApiException as e: + if e.status_code == 404: + log.info("not found, import is in progress") + continue + elif e.status_code == 429: + log.info("import is in progress") + continue + else: + raise + + shard_status_file = statusdir / f"shard-{shard_id.shard_index}" + if state == "Active": + shard_status_file_contents = ( + shard_status_file.read_text() + ) # Active state implies import is done + shard_status = json.loads(shard_status_file_contents) + assert shard_status["done"] is True + + if active_count == len(locations): + log.info("all shards are active") + break + time.sleep(1) + + import_duration = time.monotonic() - start + log.info(f"import complete; duration={import_duration:.2f}s") + + # + # Get some timeline details for later. 
+ # + locations = env.storage_controller.locate(tenant_id) + [shard_zero] = [ + loc for loc in locations if TenantShardId.parse(loc["shard_id"]).shard_number == 0 + ] + shard_zero_ps = env.get_pageserver(shard_zero["node_id"]) + shard_zero_http = shard_zero_ps.http_client() + shard_zero_timeline_info = shard_zero_http.timeline_detail(shard_zero["shard_id"], timeline_id) + initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"]) + latest_gc_cutoff_lsn = Lsn(shard_zero_timeline_info["latest_gc_cutoff_lsn"]) + last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"]) + disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"]) + _remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"]) + remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"]) + # assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None` + assert remote_consistent_lsn_visible == disk_consistent_lsn + assert initdb_lsn == latest_gc_cutoff_lsn + assert disk_consistent_lsn == initdb_lsn + 8 + assert last_record_lsn == disk_consistent_lsn + # TODO: assert these values are the same everywhere + + # + # Validate the resulting remote storage state. + # + + # + # Validate the imported data + # + + ro_endpoint = env.endpoints.create_start( + branch_name=import_branch_name, endpoint_id="ro", tenant_id=tenant_id, lsn=last_record_lsn + ) + + validate_vanilla_equivalence(ro_endpoint) + + # ensure the import survives restarts + ro_endpoint.stop() + env.pageserver.stop(immediate=True) + env.pageserver.start() + ro_endpoint.start() + validate_vanilla_equivalence(ro_endpoint) + + # + # validate the layer files in each shard only have the shard-specific data + # (the implementation would be functional but not efficient without this characteristic) + # + + shards = env.storage_controller.locate(tenant_id) + for shard in shards: + shard_ps = env.get_pageserver(shard["node_id"]) + result = shard_ps.timeline_scan_no_disposable_keys(shard["shard_id"], timeline_id) + assert result.tally.disposable_count == 0 + assert ( + result.tally.not_disposable_count > 0 + ), "sanity check, each shard should have some data" + + # + # validate that we can write + # + rw_endpoint = env.endpoints.create_start( + branch_name=import_branch_name, endpoint_id="rw", tenant_id=tenant_id + ) + rw_endpoint.safe_psql("create table othertable(values text)") + rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()")) + + # TODO: consider using `class Workload` here + # to do compaction and whatnot? + + # + # validate that we can branch (important use case) + # + + # ... at the tip + _ = env.create_branch( + new_branch_name="br-tip", + ancestor_branch_name=import_branch_name, + tenant_id=tenant_id, + ancestor_start_lsn=rw_lsn, + ) + br_tip_endpoint = env.endpoints.create_start( + branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id + ) + validate_vanilla_equivalence(br_tip_endpoint) + br_tip_endpoint.safe_psql("select * from othertable") + + # ... 
at the initdb lsn + _ = env.create_branch( + new_branch_name="br-initdb", + ancestor_branch_name=import_branch_name, + tenant_id=tenant_id, + ancestor_start_lsn=initdb_lsn, + ) + br_initdb_endpoint = env.endpoints.create_start( + branch_name="br-initdb", endpoint_id="br-initdb-ro", tenant_id=tenant_id + ) + validate_vanilla_equivalence(br_initdb_endpoint) + with pytest.raises(psycopg2.errors.UndefinedTable): + br_initdb_endpoint.safe_psql("select * from othertable") diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 667d54df027a..a73d9d635288 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -19,7 +19,8 @@ ahash = { version = "0.8" } anyhow = { version = "1", features = ["backtrace"] } axum = { version = "0.7", features = ["ws"] } axum-core = { version = "0.4", default-features = false, features = ["tracing"] } -base64 = { version = "0.21", features = ["alloc"] } +base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] } +base64-647d43efb71741da = { package = "base64", version = "0.21", features = ["alloc"] } base64ct = { version = "1", default-features = false, features = ["std"] } bytes = { version = "1", features = ["serde"] } camino = { version = "1", default-features = false, features = ["serde1"] }