diff --git a/Cargo.lock b/Cargo.lock index 01e75f5..0653a2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -491,16 +491,15 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.1" +version = "4.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", "digest", "fiat-crypto", - "platforms", "rustc_version", "subtle", "zeroize", @@ -1340,12 +1339,6 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" -[[package]] -name = "platforms" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0" - [[package]] name = "poly1305" version = "0.8.0" @@ -1822,6 +1815,7 @@ dependencies = [ "log", "openssl", "path-absolutize", + "russh", "semver 1.0.20", "serde", "serde_json", @@ -2340,6 +2334,6 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.6.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" diff --git a/Cargo.toml b/Cargo.toml index f3e97ff..65e8702 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,3 +34,4 @@ fs2 = "0.4.3" log = "0.4.20" handlebars = "5.1.2" cron = "0.12.1" +russh = "0.38.0" diff --git a/README.md b/README.md index dc3500f..e1053d4 100644 --- a/README.md +++ b/README.md @@ -155,11 +155,12 @@ sudo apt-get install -y gcc make libssl-dev pkg-config - [ ] Remove - [x] List - [ ] Store manifest in store so CNI plugin can get access - - [ ] Fix pod naming to avoid collisions + - [x] Fix pod naming to avoid collisions - Deployments - [x] Apply - [x] Remove - [x] List + - [ ] Logs - [x] Output matches kubectl - Daemonsets - [x] Apply diff --git a/src/lib.rs b/src/lib.rs index 04d3ff1..273dd1f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ mod executor; mod describe; mod filestore; mod cron; +mod logs; pub use skate::skate; pub use skatelet::skatelet; \ No newline at end of file diff --git a/src/logs.rs b/src/logs.rs new file mode 100644 index 0000000..47719e1 --- /dev/null +++ b/src/logs.rs @@ -0,0 +1,69 @@ +use std::error::Error; +use std::process; +use std::process::Stdio; +use anyhow::anyhow; +use clap::Args; +use futures::stream::FuturesUnordered; +use crate::config::Config; +use crate::create::CreateCommands; +use crate::skate::ConfigFileArgs; +use crate::ssh; +use futures::StreamExt; + +#[derive(Debug, Args)] +#[command(arg_required_else_help(true))] +pub struct LogArgs { + #[command(flatten)] + config: ConfigFileArgs, + #[arg(short, long, long_help = "Specify if the logs should be streamed.")] + pub follow: bool, + #[arg( + short, default_value_t = - 1, long, long_help = "Lines of recent log file to display. Defaults to -1." + )] + pub tail: i32, + #[arg(long, short, long_help = "Filter by resource namespace")] + namespace: Option, + #[arg(trailing_var_arg = true, name = "POD | TYPE/NAME")] + var_args: Vec, +} + +pub async fn logs(args: LogArgs) -> Result<(), Box> { + + let config = Config::load(Some(args.config.skateconfig.clone()))?; + let (conns, errors) = ssh::cluster_connections(config.current_cluster()?).await; + + if errors.is_some() { + eprintln!("{}", errors.as_ref().unwrap()) + } + + if conns.is_none() { + if errors.is_some() { + return Err(anyhow!(errors.unwrap().to_string()).into()); + } + println!("No connections found"); + return Ok(()); + } + + let conns = conns.unwrap(); + + let name = args.var_args.first(); + if name.is_none() { + return Err("No resource name provided".into()); + } + + let name = name.unwrap(); + let ns = args.namespace.unwrap_or("default".to_string()); + + let cmd = format!("sudo podman logs {}", name); + let fut: FuturesUnordered<_> = conns.clients.iter().map(|c| c.execute_stdout(&cmd)).collect(); + + let result: Vec<_> = fut.collect().await; + + for res in result { + match res { + Err(e) => eprintln!("{}", e), + _ => {} + } + } + Ok(()) +} \ No newline at end of file diff --git a/src/skate.rs b/src/skate.rs index ca0bbdd..c6207de 100644 --- a/src/skate.rs +++ b/src/skate.rs @@ -35,6 +35,7 @@ use crate::create::{create, CreateArgs}; use crate::delete::{delete, DeleteArgs}; use crate::get::{get, GetArgs}; use crate::describe::{DescribeArgs, describe}; +use crate::logs::{LogArgs, logs}; use crate::skate::Distribution::{Debian, Raspbian, Ubuntu, Unknown}; use crate::skate::Os::{Darwin, Linux}; use crate::ssh::SshClient; @@ -57,6 +58,7 @@ enum Commands { Refresh(RefreshArgs), Get(GetArgs), Describe(DescribeArgs), + Logs(LogArgs) } #[derive(Debug, Clone, Args)] @@ -78,6 +80,7 @@ pub async fn skate() -> Result<(), Box> { Commands::Refresh(args) => refresh(args).await, Commands::Get(args) => get(args).await, Commands::Describe(args) => describe(args).await, + Commands::Logs(args) => logs(args).await, _ => Ok(()) } } diff --git a/src/ssh.rs b/src/ssh.rs index 841e129..8959882 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -1,6 +1,8 @@ use std::error::Error; +use russh; use std::fmt; use std::fmt::{Debug, Formatter}; +use std::process::Stdio; use std::time::Duration; use anyhow::anyhow; use async_ssh2_tokio::{AuthMethod, ServerCheckMethod}; @@ -242,6 +244,41 @@ echo ovs=$(cat /tmp/ovs-$$); } } + pub async fn execute_stdout(self: &SshClient, cmd: &str) -> Result<(), Box> { + let mut ch = self.client.get_channel().await?; + let _ = ch.exec(true, cmd).await?; + + let mut result: Option<_> = None; + + while let Some(msg) = ch.wait().await { + //dbg!(&msg); + match msg { + // If we get data, add it to the buffer + russh::ChannelMsg::Data { ref data } => print!("{}", &String::from_utf8_lossy(&data.to_vec())), + russh::ChannelMsg::ExtendedData { ref data, ext } => { + if ext == 1 { + eprint!("{}", &String::from_utf8_lossy(&data.to_vec())) + } + } + // If we get an exit code report, store it, but crucially don't + // assume this message means end of communications. The data might + // not be finished yet! + russh::ChannelMsg::ExitStatus { exit_status } => result = Some(exit_status), + + // We SHOULD get this EOF messagge, but 4254 sec 5.3 also permits + // the channel to close without it being sent. And sometimes this + // message can even precede the Data message, so don't handle it + // russh::ChannelMsg::Eof => break, + _ => {} + } + } + + if result.is_none() || result == Some(0) { + return Ok(()); + } + Err(anyhow!("exit status {}", result.unwrap()).into()) + } + pub async fn execute(self: &SshClient, cmd: &str) -> Result> { cmd.lines().for_each(|l| println!("{} | > {}", self.node_name, l.green())); let result = self.client.execute(cmd).await.