diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index caacd365b306..b67a9cc47951 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -62,10 +62,8 @@ async fn ingest( let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); let gate = utils::sync::gate::Gate::default(); - let entered = gate.enter().unwrap(); - let layer = - InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?; + let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &gate, &ctx).await?; let data = Value::Image(Bytes::from(vec![0u8; put_size])); let data_ser_size = data.serialized_size().unwrap() as usize; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 9aecfff384eb..aaec8a4c313a 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -38,9 +38,9 @@ impl EphemeralFile { conf: &PageServerConf, tenant_shard_id: TenantShardId, timeline_id: TimelineId, - gate_guard: utils::sync::gate::GateGuard, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, - ) -> Result { + ) -> anyhow::Result { static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1); let filename_disambiguator = NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -73,9 +73,10 @@ impl EphemeralFile { buffered_writer: owned_buffers_io::write::BufferedWriter::new( file, || IoBufferMut::with_capacity(TAIL_SZ), + gate.enter()?, ctx, ), - _gate_guard: gate_guard, + _gate_guard: gate.enter()?, }) } } @@ -362,7 +363,7 @@ mod tests { let gate = utils::sync::gate::Gate::default(); - let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) + let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx) .await .unwrap(); @@ -393,10 +394,9 @@ mod tests { let gate = utils::sync::gate::Gate::default(); - let mut file = - EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) - .await - .unwrap(); + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx) + .await + .unwrap(); let mutable = file.buffered_writer.inspect_mutable(); let cap = mutable.capacity(); @@ -451,10 +451,9 @@ mod tests { let gate = utils::sync::gate::Gate::default(); - let mut file = - EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) - .await - .unwrap(); + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx) + .await + .unwrap(); // mutable buffer and maybe_flushed buffer each has `cap` bytes. let cap = file.buffered_writer.inspect_mutable().capacity(); @@ -499,10 +498,9 @@ mod tests { let gate = utils::sync::gate::Gate::default(); - let mut file = - EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) - .await - .unwrap(); + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx) + .await + .unwrap(); let mutable = file.buffered_writer.inspect_mutable(); let cap = mutable.capacity(); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 007bd3eef083..2982cfb16c3e 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -681,6 +681,7 @@ impl RemoteTimelineClient { layer_file_name: &LayerName, layer_metadata: &LayerFileMetadata, local_path: &Utf8Path, + gate: &utils::sync::gate::Gate, cancel: &CancellationToken, ctx: &RequestContext, ) -> Result { @@ -700,6 +701,7 @@ impl RemoteTimelineClient { layer_file_name, layer_metadata, local_path, + gate, cancel, ctx, ) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 41e32a1aa3ba..4f7a51ee68f1 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -57,6 +57,7 @@ pub async fn download_layer_file<'a>( layer_file_name: &'a LayerName, layer_metadata: &'a LayerFileMetadata, local_path: &Utf8Path, + gate: &utils::sync::gate::Gate, cancel: &CancellationToken, ctx: &RequestContext, ) -> Result { @@ -85,7 +86,9 @@ pub async fn download_layer_file<'a>( let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION); let bytes_amount = download_retry( - || async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await }, + || async { + download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await + }, &format!("download {remote_path:?}"), cancel, ) @@ -145,6 +148,7 @@ async fn download_object<'a>( storage: &'a GenericRemoteStorage, src_path: &RemotePath, dst_path: &Utf8PathBuf, + gate: &utils::sync::gate::Gate, cancel: &CancellationToken, #[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext, ) -> Result { @@ -219,15 +223,16 @@ async fn download_object<'a>( pausable_failpoint!("before-downloading-layer-stream-pausable"); + let mut buffered = owned_buffers_io::write::BufferedWriter::::new( + destination_file, + || IoBufferMut::with_capacity(super::BUFFER_SIZE), + gate.enter().map_err(|_| DownloadError::Cancelled)?, + ctx, + ); + // TODO: use vectored write (writev) once supported by tokio-epoll-uring. // There's chunks_vectored() on the stream. let (bytes_amount, destination_file) = async { - let mut buffered = - owned_buffers_io::write::BufferedWriter::::new( - destination_file, - || IoBufferMut::with_capacity(super::BUFFER_SIZE), - ctx, - ); while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await { diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 7443261a9c00..03d40cffe3c3 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -1181,6 +1181,7 @@ impl<'a> TenantDownloader<'a> { &layer.name, &layer.metadata, &local_path, + &self.secondary_state.gate, &self.secondary_state.cancel, ctx, ) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index af6112d53550..71e53da20f7f 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -555,13 +555,12 @@ impl InMemoryLayer { timeline_id: TimelineId, tenant_shard_id: TenantShardId, start_lsn: Lsn, - gate_guard: utils::sync::gate::GateGuard, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> Result { trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); - let file = - EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?; + let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?; let key = InMemoryLayerFileId(file.page_cache_file_id()); Ok(InMemoryLayer { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index a9f1189b4112..8933e8ceb13e 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1149,6 +1149,7 @@ impl LayerInner { &self.desc.layer_name(), &self.metadata(), &self.path, + &timeline.gate, &timeline.cancel, ctx, ) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f6a06e73a790..347393ba5654 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3471,7 +3471,6 @@ impl Timeline { ctx: &RequestContext, ) -> anyhow::Result> { let mut guard = self.layers.write().await; - let gate_guard = self.gate.enter().context("enter gate for inmem layer")?; let last_record_lsn = self.get_last_record_lsn(); ensure!( @@ -3488,7 +3487,7 @@ impl Timeline { self.conf, self.timeline_id, self.tenant_shard_id, - gate_guard, + &self.gate, ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 4293a44dca25..88baa88f24b8 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -182,7 +182,7 @@ impl OpenLayerManager { conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, - gate_guard: utils::sync::gate::GateGuard, + gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result> { ensure!(lsn.is_aligned()); @@ -212,15 +212,9 @@ impl OpenLayerManager { lsn ); - let new_layer = InMemoryLayer::create( - conf, - timeline_id, - tenant_shard_id, - start_lsn, - gate_guard, - ctx, - ) - .await?; + let new_layer = + InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, &gate, ctx) + .await?; let layer = Arc::new(new_layer); self.layer_map.open_layer = Some(layer.clone()); diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index d2e85bda2bf3..9edb2141e9f4 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -68,11 +68,21 @@ where /// Creates a new buffered writer. /// /// The `buf_new` function provides a way to initialize the owned buffers used by this writer. - pub fn new(writer: Arc, buf_new: impl Fn() -> B, ctx: &RequestContext) -> Self { + pub fn new( + writer: Arc, + buf_new: impl Fn() -> B, + gate_guard: utils::sync::gate::GateGuard, + ctx: &RequestContext, + ) -> Self { Self { writer: writer.clone(), mutable: Some(buf_new()), - flush_handle: FlushHandle::spawn_new(writer, buf_new(), ctx.attached_child()), + flush_handle: FlushHandle::spawn_new( + writer, + buf_new(), + gate_guard, + ctx.attached_child(), + ), bytes_submitted: 0, } } @@ -302,13 +312,15 @@ mod tests { } #[tokio::test] - async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> { + async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> { let ctx = test_ctx(); let ctx = &ctx; let recorder = Arc::new(RecorderWriter::default()); + let gate = utils::sync::gate::Gate::default(); let mut writer = BufferedWriter::<_, RecorderWriter>::new( recorder, || IoBufferMut::with_capacity(2), + gate.enter()?, ctx, ); diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 9ec49c33a890..5b3997963ead 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -115,14 +115,19 @@ where /// Spawns a new background flush task and obtains a handle. /// /// Note: The background task so we do not need to explicitly maintain a queue of buffers. - pub fn spawn_new(file: Arc, buf: B, ctx: RequestContext) -> Self + pub fn spawn_new( + file: Arc, + buf: B, + gate_guard: utils::sync::gate::GateGuard, + ctx: RequestContext, + ) -> Self where B: Buffer + Send + 'static, { let (front, back) = duplex::mpsc::channel(2); let join_handle = tokio::spawn(async move { - FlushBackgroundTask::new(back, file, ctx) + FlushBackgroundTask::new(back, file, gate_guard, ctx) .run(buf.flush()) .await }); @@ -200,6 +205,8 @@ pub struct FlushBackgroundTask { /// A writter for persisting data to disk. writer: Arc, ctx: RequestContext, + /// Prevent timeline from shuting down until the flush background task finishes flushing all remaining buffers to disk. + _gate_guard: utils::sync::gate::GateGuard, } impl FlushBackgroundTask @@ -211,11 +218,13 @@ where fn new( channel: duplex::mpsc::Duplex, FlushRequest>, file: Arc, + gate_guard: utils::sync::gate::GateGuard, ctx: RequestContext, ) -> Self { FlushBackgroundTask { channel, writer: file, + _gate_guard: gate_guard, ctx, } }