From 518435fb3eaf78b6e8c8d18e409076d2de3d6546 Mon Sep 17 00:00:00 2001 From: Antheas Kapenekakis Date: Thu, 28 Nov 2024 00:12:26 +0100 Subject: [PATCH] pull: Add --json-fd This adds a generic "progress" infrastructure for granular incremental notifications of downloading in particular, but we may extend this to other generic tasks in the future too. Signed-off-by: Antheas Kapenekakis Signed-off-by: Colin Walters --- lib/src/cli.rs | 36 +++- lib/src/deploy.rs | 246 +++++++++++++++++++++++-- lib/src/install.rs | 10 +- lib/src/lib.rs | 1 + lib/src/progress_jsonl.rs | 290 ++++++++++++++++++++++++++++++ ostree-ext/src/cli.rs | 1 + ostree-ext/src/container/store.rs | 8 +- 7 files changed, 568 insertions(+), 24 deletions(-) create mode 100644 lib/src/progress_jsonl.rs diff --git a/lib/src/cli.rs b/lib/src/cli.rs index e293fd940..8e429fe93 100644 --- a/lib/src/cli.rs +++ b/lib/src/cli.rs @@ -4,6 +4,7 @@ use std::ffi::{CString, OsStr, OsString}; use std::io::Seek; +use std::os::fd::RawFd; use std::os::unix::process::CommandExt; use std::process::Command; @@ -25,6 +26,8 @@ use serde::{Deserialize, Serialize}; use crate::deploy::RequiredHostSpec; use crate::lints; +use crate::progress_jsonl; +use crate::progress_jsonl::ProgressWriter; use crate::spec::Host; use crate::spec::ImageReference; use crate::utils::sigpolicy_from_opts; @@ -52,6 +55,10 @@ pub(crate) struct UpgradeOpts { /// a userspace-only restart. #[clap(long, conflicts_with = "check")] pub(crate) apply: bool, + + /// Pipe download progress to this fd in a jsonl format. + #[clap(long)] + pub(crate) json_fd: Option, } /// Perform an switch operation @@ -101,6 +108,10 @@ pub(crate) struct SwitchOpts { /// Target image to use for the next boot. pub(crate) target: String, + + /// Pipe download progress to this fd in a jsonl format. + #[clap(long)] + pub(crate) json_fd: Option, } /// Options controlling rollback @@ -644,6 +655,12 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { let (booted_deployment, _deployments, host) = crate::status::get_status_require_booted(sysroot)?; let imgref = host.spec.image.as_ref(); + let prog = opts + .json_fd + .map(progress_jsonl::ProgressWriter::from_raw_fd) + .transpose()? + .unwrap_or_default(); + // If there's no specified image, let's be nice and check if the booted system is using rpm-ostree if imgref.is_none() { let booted_incompatible = host @@ -700,7 +717,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { } } } else { - let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?; + let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, prog.clone()).await?; let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status")); let fetched_digest = &fetched.manifest_digest; tracing::debug!("staged: {staged_digest:?}"); @@ -723,7 +740,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { println!("No update available.") } else { let osname = booted_deployment.osname(); - crate::deploy::stage(sysroot, &osname, &fetched, &spec).await?; + crate::deploy::stage(sysroot, &osname, &fetched, &spec, prog.clone()).await?; changed = true; if let Some(prev) = booted_image.as_ref() { if let Some(fetched_manifest) = fetched.get_manifest(repo)? { @@ -759,6 +776,11 @@ async fn switch(opts: SwitchOpts) -> Result<()> { ); let target = ostree_container::OstreeImageReference { sigverify, imgref }; let target = ImageReference::from(target); + let prog = opts + .json_fd + .map(progress_jsonl::ProgressWriter::from_raw_fd) + .transpose()? + .unwrap_or_default(); // If we're doing an in-place mutation, we shortcut most of the rest of the work here if opts.mutate_in_place { @@ -794,7 +816,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { } let new_spec = RequiredHostSpec::from_spec(&new_spec)?; - let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?; + let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, prog.clone()).await?; if !opts.retain { // By default, we prune the previous ostree ref so it will go away after later upgrades @@ -808,7 +830,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { } let stateroot = booted_deployment.osname(); - crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?; + crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog.clone()).await?; if opts.apply { crate::reboot::reboot()?; @@ -850,18 +872,20 @@ async fn edit(opts: EditOpts) -> Result<()> { host.spec.verify_transition(&new_host.spec)?; let new_spec = RequiredHostSpec::from_spec(&new_host.spec)?; + let prog = ProgressWriter::default(); + // We only support two state transitions right now; switching the image, // or flipping the bootloader ordering. if host.spec.boot_order != new_host.spec.boot_order { return crate::deploy::rollback(sysroot).await; } - let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?; + let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, prog.clone()).await?; // TODO gc old layers here let stateroot = booted_deployment.osname(); - crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?; + crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog.clone()).await?; Ok(()) } diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs index 960c1abde..7196e2881 100644 --- a/lib/src/deploy.rs +++ b/lib/src/deploy.rs @@ -21,6 +21,7 @@ use ostree_ext::ostree::{self, Sysroot}; use ostree_ext::sysroot::SysrootLock; use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten; +use crate::progress_jsonl::{Event, ProgressWriter, SubTaskBytes, SubTaskStep, API_VERSION}; use crate::spec::ImageReference; use crate::spec::{BootOrder, HostSpec}; use crate::status::labels_of_config; @@ -141,11 +142,20 @@ fn prefix_of_progress(p: &ImportProgress) -> &'static str { async fn handle_layer_progress_print( mut layers: tokio::sync::mpsc::Receiver, mut layer_bytes: tokio::sync::watch::Receiver>, + digest: Box, n_layers_to_fetch: usize, + layers_total: usize, + bytes_to_download: u64, + bytes_total: u64, + prog: ProgressWriter, + quiet: bool, ) { let start = std::time::Instant::now(); let mut total_read = 0u64; let bar = indicatif::MultiProgress::new(); + if quiet { + bar.set_draw_target(indicatif::ProgressDrawTarget::hidden()); + } let layers_bar = bar.add(indicatif::ProgressBar::new( n_layers_to_fetch.try_into().unwrap(), )); @@ -157,7 +167,8 @@ async fn handle_layer_progress_print( .template("{prefix} {bar} {pos}/{len} {wide_msg}") .unwrap(), ); - layers_bar.set_prefix("Fetching layers"); + let taskname = "Fetching layers"; + layers_bar.set_prefix(taskname); layers_bar.set_message(""); byte_bar.set_prefix("Fetching"); byte_bar.set_style( @@ -167,6 +178,9 @@ async fn handle_layer_progress_print( ) .unwrap() ); + + let mut subtasks = vec![]; + let mut subtask: SubTaskBytes = Default::default(); loop { tokio::select! { // Always handle layer changes first. @@ -174,18 +188,44 @@ async fn handle_layer_progress_print( layer = layers.recv() => { if let Some(l) = layer { let layer = descriptor_of_progress(&l); + let layer_type = prefix_of_progress(&l); + let short_digest = &layer.digest().digest()[0..21]; let layer_size = layer.size(); if l.is_starting() { + // Reset the progress bar byte_bar.reset_elapsed(); byte_bar.reset_eta(); byte_bar.set_length(layer_size); - let layer_type = prefix_of_progress(&l); - let short_digest = &layer.digest().digest()[0..21]; byte_bar.set_message(format!("{layer_type} {short_digest}")); + + subtask = SubTaskBytes { + subtask: layer_type.into(), + description: format!("{layer_type}: {short_digest}").clone().into(), + id: format!("{short_digest}").clone().into(), + bytes_cached: 0, + bytes: 0, + bytes_total: layer_size, + }; } else { byte_bar.set_position(layer_size); layers_bar.inc(1); total_read = total_read.saturating_add(layer_size); + // Emit an event where bytes == total to signal completion. + subtask.bytes = layer_size; + subtasks.push(subtask.clone()); + prog.send(Event::ProgressBytes { + api_version: API_VERSION.into(), + task: "pulling".into(), + description: format!("Pulling Image: {digest}").into(), + id: (*digest).into(), + bytes_cached: bytes_total - bytes_to_download, + bytes: total_read, + bytes_total: bytes_to_download, + steps_cached: (layers_total - n_layers_to_fetch) as u64, + steps: layers_bar.position(), + steps_total: n_layers_to_fetch as u64, + subtasks: subtasks.clone(), + }).await; } } else { // If the receiver is disconnected, then we're done @@ -197,9 +237,26 @@ async fn handle_layer_progress_print( // If the receiver is disconnected, then we're done break } - let bytes = layer_bytes.borrow(); - if let Some(bytes) = &*bytes { + let bytes = { + let bytes = layer_bytes.borrow_and_update(); + bytes.as_ref().cloned() + }; + if let Some(bytes) = bytes { byte_bar.set_position(bytes.fetched); + subtask.bytes = byte_bar.position(); + prog.send_lossy(Event::ProgressBytes { + api_version: API_VERSION.into(), + task: "pulling".into(), + description: format!("Pulling Image: {digest}").into(), + id: (*digest).into(), + bytes_cached: bytes_total - bytes_to_download, + bytes: total_read + byte_bar.position(), + bytes_total: bytes_to_download, + steps_cached: (layers_total - n_layers_to_fetch) as u64, + steps: layers_bar.position(), + steps_total: n_layers_to_fetch as u64, + subtasks: subtasks.clone().into_iter().chain([subtask.clone()]).collect(), + }).await; } } } @@ -221,6 +278,27 @@ async fn handle_layer_progress_print( )) { tracing::warn!("writing to stdout: {e}"); } + + // Since the progress notifier closed, we know import has started + // use as a heuristic to begin import progress + // Cannot be lossy or it is dropped + prog.send(Event::ProgressSteps { + api_version: API_VERSION.into(), + task: "importing".into(), + description: "Importing Image".into(), + id: (*digest).into(), + steps_cached: 0, + steps: 0, + steps_total: 1, + subtasks: [SubTaskStep { + subtask: "importing".into(), + description: "Importing Image".into(), + id: "importing".into(), + completed: false, + }] + .into(), + }) + .await; } /// Wrapper for pulling a container image, wiring up status output. @@ -230,6 +308,7 @@ pub(crate) async fn pull( imgref: &ImageReference, target_imgref: Option<&OstreeImageReference>, quiet: bool, + prog: ProgressWriter, ) -> Result> { let ostree_imgref = &OstreeImageReference::from(imgref.clone()); let mut imp = new_importer(repo, ostree_imgref).await?; @@ -250,20 +329,52 @@ pub(crate) async fn pull( ostree_ext::cli::print_layer_status(&prep); let layers_to_fetch = prep.layers_to_fetch().collect::>>()?; let n_layers_to_fetch = layers_to_fetch.len(); - let printer = (!quiet).then(|| { - let layer_progress = imp.request_progress(); - let layer_byte_progress = imp.request_layer_progress(); - tokio::task::spawn(async move { - handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch) - .await - }) + let layers_total = prep.all_layers().count(); + let bytes_to_fetch: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum(); + let bytes_total: u64 = prep.all_layers().map(|l| l.layer.size()).sum(); + + let prog_print = prog.clone(); + let digest = prep.manifest_digest.clone(); + let digest_imp = prep.manifest_digest.clone(); + let layer_progress = imp.request_progress(); + let layer_byte_progress = imp.request_layer_progress(); + let printer = tokio::task::spawn(async move { + handle_layer_progress_print( + layer_progress, + layer_byte_progress, + digest.as_ref().into(), + n_layers_to_fetch, + layers_total, + bytes_to_fetch, + bytes_total, + prog_print, + quiet, + ) + .await }); let import = imp.import(prep).await; - if let Some(printer) = printer { - let _ = printer.await; - } + let _ = printer.await; + // Both the progress and the import are done, so import is done as well + prog.send(Event::ProgressSteps { + api_version: API_VERSION.into(), + task: "importing".into(), + description: "Importing Image".into(), + id: digest_imp.clone().as_ref().into(), + steps_cached: 0, + steps: 1, + steps_total: 1, + subtasks: [SubTaskStep { + subtask: "importing".into(), + description: "Importing Image".into(), + id: "importing".into(), + completed: true, + }] + .into(), + }) + .await; let import = import?; let wrote_imgref = target_imgref.as_ref().unwrap_or(&ostree_imgref); + if let Some(msg) = ostree_container::store::image_filtered_content_warning(repo, &wrote_imgref.imgref) .context("Image content warning")? @@ -450,8 +561,53 @@ pub(crate) async fn stage( stateroot: &str, image: &ImageState, spec: &RequiredHostSpec<'_>, + prog: ProgressWriter, ) -> Result<()> { + let mut subtask = SubTaskStep { + subtask: "merging".into(), + description: "Merging Image".into(), + id: "fetching".into(), + completed: false, + }; + let mut subtasks = vec![]; + prog.send(Event::ProgressSteps { + api_version: API_VERSION.into(), + task: "staging".into(), + description: "Deploying Image".into(), + id: image.manifest_digest.clone().as_ref().into(), + steps_cached: 0, + steps: 0, + steps_total: 3, + subtasks: subtasks + .clone() + .into_iter() + .chain([subtask.clone()]) + .collect(), + }) + .await; let merge_deployment = sysroot.merge_deployment(Some(stateroot)); + + subtask.completed = true; + subtasks.push(subtask.clone()); + subtask.subtask = "deploying".into(); + subtask.id = "deploying".into(); + subtask.description = "Deploying Image".into(); + subtask.completed = false; + prog.send(Event::ProgressSteps { + api_version: API_VERSION.into(), + task: "staging".into(), + description: "Deploying Image".into(), + id: image.manifest_digest.clone().as_ref().into(), + steps_cached: 0, + steps: 1, + steps_total: 3, + subtasks: subtasks + .clone() + .into_iter() + .chain([subtask.clone()]) + .collect(), + }) + .await; let origin = origin_from_imageref(spec.image)?; let deployment = crate::deploy::deploy( sysroot, @@ -462,8 +618,50 @@ pub(crate) async fn stage( ) .await?; + subtask.completed = true; + subtasks.push(subtask.clone()); + subtask.subtask = "bound_images".into(); + subtask.id = "bound_images".into(); + subtask.description = "Pulling Bound Images".into(); + subtask.completed = false; + prog.send(Event::ProgressSteps { + api_version: API_VERSION.into(), + task: "staging".into(), + description: "Deploying Image".into(), + id: image.manifest_digest.clone().as_ref().into(), + steps_cached: 0, + steps: 1, + steps_total: 3, + subtasks: subtasks + .clone() + .into_iter() + .chain([subtask.clone()]) + .collect(), + }) + .await; crate::boundimage::pull_bound_images(sysroot, &deployment).await?; + subtask.completed = true; + subtasks.push(subtask.clone()); + subtask.subtask = "cleanup".into(); + subtask.id = "cleanup".into(); + subtask.description = "Removing old images".into(); + subtask.completed = false; + prog.send(Event::ProgressSteps { + api_version: API_VERSION.into(), + task: "staging".into(), + description: "Deploying Image".into(), + id: image.manifest_digest.clone().as_ref().into(), + steps_cached: 0, + steps: 2, + steps_total: 3, + subtasks: subtasks + .clone() + .into_iter() + .chain([subtask.clone()]) + .collect(), + }) + .await; crate::deploy::cleanup(sysroot).await?; println!("Queued for next boot: {:#}", spec.image); if let Some(version) = image.version.as_deref() { @@ -471,6 +669,24 @@ pub(crate) async fn stage( } println!(" Digest: {}", image.manifest_digest); + subtask.completed = true; + subtasks.push(subtask.clone()); + prog.send(Event::ProgressSteps { + api_version: API_VERSION.into(), + task: "staging".into(), + description: "Deploying Image".into(), + id: image.manifest_digest.clone().as_ref().into(), + steps_cached: 0, + steps: 3, + steps_total: 3, + subtasks: subtasks + .clone() + .into_iter() + .chain([subtask.clone()]) + .collect(), + }) + .await; + Ok(()) } diff --git a/lib/src/install.rs b/lib/src/install.rs index 548c3e8d3..2834ae44f 100644 --- a/lib/src/install.rs +++ b/lib/src/install.rs @@ -48,6 +48,7 @@ use crate::boundimage::{BoundImage, ResolvedBoundImage}; use crate::containerenv::ContainerExecutionInfo; use crate::lsm; use crate::mount::Filesystem; +use crate::progress_jsonl::ProgressWriter; use crate::spec::ImageReference; use crate::store::Storage; use crate::task::Task; @@ -733,7 +734,14 @@ async fn install_container( let spec_imgref = ImageReference::from(src_imageref.clone()); let repo = &sysroot.repo(); repo.set_disable_fsync(true); - let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?; + let r = crate::deploy::pull( + repo, + &spec_imgref, + Some(&state.target_imgref), + false, + ProgressWriter::default(), + ) + .await?; repo.set_disable_fsync(false); r }; diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 1f0c263b5..f90ef3665 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -41,3 +41,4 @@ pub mod spec; mod docgen; mod glyph; mod imgstorage; +mod progress_jsonl; diff --git a/lib/src/progress_jsonl.rs b/lib/src/progress_jsonl.rs new file mode 100644 index 000000000..8e97a7631 --- /dev/null +++ b/lib/src/progress_jsonl.rs @@ -0,0 +1,290 @@ +//! Output progress data using the json-lines format. For more information +//! see . + +use anyhow::Result; +use fn_error_context::context; +use serde::Serialize; +use std::borrow::Cow; +use std::os::fd::{FromRawFd, OwnedFd, RawFd}; +use std::sync::Arc; +use std::time::Instant; +use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::net::unix::pipe::Sender; +use tokio::sync::Mutex; + +// Maximum number of times per second that an event will be written. +const REFRESH_HZ: u16 = 5; + +pub const API_VERSION: &str = "org.containers.bootc.progress/v1"; + +/// An incremental update to e.g. a container image layer download. +/// The first time a given "subtask" name is seen, a new progress bar should be created. +/// If bytes == bytes_total, then the subtask is considered complete. +#[derive(Debug, serde::Serialize, serde::Deserialize, Default, Clone)] +#[serde(rename_all = "camelCase")] +pub struct SubTaskBytes<'t> { + /// A machine readable type for the task (used for i18n). + /// (e.g., "ostree_chunk", "ostree_derived") + #[serde(borrow)] + pub subtask: Cow<'t, str>, + /// A human readable description of the task if i18n is not available. + /// (e.g., "OSTree Chunk:", "Derived Layer:") + #[serde(borrow)] + pub description: Cow<'t, str>, + /// A human and machine readable identifier for the task + /// (e.g., ostree chunk/layer hash). + #[serde(borrow)] + pub id: Cow<'t, str>, + /// The number of bytes fetched by a previous run (e.g., zstd_chunked). + pub bytes_cached: u64, + /// Updated byte level progress + pub bytes: u64, + /// Total number of bytes + pub bytes_total: u64, +} + +/// Marks the beginning and end of a dictrete step +#[derive(Debug, serde::Serialize, serde::Deserialize, Default, Clone)] +#[serde(rename_all = "camelCase")] +pub struct SubTaskStep<'t> { + /// A machine readable type for the task (used for i18n). + /// (e.g., "ostree_chunk", "ostree_derived") + #[serde(borrow)] + pub subtask: Cow<'t, str>, + /// A human readable description of the task if i18n is not available. + /// (e.g., "OSTree Chunk:", "Derived Layer:") + #[serde(borrow)] + pub description: Cow<'t, str>, + /// A human and machine readable identifier for the task + /// (e.g., ostree chunk/layer hash). + #[serde(borrow)] + pub id: Cow<'t, str>, + /// Starts as false when beginning to execute and turns true when completed. + pub completed: bool, +} + +/// An event emitted as JSON. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[serde( + tag = "type", + rename_all = "PascalCase", + rename_all_fields = "camelCase" +)] +pub enum Event<'t> { + /// An incremental update to a container image layer download + ProgressBytes { + /// The version of the progress event format. + #[serde(borrow)] + api_version: Cow<'t, str>, + /// A machine readable type (e.g., pulling) for the task (used for i18n + /// and UI customization). + #[serde(borrow)] + task: Cow<'t, str>, + /// A human readable description of the task if i18n is not available. + #[serde(borrow)] + description: Cow<'t, str>, + /// A human and machine readable unique identifier for the task + /// (e.g., the image name). For tasks that only happen once, + /// it can be set to the same value as task. + #[serde(borrow)] + id: Cow<'t, str>, + /// The number of bytes fetched by a previous run. + bytes_cached: u64, + /// The number of bytes already fetched. + bytes: u64, + /// Total number of bytes. If zero, then this should be considered "unspecified". + bytes_total: u64, + /// The number of steps fetched by a previous run. + steps_cached: u64, + /// The initial position of progress. + steps: u64, + /// The total number of steps (e.g. container image layers, RPMs) + steps_total: u64, + /// The currently running subtasks. + subtasks: Vec>, + }, + /// An incremental update with discrete steps + ProgressSteps { + /// The version of the progress event format. + #[serde(borrow)] + api_version: Cow<'t, str>, + /// A machine readable type (e.g., pulling) for the task (used for i18n + /// and UI customization). + #[serde(borrow)] + task: Cow<'t, str>, + /// A human readable description of the task if i18n is not available. + #[serde(borrow)] + description: Cow<'t, str>, + /// A human and machine readable unique identifier for the task + /// (e.g., the image name). For tasks that only happen once, + /// it can be set to the same value as task. + #[serde(borrow)] + id: Cow<'t, str>, + /// The number of steps fetched by a previous run. + steps_cached: u64, + /// The initial position of progress. + steps: u64, + /// The total number of steps (e.g. container image layers, RPMs) + steps_total: u64, + /// The currently running subtasks. + subtasks: Vec>, + }, +} + +#[derive(Debug)] +struct ProgressWriterInner { + last_write: Option, + fd: BufWriter, +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct ProgressWriter { + inner: Arc>>, +} + +impl TryFrom for ProgressWriter { + type Error = anyhow::Error; + + fn try_from(value: OwnedFd) -> Result { + let value = Sender::from_owned_fd(value)?; + Ok(Self::from(value)) + } +} + +impl From for ProgressWriter { + fn from(value: Sender) -> Self { + let inner = ProgressWriterInner { + last_write: None, + fd: BufWriter::new(value), + }; + Self { + inner: Arc::new(Some(inner).into()), + } + } +} + +impl ProgressWriter { + /// Given a raw file descriptor, create an instance of a json-lines writer. + #[allow(unsafe_code)] + #[context("Creating progress writer")] + pub(crate) fn from_raw_fd(fd: RawFd) -> Result { + unsafe { OwnedFd::from_raw_fd(fd) }.try_into() + } + + /// Serialize the target object to JSON as a single line + pub(crate) async fn send_impl(&self, v: T, required: bool) -> Result<()> { + let mut guard = self.inner.lock().await; + // Check if we have an inner value; if not, nothing to do. + let Some(inner) = guard.as_mut() else { + return Ok(()); + }; + + // For messages that can be dropped, if we already sent an update within this cycle, discard this one. + // TODO: Also consider querying the pipe buffer and also dropping if we can't do this write. + let now = Instant::now(); + if !required { + const REFRESH_MS: u32 = 1000 / REFRESH_HZ as u32; + if let Some(elapsed) = inner.last_write.map(|w| now.duration_since(w)) { + if elapsed.as_millis() < REFRESH_MS.into() { + return Ok(()); + } + } + } + + // SAFETY: Propagating panics from the mutex here is intentional + // serde is guaranteed not to output newlines here + let buf = serde_json::to_vec(&v)?; + inner.fd.write_all(&buf).await?; + // We always end in a newline + inner.fd.write_all(b"\n").await?; + // And flush to ensure the remote side sees updates immediately + inner.fd.flush().await?; + // Update the last write time + inner.last_write = Some(now); + Ok(()) + } + + /// Send an event. + pub(crate) async fn send(&self, v: T) { + if let Err(e) = self.send_impl(v, true).await { + eprintln!("Failed to write to jsonl: {}", e); + // Stop writing to fd but let process continue + // SAFETY: Propagating panics from the mutex here is intentional + let _ = self.inner.lock().await.take(); + } + } + + /// Send an event that can be dropped. + pub(crate) async fn send_lossy(&self, v: T) { + if let Err(e) = self.send_impl(v, false).await { + eprintln!("Failed to write to jsonl: {}", e); + // Stop writing to fd but let process continue + // SAFETY: Propagating panics from the mutex here is intentional + let _ = self.inner.lock().await.take(); + } + } + + /// Flush remaining data and return the underlying file. + #[allow(dead_code)] + pub(crate) async fn into_inner(self) -> Result> { + // SAFETY: Propagating panics from the mutex here is intentional + let mut mutex = self.inner.lock().await; + if let Some(inner) = mutex.take() { + Ok(Some(inner.fd.into_inner())) + } else { + Ok(None) + } + } +} + +#[cfg(test)] +mod test { + use serde::Deserialize; + use tokio::io::{AsyncBufReadExt, BufReader}; + + use super::*; + + #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] + struct S { + s: String, + v: u32, + } + + #[tokio::test] + async fn test_jsonl() -> Result<()> { + let testvalues = [ + S { + s: "foo".into(), + v: 42, + }, + S { + // Test with an embedded newline to sanity check that serde doesn't write it literally + s: "foo\nbar".into(), + v: 0, + }, + ]; + let (send, recv) = tokio::net::unix::pipe::pipe()?; + let testvalues_sender = &testvalues; + let sender = async move { + let w = ProgressWriter::try_from(send)?; + for value in testvalues_sender { + w.send(value).await; + } + anyhow::Ok(()) + }; + let testvalues = &testvalues; + let receiver = async move { + let tf = BufReader::new(recv); + let mut expected = testvalues.iter(); + let mut lines = tf.lines(); + while let Some(line) = lines.next_line().await? { + let found: S = serde_json::from_str(&line)?; + let expected = expected.next().unwrap(); + assert_eq!(&found, expected); + } + anyhow::Ok(()) + }; + tokio::try_join!(sender, receiver)?; + Ok(()) + } +} diff --git a/ostree-ext/src/cli.rs b/ostree-ext/src/cli.rs index fa60ccf41..e63e4240e 100644 --- a/ostree-ext/src/cli.rs +++ b/ostree-ext/src/cli.rs @@ -657,6 +657,7 @@ pub async fn handle_layer_progress_print( pub fn print_layer_status(prep: &PreparedImport) { if let Some(status) = prep.format_layer_status() { println!("{status}"); + let _ = std::io::stdout().flush(); } } diff --git a/ostree-ext/src/container/store.rs b/ostree-ext/src/container/store.rs index d4e4a4fec..b2a57a86a 100644 --- a/ostree-ext/src/container/store.rs +++ b/ostree-ext/src/container/store.rs @@ -102,7 +102,7 @@ impl ImportProgress { } /// Sent across a channel to track the byte-level progress of a layer fetch. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct LayerProgress { /// Index of the layer in the manifest pub layer_index: usize, @@ -193,7 +193,7 @@ pub enum PrepareResult { #[derive(Debug)] pub struct ManifestLayerState { /// The underlying layer descriptor. - pub(crate) layer: oci_image::Descriptor, + pub layer: oci_image::Descriptor, // TODO semver: Make this readonly via an accessor /// The ostree ref name for this layer. pub ostree_ref: String, @@ -952,6 +952,10 @@ impl ImageImporter { proxy.finalize().await?; tracing::debug!("finalized proxy"); + // Disconnect progress notifiers to signal we're done with fetching. + let _ = self.layer_byte_progress.take(); + let _ = self.layer_progress.take(); + let serialized_manifest = serde_json::to_string(&import.manifest)?; let serialized_config = serde_json::to_string(&import.config)?; let mut metadata = HashMap::new();