Skip to content

Commit

Permalink
fix(pageserver): ensure upload happens after delete (#9844)
Browse files Browse the repository at this point in the history
## Problem

Follow up of #9682, that patch
didn't fully address the problem: what if shutdown fails due to whatever
reason and then we reattach the tenant? Then we will still remove the
future layer. The underlying problem is that the fix for #5878 gets
voided because of the generation optimizations.

Of course, we also need to ensure that delete happens after uploads, but
note that we only schedule deletes when there are no ongoing upload
tasks, so that's fine.

## Summary of changes

* Add a test case to reproduce the behavior (by changing the original
test case to attach the same generation).
* If layer upload happens after the deletion, drain the deletion queue
before uploading.
* If blocked_deletion is enabled, directly remove it from the
blocked_deletion queue.
* Local fs backend fix to avoid race between deletion and preload.
* test_emergency_mode does not need to wait for uploads (and it's
generally not possible to wait for uploads).
* ~~Optimize deletion executor to skip validation if there are no files
to delete.~~ this doesn't work

---------

Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh authored Nov 22, 2024
1 parent 6f8b1eb commit c1937d0
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 42 deletions.
7 changes: 6 additions & 1 deletion libs/remote_storage/src/local_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,12 @@ impl RemoteStorage for LocalFs {
let mut objects = Vec::with_capacity(keys.len());
for key in keys {
let path = key.with_base(&self.storage_root);
let metadata = file_metadata(&path).await?;
let metadata = file_metadata(&path).await;
if let Err(DownloadError::NotFound) = metadata {
// Race: if the file is deleted between listing and metadata check, ignore it.
continue;
}
let metadata = metadata?;
if metadata.is_dir() {
continue;
}
Expand Down
2 changes: 2 additions & 0 deletions pageserver/src/deletion_queue/deleter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;
use utils::pausable_failpoint;

use crate::metrics;

Expand Down Expand Up @@ -90,6 +91,7 @@ impl Deleter {
/// Block until everything in accumulator has been executed
async fn flush(&mut self) -> Result<(), DeletionQueueError> {
while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
pausable_failpoint!("deletion-queue-before-execute-pause");
match self.remote_delete().await {
Ok(()) => {
// Note: we assume that the remote storage layer returns Ok(()) if some
Expand Down
97 changes: 85 additions & 12 deletions pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ use utils::backoff::{
use utils::pausable_failpoint;
use utils::shard::ShardNumber;

use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::Duration;
Expand All @@ -223,7 +223,7 @@ use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::download::download_retry;
use crate::tenant::storage_layer::AsLayerDesc;
use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
use crate::tenant::upload_queue::{Delete, OpType, UploadQueueStoppedDeletable};
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{
config::PageServerConf,
Expand Down Expand Up @@ -1090,7 +1090,7 @@ impl RemoteTimelineClient {
"scheduled layer file upload {layer}",
);

let op = UploadOp::UploadLayer(layer, metadata);
let op = UploadOp::UploadLayer(layer, metadata, None);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
}
Expand Down Expand Up @@ -1805,7 +1805,7 @@ impl RemoteTimelineClient {
// have finished.
upload_queue.inprogress_tasks.is_empty()
}
UploadOp::Delete(_) => {
UploadOp::Delete(..) => {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}
Expand Down Expand Up @@ -1833,19 +1833,32 @@ impl RemoteTimelineClient {
}

// We can launch this task. Remove it from the queue first.
let next_op = upload_queue.queued_operations.pop_front().unwrap();
let mut next_op = upload_queue.queued_operations.pop_front().unwrap();

debug!("starting op: {}", next_op);

// Update the counters
match next_op {
UploadOp::UploadLayer(_, _) => {
// Update the counters and prepare
match &mut next_op {
UploadOp::UploadLayer(layer, meta, mode) => {
if upload_queue
.recently_deleted
.remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
{
*mode = Some(OpType::FlushDeletion);
} else {
*mode = Some(OpType::MayReorder)
}
upload_queue.num_inprogress_layer_uploads += 1;
}
UploadOp::UploadMetadata { .. } => {
upload_queue.num_inprogress_metadata_uploads += 1;
}
UploadOp::Delete(_) => {
UploadOp::Delete(Delete { layers }) => {
for (name, meta) in layers {
upload_queue
.recently_deleted
.insert((name.clone(), meta.generation));
}
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
Expand Down Expand Up @@ -1921,7 +1934,66 @@ impl RemoteTimelineClient {
}

let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
if let Some(OpType::FlushDeletion) = mode {
if self.config.read().unwrap().block_deletions {
// Of course, this is not efficient... but usually the queue should be empty.
let mut queue_locked = self.upload_queue.lock().unwrap();
let mut detected = false;
if let Ok(queue) = queue_locked.initialized_mut() {
for list in queue.blocked_deletions.iter_mut() {
list.layers.retain(|(name, meta)| {
if name == &layer.layer_desc().layer_name()
&& meta.generation == layer_metadata.generation
{
detected = true;
// remove the layer from deletion queue
false
} else {
// keep the layer
true
}
});
}
}
if detected {
info!(
"cancelled blocked deletion of layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
}
} else {
// TODO: we did not guarantee that upload task starts after deletion task, so there could be possibly race conditions
// that we still get the layer deleted. But this only happens if someone creates a layer immediately after it's deleted,
// which is not possible in the current system.
info!(
"waiting for deletion queue flush to complete before uploading layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
{
// We are going to flush, we can clean up the recently deleted list.
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
queue.recently_deleted.clear();
}
}
if let Err(e) = self.deletion_queue_client.flush_execute().await {
warn!(
"failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
} else {
info!(
"done flushing deletion queue before uploading layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
}
}
}
let local_path = layer.local_path();

// We should only be uploading layers created by this `Tenant`'s lifetime, so
Expand Down Expand Up @@ -2085,7 +2157,7 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);

let lsn_update = match task.op {
UploadOp::UploadLayer(_, _) => {
UploadOp::UploadLayer(_, _, _) => {
upload_queue.num_inprogress_layer_uploads -= 1;
None
}
Expand Down Expand Up @@ -2162,7 +2234,7 @@ impl RemoteTimelineClient {
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
let res = match op {
UploadOp::UploadLayer(_, m) => (
UploadOp::UploadLayer(_, m, _) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
Expand Down Expand Up @@ -2259,6 +2331,7 @@ impl RemoteTimelineClient {
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
recently_deleted: HashSet::new(),
};

let upload_queue = std::mem::replace(
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,7 @@ impl Timeline {
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
// TODO: this is basically a no-op now, should we remove it?
self.remote_client.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
Expand Down
23 changes: 17 additions & 6 deletions pageserver/src/tenant/upload_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::storage_layer::ResidentLayer;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;

Expand All @@ -14,7 +15,6 @@ use utils::lsn::AtomicLsn;
use std::sync::atomic::AtomicU32;
use utils::lsn::Lsn;

#[cfg(feature = "testing")]
use utils::generation::Generation;

// clippy warns that Uninitialized is much smaller than Initialized, which wastes
Expand All @@ -38,6 +38,12 @@ impl UploadQueue {
}
}

#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub(crate) enum OpType {
MayReorder,
FlushDeletion,
}

/// This keeps track of queued and in-progress tasks.
pub(crate) struct UploadQueueInitialized {
/// Counter to assign task IDs
Expand Down Expand Up @@ -88,6 +94,9 @@ pub(crate) struct UploadQueueInitialized {
#[cfg(feature = "testing")]
pub(crate) dangling_files: HashMap<LayerName, Generation>,

/// Ensure we order file operations correctly.
pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,

/// Deletions that are blocked by the tenant configuration
pub(crate) blocked_deletions: Vec<Delete>,

Expand Down Expand Up @@ -183,6 +192,7 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
recently_deleted: HashSet::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
Expand Down Expand Up @@ -224,6 +234,7 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
recently_deleted: HashSet::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
Expand Down Expand Up @@ -282,8 +293,8 @@ pub(crate) struct Delete {

#[derive(Debug)]
pub(crate) enum UploadOp {
/// Upload a layer file
UploadLayer(ResidentLayer, LayerFileMetadata),
/// Upload a layer file. The last field indicates the last operation for thie file.
UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),

/// Upload a index_part.json file
UploadMetadata {
Expand All @@ -305,11 +316,11 @@ pub(crate) enum UploadOp {
impl std::fmt::Display for UploadOp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
UploadOp::UploadLayer(layer, metadata) => {
UploadOp::UploadLayer(layer, metadata, mode) => {
write!(
f,
"UploadLayer({}, size={:?}, gen={:?})",
layer, metadata.file_size, metadata.generation
"UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
layer, metadata.file_size, metadata.generation, mode
)
}
UploadOp::UploadMetadata { uploaded, .. } => {
Expand Down
22 changes: 18 additions & 4 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -4942,6 +4942,7 @@ def last_flush_lsn_upload(
timeline_id: TimelineId,
pageserver_id: int | None = None,
auth_token: str | None = None,
wait_until_uploaded: bool = True,
) -> Lsn:
"""
Wait for pageserver to catch to the latest flush LSN of given endpoint,
Expand All @@ -4955,7 +4956,9 @@ def last_flush_lsn_upload(
for tenant_shard_id, pageserver in shards:
ps_http = pageserver.http_client(auth_token=auth_token)
wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_checkpoint(
tenant_shard_id, timeline_id, wait_until_uploaded=wait_until_uploaded
)
return last_flush_lsn


Expand All @@ -4980,6 +4983,7 @@ def generate_uploads_and_deletions(
timeline_id: TimelineId | None = None,
data: str | None = None,
pageserver: NeonPageserver,
wait_until_uploaded: bool = True,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
Expand All @@ -5002,7 +5006,12 @@ def generate_uploads_and_deletions(
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
env,
endpoint,
tenant_id,
timeline_id,
pageserver_id=pageserver.id,
wait_until_uploaded=wait_until_uploaded,
)

def churn(data):
Expand All @@ -5025,7 +5034,12 @@ def churn(data):
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
env,
endpoint,
tenant_id,
timeline_id,
pageserver_id=pageserver.id,
wait_until_uploaded=wait_until_uploaded,
)

# Compaction should generate some GC-elegible layers
Expand All @@ -5041,4 +5055,4 @@ def churn(data):
# background ingest, no more uploads pending, and therefore no non-determinism
# in subsequent actions like pageserver restarts.
flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=wait_until_uploaded)
4 changes: 3 additions & 1 deletion test_runner/fixtures/pageserver/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,9 @@ def timeline_checkpoint(
if compact is not None:
query["compact"] = "true" if compact else "false"

log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
log.info(
f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}"
)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
params=query,
Expand Down
Loading

1 comment on commit c1937d0

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5644 tests run: 5415 passed, 1 failed, 228 skipped (full report)


Failures on Postgres 16

  • test_compaction_l0_memory[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_compaction_l0_memory[release-pg16-github-actions-selfhosted]"
Flaky tests (1)

Postgres 14

Code coverage* (full report)

  • functions: 31.4% (7957 of 25346 functions)
  • lines: 49.3% (63151 of 128158 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
c1937d0 at 2024-11-22T20:27:42.342Z :recycle:

Please sign in to comment.