Skip to content

Commit

Permalink
stdout and stderr realtime logic elevated into own feature
Browse files Browse the repository at this point in the history
  • Loading branch information
zakstucke committed Aug 12, 2024
1 parent 14d146e commit e2413e8
Show file tree
Hide file tree
Showing 12 changed files with 368 additions and 142 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ gloo-timers = { version = "0.3", features = ["futures"] }
# This includes threading (non-blocking stuff that can't be used in wasm)
tracing-appender = '0.2'
hostname = "0.3.1"
tokio = { version = '1', features = ["time", "sync", "fs", "process"] }
tokio = { version = '1', features = ["time", "sync", "fs", "process", "rt", "io-util"] }
named-lock = "0.4"

[dev-dependencies]
Expand Down
3 changes: 3 additions & 0 deletions rust/bitbazaar/command/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod spawn_builder;

pub use spawn_builder::*;
189 changes: 189 additions & 0 deletions rust/bitbazaar/command/spawn_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::{future::Future, process::Stdio, sync::Arc};

use futures::future::BoxFuture;

/// Extension trait for getting the spawn extension builder from both std and tokio commands.
pub trait CmdSpawnExt {
/// The type of the spawn extension builder.
type Spawner<'a>
where
Self: 'a;

/// Get a spawn extension builder that provides the ability to:
/// - Listen to stdout and/or stderr line by line in real-time via callbacks.
fn spawn_builder(&mut self) -> Self::Spawner<'_>;
}

impl CmdSpawnExt for std::process::Command {
type Spawner<'a> = CmdSpawnBuilderSync<'a>;

fn spawn_builder(&mut self) -> Self::Spawner<'_> {
CmdSpawnBuilderSync {
command: self,
on_stdout: None,
on_stderr: None,
}
}
}

impl CmdSpawnExt for tokio::process::Command {
type Spawner<'a> = CmdSpawnBuilderAsync<'a>;

fn spawn_builder(&mut self) -> Self::Spawner<'_> {
CmdSpawnBuilderAsync {
command: self,
on_stdout: None,
on_stderr: None,
}
}
}

/// Synchronous command spawn extension builder. For [`std::process::Command`].
pub struct CmdSpawnBuilderSync<'a> {
command: &'a mut std::process::Command,
on_stdout: Option<Box<dyn Fn(String) + Sync + Send + 'static>>,
on_stderr: Option<Arc<Box<dyn Fn(String) + Sync + Send + 'static>>>,
}

impl<'a> CmdSpawnBuilderSync<'a> {
/// Set a callback to be called for each line of stdout.
pub fn on_stdout(mut self, on_stdout: impl Fn(String) + Sync + Send + 'static) -> Self {
self.on_stdout = Some(Box::new(on_stdout));
self.command.stdout(Stdio::piped());
self
}

/// Set a callback to be called for each line of stderr.
pub fn on_stderr(mut self, on_stderr: impl Fn(String) + Sync + Send + 'static) -> Self {
self.on_stderr = Some(Arc::new(Box::new(on_stderr)));
self.command.stderr(Stdio::piped());
self
}

/// Spawn the command.
pub fn spawn(self) -> std::io::Result<std::process::Child> {
let mut child = self.command.spawn()?;

use std::io::BufRead;

// Capture and print stdout in a separate thread
if let Some(on_stdout) = self.on_stdout {
let on_stderr = self.on_stderr.as_ref().map(|on_stderr| on_stderr.clone());
if let Some(stdout) = child.stdout.take() {
let stdout_reader = std::io::BufReader::new(stdout);
std::thread::spawn(move || {
for line in stdout_reader.lines() {
match line {
Ok(line) => on_stdout(line),
Err(e) => {
let msg = format!("Error reading stdout: {:?}", e);
if let Some(on_stderr) = on_stderr.as_ref() {
on_stderr(msg);
} else {
on_stdout(msg);
}
}
}
}
});
}
}

// Capture and print stderr in a separate thread
if let Some(on_stderr) = self.on_stderr {
if let Some(stderr) = child.stderr.take() {
let stderr_reader = std::io::BufReader::new(stderr);
std::thread::spawn(move || {
for line in stderr_reader.lines() {
match line {
Ok(line) => on_stderr(line),
Err(e) => on_stderr(format!("Error reading stderr: {:?}", e)),
}
}
});
}
}

Ok(child)
}
}

/// Asynchronous command spawn extension builder. For [`tokio::process::Command`].
pub struct CmdSpawnBuilderAsync<'a> {
command: &'a mut tokio::process::Command,
on_stdout: Option<Box<dyn Fn(String) -> BoxFuture<'static, ()> + Send + 'static>>,
on_stderr: Option<Box<dyn Fn(String) -> BoxFuture<'static, ()> + Send + 'static>>,
}

impl<'a> CmdSpawnBuilderAsync<'a> {
/// Set a callback to be called for each line of stdout.
pub fn on_stdout<Fut: Future<Output = ()> + Send + 'static>(
mut self,
on_stdout: impl Fn(String) -> Fut + Send + 'static,
) -> Self {
self.on_stdout = Some(Box::new(move |s| Box::pin(on_stdout(s))));
self.command.stdout(Stdio::piped());
self
}

