Skip to content

Commit

Permalink
Add more substantial tests for compute migrations
Browse files Browse the repository at this point in the history
The previous tests really didn't do much. This set should be quite a bit
more encompassing.

Signed-off-by: Tristan Partin <[email protected]>
  • Loading branch information
tristan957 committed Dec 19, 2024
1 parent 04517c6 commit 183df72
Show file tree
Hide file tree
Showing 26 changed files with 315 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion compute_tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
# Enables test specific features.
testing = []
testing = ["fail/failpoints"]

[dependencies]
base64.workspace = true
Expand All @@ -19,6 +19,7 @@ camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
fail.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }
Expand Down
5 changes: 5 additions & 0 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,15 @@ use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use utils::failpoint_support;

// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";

fn main() -> Result<()> {
let scenario = failpoint_support::init();

let (build_tag, clap_args) = init()?;

// enable core dumping for all child processes
Expand Down Expand Up @@ -100,6 +103,8 @@ fn main() -> Result<()> {

maybe_delay_exit(delay_exit);

scenario.teardown();

deinit_and_exit(wait_pg_result);
}

Expand Down
15 changes: 13 additions & 2 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1181,8 +1181,19 @@ impl ComputeNode {
let mut conf = postgres::config::Config::from(conf);
conf.application_name("compute_ctl:migrations");

let mut client = conf.connect(NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
match conf.connect(NoTls) {
Ok(mut client) => {
if let Err(e) = handle_migrations(&mut client) {
error!("Failed to run migrations: {}", e);
}
}
Err(e) => {
error!(
"Failed to connect to the compute for running migrations: {}",
e
);
}
};
});

Ok::<(), anyhow::Error>(())
Expand Down
15 changes: 15 additions & 0 deletions compute_tools/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
use utils::failpoint_support::failpoints_handler;
use utils::http::error::ApiError;
use utils::http::request::must_get_query_param;

fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
Expand Down Expand Up @@ -310,6 +313,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}

(&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
match failpoints_handler(req, CancellationToken::new()).await {
Ok(r) => r,
Err(ApiError::BadRequest(e)) => {
render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
}
Err(_) => {
render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
Expand Down
27 changes: 26 additions & 1 deletion compute_tools/src/migration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::Client;
use tracing::info;

Expand Down Expand Up @@ -65,10 +66,35 @@ impl<'m> MigrationRunner<'m> {
};
}

// We use this fail point in order to check that failing in the
// middle of applying a series of migrations fails in an expected
// manner
{
let fail = (|| {
fail_point!("compute-migration", |migration_id| {
migration_id!(current_migration)
== migration_id.unwrap().parse::<i64>().unwrap()
});

false
})();

if fail {
return Err(anyhow::anyhow!(
"the current migration was configured to fail because of a failpoint"
));
}
}

let migration = self.migrations[current_migration];

if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id!(current_migration));

// Even though we are skipping the migration, updating the
// migration ID should help keep logic easy to understand when
// trying to understand the state of a cluster.
self.update_migration_id(migration_id!(current_migration))?;
} else {
info!(
"Running migration id={}:\n{}\n",
Expand All @@ -87,7 +113,6 @@ impl<'m> MigrationRunner<'m> {
)
})?;

// Migration IDs start at 1
self.update_migration_id(migration_id!(current_migration))?;

self.client
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
DO $$
DECLARE
bypassrls boolean;
BEGIN
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
IF NOT bypassrls THEN
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
END IF;
END $$;
25 changes: 25 additions & 0 deletions compute_tools/src/migrations/tests/0002-alter_roles.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
DO $$
DECLARE
role record;
BEGIN
FOR role IN
SELECT rolname AS name, rolinherit AS inherit
FROM pg_roles
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
IF NOT role.inherit THEN
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
END IF;
END LOOP;

FOR role IN
SELECT rolname AS name, rolbypassrls AS bypassrls
FROM pg_roles
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT starts_with(rolname, 'pg_')
LOOP
IF role.bypassrls THEN
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
END IF;
END LOOP;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
DO $$
BEGIN
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
RETURN;
END IF;

IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
DO $$
DECLARE
monitor record;
BEGIN
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
admin_option AS admin
INTO monitor
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'pg_monitor'::regrole;

IF NOT monitor.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
END IF;

IF NOT monitor.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
INTO can_execute
FROM pg_proc
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT has_function_privilege('neon_superuser', oid, 'execute')
INTO can_execute
FROM pg_proc
WHERE proname = 'pg_show_replication_origin_status'
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_show_replication_origin_status';
END IF;
END $$;
4 changes: 2 additions & 2 deletions libs/utils/src/failpoint_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;

/// Declare a failpoint that can use the `pause` failpoint action.
/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
#[macro_export]
macro_rules! pausable_failpoint {
Expand Down Expand Up @@ -181,7 +181,7 @@ pub async fn failpoints_handler(
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because storage was compiled without failpoints support"
"Cannot manage failpoints because neon was compiled without failpoints support"
)));
}

Expand Down
1 change: 1 addition & 0 deletions test_runner/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.paths",
"fixtures.compute_migrations",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",
Expand Down
34 changes: 34 additions & 0 deletions test_runner/fixtures/compute_migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import pytest

from fixtures.paths import BASE_DIR

if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path

COMPUTE_MIGRATIONS_DIR = BASE_DIR / "compute_tools" / "src" / "migrations"
COMPUTE_MIGRATIONS_TEST_DIR = COMPUTE_MIGRATIONS_DIR / "tests"

COMPUTE_MIGRATIONS = sorted(next(os.walk(COMPUTE_MIGRATIONS_DIR))[2])
NUM_COMPUTE_MIGRATIONS = len(COMPUTE_MIGRATIONS)


@pytest.fixture(scope="session")
def compute_migrations_dir() -> Iterator[Path]:
"""
Retrieve the path to the compute migrations directory.
"""
yield COMPUTE_MIGRATIONS_DIR


@pytest.fixture(scope="session")
def compute_migrations_test_dir() -> Iterator[Path]:
"""
Retrieve the path to the compute migrations test directory.
"""
yield COMPUTE_MIGRATIONS_TEST_DIR
14 changes: 14 additions & 0 deletions test_runner/fixtures/endpoint/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,17 @@ def metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
return res.text

def configure_failpoints(self, *args: tuple[str, str]) -> None:
body: list[dict[str, str]] = []

for fp in args:
body.append(
{
"name": fp[0],
"action": fp[1],
}
)

res = self.post(f"http://localhost:{self.port}/failpoints", json=body)
res.raise_for_status()
5 changes: 3 additions & 2 deletions test_runner/fixtures/neon_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,14 +522,15 @@ def endpoint_start(
safekeepers: list[int] | None = None,
remote_ext_config: str | None = None,
pageserver_id: int | None = None,
allow_multiple=False,
allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
env: dict[str, str] | None = None,
) -> subprocess.CompletedProcess[str]:
args = [
"endpoint",
"start",
]
extra_env_vars = {}
extra_env_vars = env or {}
if basebackup_request_tries is not None:
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
if remote_ext_config is not None:
Expand Down
Loading

0 comments on commit 183df72

Please sign in to comment.