Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: return proper status code for heatmap_upload errors #9991

Merged
merged 3 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2462,8 +2462,7 @@ async fn secondary_upload_handler(
state
.secondary_controller
.upload_tenant(tenant_shard_id)
.await
.map_err(ApiError::InternalServerError)?;
.await?;

json_response(StatusCode::OK, ())
}
Expand Down Expand Up @@ -2578,7 +2577,7 @@ async fn secondary_download_handler(
// Edge case: downloads aren't usually fallible: things like a missing heatmap are considered
// okay. We could get an error here in the unlikely edge case that the tenant
// was detached between our check above and executing the download job.
Ok(Err(e)) => return Err(ApiError::InternalServerError(e)),
Ok(Err(e)) => return Err(e),
// A timeout is not an error: we have started the download, we're just not done
// yet. The caller will get a response body indicating status.
Err(_) => StatusCode::ACCEPTED,
Expand Down
16 changes: 7 additions & 9 deletions pageserver/src/tenant/secondary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use remote_storage::GenericRemoteStorage;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::{completion::Barrier, id::TimelineId, sync::gate::Gate};
use utils::{completion::Barrier, http::error::ApiError, id::TimelineId, sync::gate::Gate};

enum DownloadCommand {
Download(TenantShardId),
Expand Down Expand Up @@ -66,7 +66,7 @@ struct CommandRequest<T> {
}

struct CommandResponse {
result: anyhow::Result<()>,
result: Result<(), ApiError>,
erikgrinaker marked this conversation as resolved.
Show resolved Hide resolved
}

// Whereas [`Tenant`] represents an attached tenant, this type represents the work
Expand Down Expand Up @@ -285,7 +285,7 @@ impl SecondaryController {
&self,
queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
payload: T,
) -> anyhow::Result<()> {
) -> Result<(), ApiError> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();

queue
Expand All @@ -294,20 +294,18 @@ impl SecondaryController {
response_tx,
})
.await
.map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
.map_err(|_| ApiError::ShuttingDown)?;

let response = response_rx
.await
.map_err(|_| anyhow::anyhow!("Request dropped"))?;
let response = response_rx.await.map_err(|_| ApiError::ShuttingDown)?;
erikgrinaker marked this conversation as resolved.
Show resolved Hide resolved

response.result
}

pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> Result<(), ApiError> {
self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
.await
}
pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> Result<(), ApiError> {
self.dispatch(
&self.download_req_tx,
DownloadCommand::Download(tenant_shard_id),
Expand Down
14 changes: 8 additions & 6 deletions pageserver/src/tenant/secondary/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, warn, Instrument};
use utils::{
backoff, completion::Barrier, crashsafe::path_with_suffix_extension, failpoint_support, fs_ext,
id::TimelineId, pausable_failpoint, serde_system_time,
http::error::ApiError, id::TimelineId, pausable_failpoint, serde_system_time,
};

use super::{
Expand Down Expand Up @@ -470,15 +470,17 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
result
}

fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
fn on_command(&mut self, command: DownloadCommand) -> Result<PendingDownload, ApiError> {
let tenant_shard_id = command.get_tenant_shard_id();

let tenant = self
.tenant_manager
.get_secondary_tenant_shard(*tenant_shard_id);
let Some(tenant) = tenant else {
return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
};
.get_secondary_tenant_shard(*tenant_shard_id)
.ok_or_else(|| {
ApiError::NotFound(
anyhow::anyhow!("secondary shard {tenant_shard_id} not found").into(),
)
})?;

Ok(PendingDownload {
target_time: None,
Expand Down
7 changes: 3 additions & 4 deletions pageserver/src/tenant/secondary/heatmap_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use super::{
};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, Instrument};
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
use utils::{backoff, completion::Barrier, http::error::ApiError, yielding_loop::yielding_loop};

pub(super) async fn heatmap_uploader_task(
tenant_manager: Arc<TenantManager>,
Expand Down Expand Up @@ -279,16 +279,15 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
}.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
}

fn on_command(&mut self, command: UploadCommand) -> anyhow::Result<UploadPending> {
fn on_command(&mut self, command: UploadCommand) -> Result<UploadPending, ApiError> {
let tenant_shard_id = command.get_tenant_shard_id();

tracing::info!(
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
"Starting heatmap write on command");
let tenant = self
.tenant_manager
.get_attached_tenant_shard(*tenant_shard_id)
.map_err(|e| anyhow::anyhow!(e))?;
.get_attached_tenant_shard(*tenant_shard_id)?;
if !tenant.is_active() {
return Err(GetTenantError::NotActive(*tenant_shard_id).into());
}
Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/secondary/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use pageserver_api::shard::TenantShardId;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::{completion::Barrier, yielding_loop::yielding_loop};
use utils::{completion::Barrier, http::error::ApiError, yielding_loop::yielding_loop};

use super::{CommandRequest, CommandResponse};

Expand Down Expand Up @@ -112,7 +112,7 @@ where

/// Called when a command is received. A job will be spawned immediately if the return
/// value is Some, ignoring concurrency limits and the pending queue.
fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
fn on_command(&mut self, cmd: CMD) -> Result<PJ, ApiError>;
}

/// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
Expand Down
Loading