/// Set a callback to be called for each line of stderr.
pub fn on_stderr<Fut: Future<Output = ()> + Send + 'static>(
mut self,
on_stderr: impl Fn(String) -> Fut + Send + 'static,
) -> Self {
self.on_stderr = Some(Box::new(move |s| Box::pin(on_stderr(s))));
self.command.stderr(Stdio::piped());
self
}

/// Spawn the command.
pub fn spawn(self) -> std::io::Result<tokio::process::Child> {
use tokio::io::AsyncBufReadExt;

let mut child = self.command.spawn()?;

// Capture and print stdout in a separate thread
if let Some(on_stdout) = self.on_stdout {
if let Some(stdout) = child.stdout.take() {
let stdout_reader = tokio::io::BufReader::new(stdout);
tokio::spawn(async move {
let mut lines = stdout_reader.lines();
loop {
match lines.next_line().await {
Ok(v) => match v {
Some(line) => on_stdout(line).await,
None => break,
},
Err(e) => {
on_stdout(format!("Error reading stdout: {:?}", e)).await;
}
}
}
});
}
}

// Capture and print stderr in a separate thread
if let Some(on_stderr) = self.on_stderr {
if let Some(stderr) = child.stderr.take() {
let stderr_reader = tokio::io::BufReader::new(stderr);
tokio::spawn(async move {
let mut lines = stderr_reader.lines();
loop {
match lines.next_line().await {
Ok(v) => match v {
Some(line) => on_stderr(line).await,
None => break,
},
Err(e) => on_stderr(format!("Error reading stderr: {:?}", e)).await,
}
}
});
}
}

Ok(child)
}
}

// TESTING: implicitly tested during log tests which use it to extract logs I think.
41 changes: 41 additions & 0 deletions rust/bitbazaar/file/chmod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::path::Path;

/// Chmod an open file, noop on Windows.
///
/// ENTER AS HEX!
///
/// E.g. `0o755` rather than `755`.
pub fn chmod_sync(mode: u32, filepath: &Path) -> Result<(), std::io::Error> {
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(filepath, std::fs::Permissions::from_mode(mode))?;
}
Ok(())
}

/// Good default, chmod an open file to be executable by all, writeable by owner.
pub fn chmod_executable_sync(filepath: &Path) -> Result<(), std::io::Error> {
chmod_sync(0o755, filepath)
}

/// Chmod an open file, noop on Windows.
///
/// ENTER AS HEX!
///
/// E.g. `0o755` rather than `755`.
pub async fn chmod_async(mode: u32, filepath: &Path) -> Result<(), std::io::Error> {
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
tokio::fs::set_permissions(&filepath, std::fs::Permissions::from_mode(mode)).await?;
}
Ok(())
}

/// Good default (755).
///
/// chmod an open file to be executable by all, writeable by owner.
pub async fn chmod_executable_async(filepath: &Path) -> Result<(), std::io::Error> {
chmod_async(0o755, filepath).await
}
3 changes: 3 additions & 0 deletions rust/bitbazaar/file/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod chmod;

pub use chmod::*;
8 changes: 8 additions & 0 deletions rust/bitbazaar/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ mod prelude;
#[cfg(feature = "cli")]
/// Command line interface utilities.
pub mod cli;
/// OS Command related utilities. Much lighter weight than cli module (which will probably be deprecated as badly written and not maintainable).
/// Not applicable to wasm
#[cfg(not(target_arch = "wasm32"))]
pub mod command;

/// Chrono utilities
pub mod chrono;
Expand All @@ -23,6 +27,10 @@ pub mod cookies;
pub mod crypto;
/// Error handling utilities.
pub mod errors;

/// File related utilities. Therefore disabled on wasm.
#[cfg(not(target_arch = "wasm32"))]
pub mod file;
/// Logging utilities
pub mod log;
/// Completely miscellaneous utilities
Expand Down
4 changes: 1 addition & 3 deletions rust/bitbazaar/log/global_log/setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use tracing::{Dispatch, Level, Metadata, Subscriber};
use tracing_subscriber::{filter::FilterFn, layer::SubscriberExt, registry::LookupSpan, Layer};

Expand Down Expand Up @@ -222,7 +220,7 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> RResult<GlobalLog,
if let Some(endpoint) = otlp.http_endpoint {
use opentelemetry_otlp::{new_exporter, WithExportConfig};

let header_map: HashMap<String, String> =
let header_map: std::collections::HashMap<String, String> =
otlp.http_headers.iter().cloned().collect();
let get_exporter = || {
new_exporter()
Expand Down
2 changes: 2 additions & 0 deletions rust/bitbazaar/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,15 @@ mod tests {
move |stdout| {
println!("{}", stdout);
records.lock().push(stdout);
async {}
}
},
{
let records = records.clone();
move |stderr| {
println!("{}", stderr);
records.lock().push(stderr);
async {}
}
},
)
Expand Down
Loading

0 comments on commit e2413e8

Please sign in to comment.