From e2413e889e12e38b7d7557c90cd6be4156ec6f3a Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Mon, 12 Aug 2024 22:03:51 +0300 Subject: [PATCH] stdout and stderr realtime logic elevated into own feature --- rust/Cargo.toml | 2 +- rust/bitbazaar/command/mod.rs | 3 + rust/bitbazaar/command/spawn_builder.rs | 189 +++++++++++++++++++++ rust/bitbazaar/file/chmod.rs | 41 +++++ rust/bitbazaar/file/mod.rs | 3 + rust/bitbazaar/lib.rs | 8 + rust/bitbazaar/log/global_log/setup.rs | 4 +- rust/bitbazaar/log/mod.rs | 2 + rust/bitbazaar/log/standalone_collector.rs | 145 +++------------- rust/bitbazaar/misc/looper.rs | 4 +- rust/bitbazaar/misc/mod.rs | 13 +- rust/bitbazaar/misc/tarball.rs | 96 ++++++++++- 12 files changed, 368 insertions(+), 142 deletions(-) create mode 100644 rust/bitbazaar/command/mod.rs create mode 100644 rust/bitbazaar/command/spawn_builder.rs create mode 100644 rust/bitbazaar/file/chmod.rs create mode 100644 rust/bitbazaar/file/mod.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index b69295ca..d00de0d1 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -127,7 +127,7 @@ gloo-timers = { version = "0.3", features = ["futures"] } # This includes threading (non-blocking stuff that can't be used in wasm) tracing-appender = '0.2' hostname = "0.3.1" -tokio = { version = '1', features = ["time", "sync", "fs", "process"] } +tokio = { version = '1', features = ["time", "sync", "fs", "process", "rt", "io-util"] } named-lock = "0.4" [dev-dependencies] diff --git a/rust/bitbazaar/command/mod.rs b/rust/bitbazaar/command/mod.rs new file mode 100644 index 00000000..239b3d78 --- /dev/null +++ b/rust/bitbazaar/command/mod.rs @@ -0,0 +1,3 @@ +mod spawn_builder; + +pub use spawn_builder::*; diff --git a/rust/bitbazaar/command/spawn_builder.rs b/rust/bitbazaar/command/spawn_builder.rs new file mode 100644 index 00000000..357208c3 --- /dev/null +++ b/rust/bitbazaar/command/spawn_builder.rs @@ -0,0 +1,189 @@ +use std::{future::Future, process::Stdio, sync::Arc}; + +use futures::future::BoxFuture; + +/// Extension trait for getting the spawn extension builder from both std and tokio commands. +pub trait CmdSpawnExt { + /// The type of the spawn extension builder. + type Spawner<'a> + where + Self: 'a; + + /// Get a spawn extension builder that provides the ability to: + /// - Listen to stdout and/or stderr line by line in real-time via callbacks. + fn spawn_builder(&mut self) -> Self::Spawner<'_>; +} + +impl CmdSpawnExt for std::process::Command { + type Spawner<'a> = CmdSpawnBuilderSync<'a>; + + fn spawn_builder(&mut self) -> Self::Spawner<'_> { + CmdSpawnBuilderSync { + command: self, + on_stdout: None, + on_stderr: None, + } + } +} + +impl CmdSpawnExt for tokio::process::Command { + type Spawner<'a> = CmdSpawnBuilderAsync<'a>; + + fn spawn_builder(&mut self) -> Self::Spawner<'_> { + CmdSpawnBuilderAsync { + command: self, + on_stdout: None, + on_stderr: None, + } + } +} + +/// Synchronous command spawn extension builder. For [`std::process::Command`]. +pub struct CmdSpawnBuilderSync<'a> { + command: &'a mut std::process::Command, + on_stdout: Option>, + on_stderr: Option>>, +} + +impl<'a> CmdSpawnBuilderSync<'a> { + /// Set a callback to be called for each line of stdout. + pub fn on_stdout(mut self, on_stdout: impl Fn(String) + Sync + Send + 'static) -> Self { + self.on_stdout = Some(Box::new(on_stdout)); + self.command.stdout(Stdio::piped()); + self + } + + /// Set a callback to be called for each line of stderr. + pub fn on_stderr(mut self, on_stderr: impl Fn(String) + Sync + Send + 'static) -> Self { + self.on_stderr = Some(Arc::new(Box::new(on_stderr))); + self.command.stderr(Stdio::piped()); + self + } + + /// Spawn the command. + pub fn spawn(self) -> std::io::Result { + let mut child = self.command.spawn()?; + + use std::io::BufRead; + + // Capture and print stdout in a separate thread + if let Some(on_stdout) = self.on_stdout { + let on_stderr = self.on_stderr.as_ref().map(|on_stderr| on_stderr.clone()); + if let Some(stdout) = child.stdout.take() { + let stdout_reader = std::io::BufReader::new(stdout); + std::thread::spawn(move || { + for line in stdout_reader.lines() { + match line { + Ok(line) => on_stdout(line), + Err(e) => { + let msg = format!("Error reading stdout: {:?}", e); + if let Some(on_stderr) = on_stderr.as_ref() { + on_stderr(msg); + } else { + on_stdout(msg); + } + } + } + } + }); + } + } + + // Capture and print stderr in a separate thread + if let Some(on_stderr) = self.on_stderr { + if let Some(stderr) = child.stderr.take() { + let stderr_reader = std::io::BufReader::new(stderr); + std::thread::spawn(move || { + for line in stderr_reader.lines() { + match line { + Ok(line) => on_stderr(line), + Err(e) => on_stderr(format!("Error reading stderr: {:?}", e)), + } + } + }); + } + } + + Ok(child) + } +} + +/// Asynchronous command spawn extension builder. For [`tokio::process::Command`]. +pub struct CmdSpawnBuilderAsync<'a> { + command: &'a mut tokio::process::Command, + on_stdout: Option BoxFuture<'static, ()> + Send + 'static>>, + on_stderr: Option BoxFuture<'static, ()> + Send + 'static>>, +} + +impl<'a> CmdSpawnBuilderAsync<'a> { + /// Set a callback to be called for each line of stdout. + pub fn on_stdout + Send + 'static>( + mut self, + on_stdout: impl Fn(String) -> Fut + Send + 'static, + ) -> Self { + self.on_stdout = Some(Box::new(move |s| Box::pin(on_stdout(s)))); + self.command.stdout(Stdio::piped()); + self + } + + /// Set a callback to be called for each line of stderr. + pub fn on_stderr + Send + 'static>( + mut self, + on_stderr: impl Fn(String) -> Fut + Send + 'static, + ) -> Self { + self.on_stderr = Some(Box::new(move |s| Box::pin(on_stderr(s)))); + self.command.stderr(Stdio::piped()); + self + } + + /// Spawn the command. + pub fn spawn(self) -> std::io::Result { + use tokio::io::AsyncBufReadExt; + + let mut child = self.command.spawn()?; + + // Capture and print stdout in a separate thread + if let Some(on_stdout) = self.on_stdout { + if let Some(stdout) = child.stdout.take() { + let stdout_reader = tokio::io::BufReader::new(stdout); + tokio::spawn(async move { + let mut lines = stdout_reader.lines(); + loop { + match lines.next_line().await { + Ok(v) => match v { + Some(line) => on_stdout(line).await, + None => break, + }, + Err(e) => { + on_stdout(format!("Error reading stdout: {:?}", e)).await; + } + } + } + }); + } + } + + // Capture and print stderr in a separate thread + if let Some(on_stderr) = self.on_stderr { + if let Some(stderr) = child.stderr.take() { + let stderr_reader = tokio::io::BufReader::new(stderr); + tokio::spawn(async move { + let mut lines = stderr_reader.lines(); + loop { + match lines.next_line().await { + Ok(v) => match v { + Some(line) => on_stderr(line).await, + None => break, + }, + Err(e) => on_stderr(format!("Error reading stderr: {:?}", e)).await, + } + } + }); + } + } + + Ok(child) + } +} + +// TESTING: implicitly tested during log tests which use it to extract logs I think. diff --git a/rust/bitbazaar/file/chmod.rs b/rust/bitbazaar/file/chmod.rs new file mode 100644 index 00000000..794223b3 --- /dev/null +++ b/rust/bitbazaar/file/chmod.rs @@ -0,0 +1,41 @@ +use std::path::Path; + +/// Chmod an open file, noop on Windows. +/// +/// ENTER AS HEX! +/// +/// E.g. `0o755` rather than `755`. +pub fn chmod_sync(mode: u32, filepath: &Path) -> Result<(), std::io::Error> { + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions(filepath, std::fs::Permissions::from_mode(mode))?; + } + Ok(()) +} + +/// Good default, chmod an open file to be executable by all, writeable by owner. +pub fn chmod_executable_sync(filepath: &Path) -> Result<(), std::io::Error> { + chmod_sync(0o755, filepath) +} + +/// Chmod an open file, noop on Windows. +/// +/// ENTER AS HEX! +/// +/// E.g. `0o755` rather than `755`. +pub async fn chmod_async(mode: u32, filepath: &Path) -> Result<(), std::io::Error> { + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + tokio::fs::set_permissions(&filepath, std::fs::Permissions::from_mode(mode)).await?; + } + Ok(()) +} + +/// Good default (755). +/// +/// chmod an open file to be executable by all, writeable by owner. +pub async fn chmod_executable_async(filepath: &Path) -> Result<(), std::io::Error> { + chmod_async(0o755, filepath).await +} diff --git a/rust/bitbazaar/file/mod.rs b/rust/bitbazaar/file/mod.rs new file mode 100644 index 00000000..60251bb1 --- /dev/null +++ b/rust/bitbazaar/file/mod.rs @@ -0,0 +1,3 @@ +mod chmod; + +pub use chmod::*; diff --git a/rust/bitbazaar/lib.rs b/rust/bitbazaar/lib.rs index 63bcbfeb..9faa8bef 100644 --- a/rust/bitbazaar/lib.rs +++ b/rust/bitbazaar/lib.rs @@ -13,6 +13,10 @@ mod prelude; #[cfg(feature = "cli")] /// Command line interface utilities. pub mod cli; +/// OS Command related utilities. Much lighter weight than cli module (which will probably be deprecated as badly written and not maintainable). +/// Not applicable to wasm +#[cfg(not(target_arch = "wasm32"))] +pub mod command; /// Chrono utilities pub mod chrono; @@ -23,6 +27,10 @@ pub mod cookies; pub mod crypto; /// Error handling utilities. pub mod errors; + +/// File related utilities. Therefore disabled on wasm. +#[cfg(not(target_arch = "wasm32"))] +pub mod file; /// Logging utilities pub mod log; /// Completely miscellaneous utilities diff --git a/rust/bitbazaar/log/global_log/setup.rs b/rust/bitbazaar/log/global_log/setup.rs index a1bc88c9..ae79a335 100644 --- a/rust/bitbazaar/log/global_log/setup.rs +++ b/rust/bitbazaar/log/global_log/setup.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use tracing::{Dispatch, Level, Metadata, Subscriber}; use tracing_subscriber::{filter::FilterFn, layer::SubscriberExt, registry::LookupSpan, Layer}; @@ -222,7 +220,7 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> RResult = + let header_map: std::collections::HashMap = otlp.http_headers.iter().cloned().collect(); let get_exporter = || { new_exporter() diff --git a/rust/bitbazaar/log/mod.rs b/rust/bitbazaar/log/mod.rs index 8bddf808..e882c98f 100644 --- a/rust/bitbazaar/log/mod.rs +++ b/rust/bitbazaar/log/mod.rs @@ -375,6 +375,7 @@ mod tests { move |stdout| { println!("{}", stdout); records.lock().push(stdout); + async {} } }, { @@ -382,6 +383,7 @@ mod tests { move |stderr| { println!("{}", stderr); records.lock().push(stderr); + async {} } }, ) diff --git a/rust/bitbazaar/log/standalone_collector.rs b/rust/bitbazaar/log/standalone_collector.rs index baa0c05f..d55a8843 100644 --- a/rust/bitbazaar/log/standalone_collector.rs +++ b/rust/bitbazaar/log/standalone_collector.rs @@ -1,10 +1,12 @@ +use std::future::Future; use std::io::{Cursor, Read, Write}; use std::path::{Path, PathBuf}; -use std::process::Stdio; use tempfile::NamedTempFile; use tokio::io::AsyncWriteExt; +use crate::command::CmdSpawnExt; +use crate::file::chmod_executable_async; use crate::log::record_exception; use crate::misc::{platform, setup_once, tarball_decompress}; use crate::prelude::*; @@ -26,10 +28,13 @@ impl CollectorStandalone { /// - config: the config file contents to pass to the collector. /// - on_stdout: what to do with each stdout line emitted by the process /// - on_stderr: what to do with each stderr line emitted by the process - pub async fn new( + pub async fn new< + FutOnStdOut: Future + Send + 'static, + FutOnStdErr: Future + Send + 'static, + >( config: &str, - on_stdout: impl Fn(String) + Send + 'static + Clone, - on_stderr: impl Fn(String) + Send + 'static + Clone, + on_stdout: impl Fn(String) -> FutOnStdOut + Send + 'static + Clone, + on_stderr: impl Fn(String) -> FutOnStdErr + Send + 'static + Clone, ) -> RResult { let mut config_file = NamedTempFile::new().change_context(AnyErr)?; config_file @@ -44,16 +49,22 @@ impl CollectorStandalone { }; static COLLECTOR_VERSION: &str = "0.106.1"; - async fn spawn_child( + async fn spawn_child< + FutOnStdOut: Future + Send + 'static, + FutOnStdErr: Future + Send + 'static, + >( workspace_dir: PathBuf, config_filepath: &Path, - on_stdout: impl Fn(String) + Send + 'static, - on_stderr: impl Fn(String) + Send + 'static + Clone, + on_stdout: impl Fn(String) -> FutOnStdOut + Send + 'static, + on_stderr: impl Fn(String) -> FutOnStdErr + Send + 'static, ) -> RResult { tokio::process::Command::new(workspace_dir.join(COLLECTOR_BINARY_NAME)) .arg("--config") .arg(config_filepath) - .spawn_with_managed_std(on_stdout, on_stderr) + .spawn_builder() + .on_stdout(on_stdout) + .on_stderr(on_stderr) + .spawn() .change_context(AnyErr) } @@ -142,18 +153,11 @@ impl CollectorStandalone { .change_context(AnyErr)?; file.write_all(&binary).await.change_context(AnyErr)?; - // TODONOW utility - // Execute permissions: - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - tokio::fs::set_permissions( - &filepath, - std::fs::Permissions::from_mode(0o755), - ) + + // Make runnable: + chmod_executable_async(&filepath) .await .change_context(AnyErr)?; - } } // Before adding a small sleep, on macos I'd randomly get Malformed Mach-o file (os error 88) when instantly trying to run binary after above: @@ -206,108 +210,3 @@ impl Drop for CollectorStandalone { self.kill_inner() } } - -/// TODONOW move somewhere else to generalise/expose and finalise name. -trait CmdSpawnWithManagedStd { - type Child; - - fn spawn_with_managed_std( - &mut self, - on_stdout: impl Fn(String) + Send + 'static, - on_stderr: impl Fn(String) + Send + 'static + Clone, - ) -> std::io::Result; -} - -impl CmdSpawnWithManagedStd for std::process::Command { - type Child = std::process::Child; - - fn spawn_with_managed_std( - &mut self, - on_stdout: impl Fn(String) + Send + 'static, - on_stderr: impl Fn(String) + Send + 'static + Clone, - ) -> std::io::Result { - let mut child = self.stdout(Stdio::piped()).stderr(Stdio::piped()).spawn()?; - - use std::io::BufRead; - - // Capture and print stdout in a separate thread - if let Some(stdout) = child.stdout.take() { - let on_stderr = on_stderr.clone(); - let stdout_reader = std::io::BufReader::new(stdout); - std::thread::spawn(move || { - for line in stdout_reader.lines() { - match line { - Ok(line) => on_stdout(line), - Err(e) => on_stderr(format!("Error reading stdout: {:?}", e)), - } - } - }); - } - - // Capture and print stderr in a separate thread - if let Some(stderr) = child.stderr.take() { - let stderr_reader = std::io::BufReader::new(stderr); - std::thread::spawn(move || { - for line in stderr_reader.lines() { - match line { - Ok(line) => on_stderr(line), - Err(e) => on_stderr(format!("Error reading stderr: {:?}", e)), - } - } - }); - } - - Ok(child) - } -} - -impl CmdSpawnWithManagedStd for tokio::process::Command { - type Child = tokio::process::Child; - - fn spawn_with_managed_std( - &mut self, - on_stdout: impl Fn(String) + Send + 'static, - on_stderr: impl Fn(String) + Send + 'static + Clone, - ) -> tokio::io::Result { - use tokio::io::AsyncBufReadExt; - - let mut child = self.stdout(Stdio::piped()).stderr(Stdio::piped()).spawn()?; - - // Capture and print stdout in a separate thread - if let Some(stdout) = child.stdout.take() { - let on_stderr = on_stderr.clone(); - let stdout_reader = tokio::io::BufReader::new(stdout); - tokio::spawn(async move { - let mut lines = stdout_reader.lines(); - loop { - match lines.next_line().await { - Ok(v) => match v { - Some(line) => on_stdout(line), - None => break, - }, - Err(e) => on_stderr(format!("Error reading stdout: {:?}", e)), - } - } - }); - } - - // Capture and print stderr in a separate thread - if let Some(stderr) = child.stderr.take() { - let stderr_reader = tokio::io::BufReader::new(stderr); - tokio::spawn(async move { - let mut lines = stderr_reader.lines(); - loop { - match lines.next_line().await { - Ok(v) => match v { - Some(line) => on_stderr(line), - None => break, - }, - Err(e) => on_stderr(format!("Error reading stderr: {:?}", e)), - } - } - }); - } - - Ok(child) - } -} diff --git a/rust/bitbazaar/misc/looper.rs b/rust/bitbazaar/misc/looper.rs index c46716bc..0dfb89ed 100644 --- a/rust/bitbazaar/misc/looper.rs +++ b/rust/bitbazaar/misc/looper.rs @@ -1,6 +1,4 @@ -/// TODONOW use this anywhere else we use the pattern, sep from here. -/// We like the pattern of internalising loops and operating via mutable callbacks. -/// This standardizes that pattern's interface. +/// Standardizes pattern for internalised loop that allows mutable callbacks and persistent state. /// The state is data that's persisted between iterations, and eventually passed out, this should be customisable by the caller. pub struct Looper { pub(crate) state: State, diff --git a/rust/bitbazaar/misc/mod.rs b/rust/bitbazaar/misc/mod.rs index 55d2ab97..70dddcda 100644 --- a/rust/bitbazaar/misc/mod.rs +++ b/rust/bitbazaar/misc/mod.rs @@ -1,6 +1,9 @@ /// Byte manipulation utilities, e.g. transfer speed. pub mod bytes; +/// Platform utilities, e.g. OS type, cpu arch, in_ci. +pub mod platform; + mod binary_search; mod flexi_logger; mod global_lock; @@ -8,12 +11,6 @@ mod is_tcp_port_listening; mod looper; mod main_wrapper; mod periodic_updater; -#[cfg(feature = "tarball")] -mod tarball; -#[cfg(feature = "tarball")] -pub use tarball::*; -/// Platform utilities, e.g. OS type, cpu arch, in_ci. -pub mod platform; mod random; #[cfg(feature = "redis")] mod refreshable; @@ -21,6 +18,8 @@ mod retry; mod serde_migratable; mod setup_once; mod sleep_compat; +#[cfg(feature = "tarball")] +mod tarball; mod timeout; pub use binary_search::*; @@ -37,4 +36,6 @@ pub use retry::*; pub use serde_migratable::*; pub use setup_once::*; pub use sleep_compat::*; +#[cfg(feature = "tarball")] +pub use tarball::*; pub use timeout::*; diff --git a/rust/bitbazaar/misc/tarball.rs b/rust/bitbazaar/misc/tarball.rs index 66b4bd8e..22fcbf43 100644 --- a/rust/bitbazaar/misc/tarball.rs +++ b/rust/bitbazaar/misc/tarball.rs @@ -1,14 +1,14 @@ -use std::{borrow::Cow, io::Read, path::Path}; +use std::{ + borrow::Cow, + io::{Read, Write}, + path::Path, +}; use crate::prelude::*; use super::Looper; -/// Decompress an in-memory .tar.gz to a hashmap of in-memory files. -/// -/// Uses a callback to hide away some of the complexity of the tarball crate. -/// Mutable state can be passed to persist data in and out. -/// a bool should also be returned, if false, no further files will be processed. +/// Decompress a tarball (.tar.gz). pub fn tarball_decompress( src: R, mut state: S, @@ -31,6 +31,30 @@ pub fn tarball_decompress( Ok(state) } +/// Compress a tarball (.tar.gz). +pub fn tarball_compress<'a, R: Read>( + dest: impl Write, + files: impl IntoIterator, +) -> RResult<(), AnyErr> { + let mut tar_src = Vec::new(); + { + let mut tar = tar::Builder::new(&mut tar_src); + for (path, mut data) in files { + let mut buf = vec![]; + data.read_to_end(&mut buf).change_context(AnyErr)?; + let mut header = tar::Header::new_gnu(); + header.set_size(buf.len() as u64); + tar.append_data(&mut header, path, std::io::Cursor::new(buf)) + .change_context(AnyErr)?; + } + tar.finish().change_context(AnyErr)?; + } + let mut gz = flate2::write::GzEncoder::new(dest, flate2::Compression::default()); + gz.write_all(&tar_src).change_context(AnyErr)?; + + Ok(()) +} + /// A decompressed file from a tarball wrapped in a simpler interface. /// Implements [`std::io::Read`] for easy lazy reading. pub struct TarballFile<'a, R: 'a + std::io::Read>(tar::Entry<'a, flate2::read::GzDecoder>); @@ -47,3 +71,63 @@ impl<'a, R: 'a + std::io::Read> TarballFile<'a, R> { self.0.path().change_context(AnyErr) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + + #[test] + fn test_tarball() -> RResult<(), AnyErr> { + // Create using compress function, checking: + // - multiple files + // - files in directories + let mut tarball = Vec::new(); + tarball_compress( + &mut tarball, + [ + (Path::new("foo.txt"), "foo".as_bytes()), + (Path::new("bar.txt"), "bar".as_bytes()), + (Path::new("nested/ree.txt"), "ree".as_bytes()), + ], + )?; + + // Decompress using decompress function, checking: + // - multiple files + // - files in directories + let mut files = HashMap::new(); + tarball_decompress(&tarball[..], (), |mut looper| { + let mut buf = vec![]; + looper + .value_mut() + .read_to_end(&mut buf) + .change_context(AnyErr)?; + files.insert(looper.value().path()?.to_string_lossy().to_string(), buf); + Ok(looper) + })?; + assert_eq!(files.len(), 3, "{:#?}", files); + assert!(files.contains_key("foo.txt"), "{:#?}", files); + assert!(files.contains_key("bar.txt"), "{:#?}", files); + assert!(files.contains_key("nested/ree.txt"), "{:#?}", files); + assert_eq!(files["foo.txt"], b"foo", "{:#?}", files); + assert_eq!(files["bar.txt"], b"bar", "{:#?}", files); + assert_eq!(files["nested/ree.txt"], b"ree", "{:#?}", files); + + // Confirm early exit works, call stop_early(), meaning only 1 file should be output: + let mut files = HashMap::new(); + tarball_decompress(&tarball[..], (), |mut looper| { + looper.stop_early(); + let mut buf = vec![]; + looper + .value_mut() + .read_to_end(&mut buf) + .change_context(AnyErr)?; + files.insert(looper.value().path()?.to_string_lossy().to_string(), buf); + Ok(looper) + })?; + assert_eq!(files.len(), 1, "{:#?}", files); + + Ok(()) + } +}