Skip to content

Commit

Permalink
Adopt list_streaming in tenant deletion (#8504)
Browse files Browse the repository at this point in the history
Uses the Stream based `list_streaming` function added by #8457 in tenant
deletion, as suggested in #7932 (comment) .

We don't have to worry about retries, as the function is wrapped inside
an outer retry block. If there is a retryable error either during the
listing or during deletion, we just do a fresh start.

Also adds `+ Send` bounds as they are required by the
`delete_tenant_remote` function.
  • Loading branch information
arpad-m authored Jul 29, 2024
1 parent da6bdff commit 859f019
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 31 deletions.
6 changes: 3 additions & 3 deletions libs/remote_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>>;
) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;

async fn list(
&self,
Expand Down Expand Up @@ -351,10 +351,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &'a CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a {
) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
match self {
Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>>>>,
as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Expand Down
2 changes: 1 addition & 1 deletion libs/remote_storage/src/simulate_failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl RemoteStorage for UnreliableWrapper {
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
) -> impl Stream<Item = Result<Listing, DownloadError>> + Send {
async_stream::stream! {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
Expand Down
52 changes: 25 additions & 27 deletions pageserver/src/tenant/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1384,34 +1384,32 @@ impl TenantManager {
tenant_shard_id: TenantShardId,
) -> Result<(), DeleteTenantError> {
let remote_path = remote_tenant_path(&tenant_shard_id);
let keys = match self
.resources
.remote_storage
.list(
Some(&remote_path),
remote_storage::ListingMode::NoDelimiter,
None,
&self.cancel,
)
.await
{
Ok(listing) => listing.keys,
Err(remote_storage::DownloadError::Cancelled) => {
return Err(DeleteTenantError::Cancelled)
}
Err(remote_storage::DownloadError::NotFound) => return Ok(()),
Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
};
let mut keys_stream = self.resources.remote_storage.list_streaming(
Some(&remote_path),
remote_storage::ListingMode::NoDelimiter,
None,
&self.cancel,
);
while let Some(chunk) = keys_stream.next().await {
let keys = match chunk {
Ok(listing) => listing.keys,
Err(remote_storage::DownloadError::Cancelled) => {
return Err(DeleteTenantError::Cancelled)
}
Err(remote_storage::DownloadError::NotFound) => return Ok(()),
Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
};

if keys.is_empty() {
tracing::info!("Remote storage already deleted");
} else {
tracing::info!("Deleting {} keys from remote storage", keys.len());
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
self.resources
.remote_storage
.delete_objects(&keys, &self.cancel)
.await?;
if keys.is_empty() {
tracing::info!("Remote storage already deleted");
} else {
tracing::info!("Deleting {} keys from remote storage", keys.len());
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
self.resources
.remote_storage
.delete_objects(&keys, &self.cancel)
.await?;
}
}

Ok(())
Expand Down

1 comment on commit 859f019

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3138 tests run: 3017 passed, 0 failed, 121 skipped (full report)


Code coverage* (full report)

  • functions: 32.8% (7010 of 21360 functions)
  • lines: 50.0% (55813 of 111571 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
859f019 at 2024-07-29T11:10:04.490Z :recycle:

Please sign in to comment.