diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 553153826e94..ee2fc9d6e2dd 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -360,7 +360,12 @@ impl RemoteStorage for LocalFs { let mut objects = Vec::with_capacity(keys.len()); for key in keys { let path = key.with_base(&self.storage_root); - let metadata = file_metadata(&path).await?; + let metadata = file_metadata(&path).await; + if let Err(DownloadError::NotFound) = metadata { + // Race: if the file is deleted between listing and metadata check, ignore it. + continue; + } + let metadata = metadata?; if metadata.is_dir() { continue; } diff --git a/pageserver/src/deletion_queue/deleter.rs b/pageserver/src/deletion_queue/deleter.rs index 1f04bc0410f5..3d02387c98d8 100644 --- a/pageserver/src/deletion_queue/deleter.rs +++ b/pageserver/src/deletion_queue/deleter.rs @@ -15,6 +15,7 @@ use tokio_util::sync::CancellationToken; use tracing::info; use tracing::warn; use utils::backoff; +use utils::pausable_failpoint; use crate::metrics; @@ -90,6 +91,7 @@ impl Deleter { /// Block until everything in accumulator has been executed async fn flush(&mut self) -> Result<(), DeletionQueueError> { while !self.accumulator.is_empty() && !self.cancel.is_cancelled() { + pausable_failpoint!("deletion-queue-before-execute-pause"); match self.remote_delete().await { Ok(()) => { // Note: we assume that the remote storage layer returns Ok(()) if some diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 377bc23542b1..4c8828221416 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -199,7 +199,7 @@ use utils::backoff::{ use utils::pausable_failpoint; use utils::shard::ShardNumber; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex, OnceLock}; use std::time::Duration; @@ -223,7 +223,7 @@ use crate::task_mgr::shutdown_token; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::download::download_retry; use crate::tenant::storage_layer::AsLayerDesc; -use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable}; +use crate::tenant::upload_queue::{Delete, OpType, UploadQueueStoppedDeletable}; use crate::tenant::TIMELINES_SEGMENT_NAME; use crate::{ config::PageServerConf, @@ -1090,7 +1090,7 @@ impl RemoteTimelineClient { "scheduled layer file upload {layer}", ); - let op = UploadOp::UploadLayer(layer, metadata); + let op = UploadOp::UploadLayer(layer, metadata, None); self.metric_begin(&op); upload_queue.queued_operations.push_back(op); } @@ -1805,7 +1805,7 @@ impl RemoteTimelineClient { // have finished. upload_queue.inprogress_tasks.is_empty() } - UploadOp::Delete(_) => { + UploadOp::Delete(..) => { // Wait for preceding uploads to finish. Concurrent deletions are OK, though. upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len() } @@ -1833,19 +1833,32 @@ impl RemoteTimelineClient { } // We can launch this task. Remove it from the queue first. - let next_op = upload_queue.queued_operations.pop_front().unwrap(); + let mut next_op = upload_queue.queued_operations.pop_front().unwrap(); debug!("starting op: {}", next_op); - // Update the counters - match next_op { - UploadOp::UploadLayer(_, _) => { + // Update the counters and prepare + match &mut next_op { + UploadOp::UploadLayer(layer, meta, mode) => { + if upload_queue + .recently_deleted + .remove(&(layer.layer_desc().layer_name().clone(), meta.generation)) + { + *mode = Some(OpType::FlushDeletion); + } else { + *mode = Some(OpType::MayReorder) + } upload_queue.num_inprogress_layer_uploads += 1; } UploadOp::UploadMetadata { .. } => { upload_queue.num_inprogress_metadata_uploads += 1; } - UploadOp::Delete(_) => { + UploadOp::Delete(Delete { layers }) => { + for (name, meta) in layers { + upload_queue + .recently_deleted + .insert((name.clone(), meta.generation)); + } upload_queue.num_inprogress_deletions += 1; } UploadOp::Barrier(sender) => { @@ -1921,7 +1934,66 @@ impl RemoteTimelineClient { } let upload_result: anyhow::Result<()> = match &task.op { - UploadOp::UploadLayer(ref layer, ref layer_metadata) => { + UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => { + if let Some(OpType::FlushDeletion) = mode { + if self.config.read().unwrap().block_deletions { + // Of course, this is not efficient... but usually the queue should be empty. + let mut queue_locked = self.upload_queue.lock().unwrap(); + let mut detected = false; + if let Ok(queue) = queue_locked.initialized_mut() { + for list in queue.blocked_deletions.iter_mut() { + list.layers.retain(|(name, meta)| { + if name == &layer.layer_desc().layer_name() + && meta.generation == layer_metadata.generation + { + detected = true; + // remove the layer from deletion queue + false + } else { + // keep the layer + true + } + }); + } + } + if detected { + info!( + "cancelled blocked deletion of layer {} at gen {:?}", + layer.layer_desc().layer_name(), + layer_metadata.generation + ); + } + } else { + // TODO: we did not guarantee that upload task starts after deletion task, so there could be possibly race conditions + // that we still get the layer deleted. But this only happens if someone creates a layer immediately after it's deleted, + // which is not possible in the current system. + info!( + "waiting for deletion queue flush to complete before uploading layer {} at gen {:?}", + layer.layer_desc().layer_name(), + layer_metadata.generation + ); + { + // We are going to flush, we can clean up the recently deleted list. + let mut queue_locked = self.upload_queue.lock().unwrap(); + if let Ok(queue) = queue_locked.initialized_mut() { + queue.recently_deleted.clear(); + } + } + if let Err(e) = self.deletion_queue_client.flush_execute().await { + warn!( + "failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ", + layer.layer_desc().layer_name(), + layer_metadata.generation + ); + } else { + info!( + "done flushing deletion queue before uploading layer {} at gen {:?}", + layer.layer_desc().layer_name(), + layer_metadata.generation + ); + } + } + } let local_path = layer.local_path(); // We should only be uploading layers created by this `Tenant`'s lifetime, so @@ -2085,7 +2157,7 @@ impl RemoteTimelineClient { upload_queue.inprogress_tasks.remove(&task.task_id); let lsn_update = match task.op { - UploadOp::UploadLayer(_, _) => { + UploadOp::UploadLayer(_, _, _) => { upload_queue.num_inprogress_layer_uploads -= 1; None } @@ -2162,7 +2234,7 @@ impl RemoteTimelineClient { )> { use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize; let res = match op { - UploadOp::UploadLayer(_, m) => ( + UploadOp::UploadLayer(_, m, _) => ( RemoteOpFileKind::Layer, RemoteOpKind::Upload, RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size), @@ -2259,6 +2331,7 @@ impl RemoteTimelineClient { blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + recently_deleted: HashSet::new(), }; let upload_queue = std::mem::replace( diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0c7f3204f6e0..d1285e7c8ae8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2652,6 +2652,7 @@ impl Timeline { // // NB: generation numbers naturally protect against this because they disambiguate // (1) and (4) + // TODO: this is basically a no-op now, should we remove it? self.remote_client.schedule_barrier()?; // Tenant::create_timeline will wait for these uploads to happen before returning, or // on retry. diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index f14bf2f8c381..ef3aa759f303 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -3,6 +3,7 @@ use super::storage_layer::ResidentLayer; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::remote_timeline_client::index::IndexPart; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; +use std::collections::HashSet; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; @@ -14,7 +15,6 @@ use utils::lsn::AtomicLsn; use std::sync::atomic::AtomicU32; use utils::lsn::Lsn; -#[cfg(feature = "testing")] use utils::generation::Generation; // clippy warns that Uninitialized is much smaller than Initialized, which wastes @@ -38,6 +38,12 @@ impl UploadQueue { } } +#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum OpType { + MayReorder, + FlushDeletion, +} + /// This keeps track of queued and in-progress tasks. pub(crate) struct UploadQueueInitialized { /// Counter to assign task IDs @@ -88,6 +94,9 @@ pub(crate) struct UploadQueueInitialized { #[cfg(feature = "testing")] pub(crate) dangling_files: HashMap, + /// Ensure we order file operations correctly. + pub(crate) recently_deleted: HashSet<(LayerName, Generation)>, + /// Deletions that are blocked by the tenant configuration pub(crate) blocked_deletions: Vec, @@ -183,6 +192,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + recently_deleted: HashSet::new(), blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), @@ -224,6 +234,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + recently_deleted: HashSet::new(), blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), @@ -282,8 +293,8 @@ pub(crate) struct Delete { #[derive(Debug)] pub(crate) enum UploadOp { - /// Upload a layer file - UploadLayer(ResidentLayer, LayerFileMetadata), + /// Upload a layer file. The last field indicates the last operation for thie file. + UploadLayer(ResidentLayer, LayerFileMetadata, Option), /// Upload a index_part.json file UploadMetadata { @@ -305,11 +316,11 @@ pub(crate) enum UploadOp { impl std::fmt::Display for UploadOp { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { - UploadOp::UploadLayer(layer, metadata) => { + UploadOp::UploadLayer(layer, metadata, mode) => { write!( f, - "UploadLayer({}, size={:?}, gen={:?})", - layer, metadata.file_size, metadata.generation + "UploadLayer({}, size={:?}, gen={:?}, mode={:?})", + layer, metadata.file_size, metadata.generation, mode ) } UploadOp::UploadMetadata { uploaded, .. } => { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e04cadf46f4b..d8d2b87b4e3d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4942,6 +4942,7 @@ def last_flush_lsn_upload( timeline_id: TimelineId, pageserver_id: int | None = None, auth_token: str | None = None, + wait_until_uploaded: bool = True, ) -> Lsn: """ Wait for pageserver to catch to the latest flush LSN of given endpoint, @@ -4955,7 +4956,9 @@ def last_flush_lsn_upload( for tenant_shard_id, pageserver in shards: ps_http = pageserver.http_client(auth_token=auth_token) wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn) - ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True) + ps_http.timeline_checkpoint( + tenant_shard_id, timeline_id, wait_until_uploaded=wait_until_uploaded + ) return last_flush_lsn @@ -4980,6 +4983,7 @@ def generate_uploads_and_deletions( timeline_id: TimelineId | None = None, data: str | None = None, pageserver: NeonPageserver, + wait_until_uploaded: bool = True, ): """ Using the environment's default tenant + timeline, generate a load pattern @@ -5002,7 +5006,12 @@ def generate_uploads_and_deletions( if init: endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") last_flush_lsn_upload( - env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id + env, + endpoint, + tenant_id, + timeline_id, + pageserver_id=pageserver.id, + wait_until_uploaded=wait_until_uploaded, ) def churn(data): @@ -5025,7 +5034,12 @@ def churn(data): # in a state where there are "future layers" in remote storage that will generate deletions # after a restart. last_flush_lsn_upload( - env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id + env, + endpoint, + tenant_id, + timeline_id, + pageserver_id=pageserver.id, + wait_until_uploaded=wait_until_uploaded, ) # Compaction should generate some GC-elegible layers @@ -5041,4 +5055,4 @@ def churn(data): # background ingest, no more uploads pending, and therefore no non-determinism # in subsequent actions like pageserver restarts. flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id) - ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True) + ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=wait_until_uploaded) diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 4df624def32b..56386fdd373f 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -794,7 +794,9 @@ def timeline_checkpoint( if compact is not None: query["compact"] = "true" if compact else "false" - log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}") + log.info( + f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}" + ) res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", params=query, diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index 309e0f301525..761ec7568f4b 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -2,6 +2,7 @@ import time +import pytest from fixtures.common_types import Lsn from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver @@ -19,7 +20,11 @@ from fixtures.utils import query_scalar, wait_until -def test_issue_5878(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize( + "attach_mode", + ["default_generation", "same_generation"], +) +def test_issue_5878(neon_env_builder: NeonEnvBuilder, attach_mode: str): """ Regression test for issue https://github.com/neondatabase/neon/issues/5878 . @@ -168,11 +173,32 @@ def get_generation_number(): tenant_conf = ps_http.tenant_config(tenant_id) generation_before_detach = get_generation_number() env.pageserver.tenant_detach(tenant_id) - failpoint_name = "before-delete-layer-pausable" + failpoint_deletion_queue = "deletion-queue-before-execute-pause" + + ps_http.configure_failpoints((failpoint_deletion_queue, "pause")) + + if attach_mode == "default_generation": + env.pageserver.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides) + elif attach_mode == "same_generation": + # Attach with the same generation number -- this is possible with timeline offload and detach ancestor + env.pageserver.tenant_attach( + tenant_id, + tenant_conf.tenant_specific_overrides, + generation=generation_before_detach, + # We want to avoid the generation bump and don't want to talk with the storcon + override_storage_controller_generation=False, + ) + else: + raise AssertionError(f"Unknown attach_mode: {attach_mode}") + + # Get it from pageserver API instead of storcon API b/c we might not have attached using the storcon + # API if attach_mode == "same_generation" + tenant_location = env.pageserver.http_client().tenant_get_location(tenant_id) + generation_after_reattach = tenant_location["generation"] - ps_http.configure_failpoints((failpoint_name, "pause")) - env.pageserver.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides) - generation_after_reattach = get_generation_number() + if attach_mode == "same_generation": + # The generation number should be the same as before the detach + assert generation_before_detach == generation_after_reattach wait_until_tenant_active(ps_http, tenant_id) # Ensure the IndexPart upload that unlinks the layer file finishes, i.e., doesn't clog the queue. @@ -182,15 +208,8 @@ def future_layer_is_gone_from_index_part(): wait_until(10, 0.5, future_layer_is_gone_from_index_part) - # NB: the layer file is unlinked index part now, but, because we made the delete - # operation stuck, the layer file itself is still in the remote_storage - wait_until( - 10, - 0.5, - lambda: env.pageserver.assert_log_contains( - f".*{tenant_id}.*at failpoint.*{failpoint_name}" - ), - ) + # We already make deletion stuck here, but we don't necessarily hit the failpoint + # because deletions are batched. future_layer_path = env.pageserver_remote_storage.remote_layer_path( tenant_id, timeline_id, future_layer.to_str(), generation=generation_before_detach ) @@ -224,11 +243,13 @@ def future_layer_is_gone_from_index_part(): break time.sleep(1) - # Window has passed, unstuck the delete, let upload queue drain. + # Window has passed, unstuck the delete, let deletion queue drain; the upload queue should + # have drained because we put these layer deletion operations into the deletion queue and + # have consumed the operation from the upload queue. log.info("unstuck the DELETE") - ps_http.configure_failpoints(("before-delete-layer-pausable", "off")) - + ps_http.configure_failpoints((failpoint_deletion_queue, "off")) wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) + env.pageserver.http_client().deletion_queue_flush(True) # Examine the resulting S3 state. log.info("integrity-check the remote storage") @@ -247,3 +268,12 @@ def future_layer_is_gone_from_index_part(): final_stat = future_layer_path.stat() log.info(f"future layer path: {future_layer_path}") assert final_stat.st_mtime != pre_stat.st_mtime + + # Ensure no weird errors in the end... + wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) + + if attach_mode == "same_generation": + # we should have detected a race upload and deferred it + env.pageserver.assert_log_contains( + "waiting for deletion queue flush to complete before uploading layer" + ) diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index d5bbfbc7fc84..6ba5753420c7 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -459,7 +459,11 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): env.pageserver.start() # The pageserver should provide service to clients - generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver) + # Because it is in emergency mode, it will not attempt to validate deletions required by the initial barrier, and therefore + # other files cannot be uploaded b/c it's waiting for the initial barrier to be validated. + generate_uploads_and_deletions( + env, init=False, pageserver=env.pageserver, wait_until_uploaded=False + ) # The pageserver should neither validate nor execute any deletions, it should have # loaded the DeletionLists from before though