Skip to content

Commit

Permalink
storcon: add safekeeper list API (#10089)
Browse files Browse the repository at this point in the history
This adds an API to the storage controller to list safekeepers
registered to it.

This PR does a `diesel print-schema > storage_controller/src/schema.rs`
because of an inconsistency between up.sql and schema.rs, introduced by
[this](2c142f1)
commit, so there is some updates of `schema.rs` due to that. As a
followup to this, we should maybe think about running `diesel
print-schema` in CI.

Part of #9981
  • Loading branch information
arpad-m authored Dec 12, 2024
1 parent b391b29 commit 342cbea
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 24 deletions.
41 changes: 32 additions & 9 deletions storage_controller/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,21 @@ async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, A
json_response(StatusCode::ACCEPTED, ())
}

async fn handle_safekeeper_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Infra)?;

let req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};

let state = get_state(&req);
let safekeepers = state.service.safekeepers_list().await?;
json_response(StatusCode::OK, safekeepers)
}

async fn handle_metadata_health_update(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Scrubber)?;

Expand Down Expand Up @@ -1203,7 +1218,7 @@ impl From<ReconcileError> for ApiError {
///
/// Not used by anything except manual testing.
async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
check_permissions(&req, Scope::Infra)?;

let id = parse_request_param::<i64>(&req, "id")?;

Expand All @@ -1221,7 +1236,7 @@ async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, Api
match res {
Ok(b) => json_response(StatusCode::OK, b),
Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
Err(ApiError::NotFound("unknown instance_id".into()))
Err(ApiError::NotFound("unknown instance id".into()))
}
Err(other) => Err(other.into()),
}
Expand Down Expand Up @@ -1817,6 +1832,21 @@ pub fn make_router(
RequestName("control_v1_metadata_health_list_outdated"),
)
})
// Safekeepers
.get("/control/v1/safekeeper", |r| {
named_request_span(
r,
handle_safekeeper_list,
RequestName("control_v1_safekeeper_list"),
)
})
.get("/control/v1/safekeeper/:id", |r| {
named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
})
.post("/control/v1/safekeeper/:id", |r| {
// id is in the body
named_request_span(r, handle_upsert_safekeeper, RequestName("v1_safekeeper"))
})
// Tenant Shard operations
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
tenant_service_handler(
Expand Down Expand Up @@ -1869,13 +1899,6 @@ pub fn make_router(
.put("/control/v1/step_down", |r| {
named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
})
.get("/control/v1/safekeeper/:id", |r| {
named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
})
.post("/control/v1/safekeeper/:id", |r| {
// id is in the body
named_request_span(r, handle_upsert_safekeeper, RequestName("v1_safekeeper"))
})
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
Expand Down
17 changes: 17 additions & 0 deletions storage_controller/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub(crate) enum DatabaseOperation {
ListMetadataHealth,
ListMetadataHealthUnhealthy,
ListMetadataHealthOutdated,
ListSafekeepers,
GetLeader,
UpdateLeader,
SetPreferredAzs,
Expand Down Expand Up @@ -1011,6 +1012,22 @@ impl Persistence {
Ok(())
}

/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_safekeepers(&self) -> DatabaseResult<Vec<SafekeeperPersistence>> {
let safekeepers: Vec<SafekeeperPersistence> = self
.with_measured_conn(
DatabaseOperation::ListNodes,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::safekeepers::table.load::<SafekeeperPersistence>(conn)?)
},
)
.await?;

tracing::info!("list_safekeepers: loaded {} nodes", safekeepers.len());

Ok(safekeepers)
}

pub(crate) async fn safekeeper_get(
&self,
id: i64,
Expand Down
35 changes: 20 additions & 15 deletions storage_controller/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ diesel::table! {
}
}

diesel::table! {
safekeepers (id) {
id -> Int8,
region_id -> Text,
version -> Int8,
host -> Text,
port -> Int4,
active -> Bool,
http_port -> Int4,
availability_zone_id -> Text,
}
}

diesel::table! {
tenant_shards (tenant_id, shard_number, shard_count) {
tenant_id -> Varchar,
Expand All @@ -45,18 +58,10 @@ diesel::table! {
}
}

diesel::allow_tables_to_appear_in_same_query!(controllers, metadata_health, nodes, tenant_shards,);

diesel::table! {
safekeepers {
id -> Int8,
region_id -> Text,
version -> Int8,
instance_id -> Text,
host -> Text,
port -> Int4,
active -> Bool,
http_port -> Int4,
availability_zone_id -> Text,
}
}
diesel::allow_tables_to_appear_in_same_query!(
controllers,
metadata_health,
nodes,
safekeepers,
tenant_shards,
);
6 changes: 6 additions & 0 deletions storage_controller/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7185,6 +7185,12 @@ impl Service {
global_observed
}

pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<crate::persistence::SafekeeperPersistence>, DatabaseError> {
self.persistence.list_safekeepers().await
}

pub(crate) async fn get_safekeeper(
&self,
id: i64,
Expand Down
10 changes: 10 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -2329,6 +2329,16 @@ def get_safekeeper(self, id: int) -> dict[str, Any] | None:
return None
raise e

def get_safekeepers(self) -> list[dict[str, Any]]:
response = self.request(
"GET",
f"{self.api}/control/v1/safekeeper",
headers=self.headers(TokenScope.ADMIN),
)
json = response.json()
assert isinstance(json, list)
return json

def set_preferred_azs(self, preferred_azs: dict[TenantShardId, str]) -> list[TenantShardId]:
response = self.request(
"PUT",
Expand Down
5 changes: 5 additions & 0 deletions test_runner/regress/test_storage_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2955,6 +2955,8 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):

assert target.get_safekeeper(fake_id) is None

assert len(target.get_safekeepers()) == 0

body = {
"active": True,
"id": fake_id,
Expand All @@ -2972,6 +2974,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):

inserted = target.get_safekeeper(fake_id)
assert inserted is not None
assert target.get_safekeepers() == [inserted]
assert eq_safekeeper_records(body, inserted)

# error out if pk is changed (unexpected)
Expand All @@ -2983,6 +2986,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert exc.value.status_code == 400

inserted_again = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_again]
assert inserted_again is not None
assert eq_safekeeper_records(inserted, inserted_again)

Expand All @@ -2991,6 +2995,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
body["version"] += 1
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_now]
assert inserted_now is not None

assert eq_safekeeper_records(body, inserted_now)
Expand Down

1 comment on commit 342cbea

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7740 tests run: 7407 passed, 2 failed, 331 skipped (full report)


Failures on Postgres 17

Failures on Postgres 15

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_parallel_copy[release-pg15] or test_idle_checkpoints[debug-pg17]"
Flaky tests (2)

Postgres 17

Test coverage report is not available

The comment gets automatically updated with the latest test results
342cbea at 2024-12-12T02:09:13.404Z :recycle:

Please sign in to comment.