From ef81c0480c809cc053784a5eb1d089b3fbefa345 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 23 Aug 2023 09:21:27 +0530 Subject: [PATCH] refactor: `TunshellClient` --- uplink/src/collector/tunshell.rs | 88 +++++++++++++++----------------- uplink/src/lib.rs | 6 +-- 2 files changed, 45 insertions(+), 49 deletions(-) diff --git a/uplink/src/collector/tunshell.rs b/uplink/src/collector/tunshell.rs index 89de42c7..4019ceb2 100644 --- a/uplink/src/collector/tunshell.rs +++ b/uplink/src/collector/tunshell.rs @@ -1,15 +1,23 @@ -use std::sync::Arc; - use log::error; use serde::{Deserialize, Serialize}; use tokio_compat_02::FutureExt; use tunshell_client::{Client, ClientMode, Config, HostShell}; use crate::{ - base::{self, bridge::BridgeTx, ActionRoute}, - ActionResponse, + base::{bridge::BridgeTx, ActionRoute}, + Action, ActionResponse, }; +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Failed to deserialize keys. Error = {0}")] + Serde(#[from] serde_json::Error), + #[error("TunshellClient client Error = {0}")] + TunshellClient(#[from] anyhow::Error), + #[error("TunshellClient exited with unexpected status: {0}")] + UnexpectedStatus(u8), +} + #[derive(Debug, Serialize, Deserialize)] pub struct Keys { session: String, @@ -17,15 +25,14 @@ pub struct Keys { encryption: String, } -pub struct TunshellSession { - _config: Arc, - echo_stdout: bool, +#[derive(Debug, Clone)] +pub struct TunshellClient { bridge: BridgeTx, } -impl TunshellSession { - pub fn new(config: Arc, echo_stdout: bool, bridge: BridgeTx) -> Self { - Self { _config: config, echo_stdout, bridge } +impl TunshellClient { + pub fn new(bridge: BridgeTx) -> Self { + Self { bridge } } fn config(&self, keys: Keys) -> Config { @@ -37,7 +44,7 @@ impl TunshellSession { 443, &keys.encryption, true, - self.echo_stdout, + false, ) } @@ -47,45 +54,34 @@ impl TunshellSession { let actions_rx = self.bridge.register_action_route(route).await; while let Ok(action) = actions_rx.recv_async().await { - let action_id = action.action_id.clone(); - - // println!("{:?}", keys); - let keys = match serde_json::from_str(&action.payload) { - Ok(k) => k, - Err(e) => { - error!("Failed to deserialize keys. Error = {:?}", e); - let status = ActionResponse::failure(&action_id, "corruptkeys".to_owned()); - self.bridge.send_action_response(status).await; - continue; + let session = self.clone(); + //TODO(RT): Findout why this is spawned. We want to send other action's with shell? + tokio::spawn(async move { + if let Err(e) = session.session(&action).await { + error!("{}", e.to_string()); + let status = ActionResponse::failure(&action.action_id, e.to_string()); + session.bridge.send_action_response(status).await; } - }; + }); + } + } - let mut client = Client::new(self.config(keys), HostShell::new().unwrap()); - let status_tx = self.bridge.clone(); + async fn session(&self, action: &Action) -> Result<(), Error> { + let action_id = action.action_id.clone(); - //TODO(RT): Findout why this is spawned. We want to send other action's with shell? - tokio::spawn(async move { - let response = ActionResponse::progress(&action_id, "ShellSpawned", 90); - status_tx.send_action_response(response).await; + // println!("{:?}", keys); + let keys = serde_json::from_str(&action.payload)?; + let mut client = Client::new(self.config(keys), HostShell::new().unwrap()); - match client.start_session().compat().await { - Ok(status) => { - if status != 0 { - let response = ActionResponse::failure(&action_id, status.to_string()); - status_tx.send_action_response(response).await; - } else { - log::info!("tunshell exited with status: {}", status); - let response = ActionResponse::success(&action_id); - status_tx.send_action_response(response).await; - } - } - Err(e) => { - log::warn!("tunshell client error: {}", e); - let response = ActionResponse::failure(&action_id, e.to_string()); - status_tx.send_action_response(response).await; - } - }; - }); + let response = ActionResponse::progress(&action_id, "ShellSpawned", 90); + self.bridge.send_action_response(response).await; + + let status = client.start_session().compat().await?; + if status != 0 { + Err(Error::UnexpectedStatus(status)) + } else { + log::info!("Tunshell session ended successfully"); + Ok(()) } } } diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index 39aebdc4..f4775baa 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -55,7 +55,7 @@ use collector::installer::OTAInstaller; use collector::process::ProcessHandler; use collector::script_runner::ScriptRunner; use collector::systemstats::StatCollector; -use collector::tunshell::TunshellSession; +use collector::tunshell::TunshellClient; use flume::{bounded, Receiver, RecvError, Sender}; use log::error; @@ -392,8 +392,8 @@ impl Uplink { }) }); - let tunshell_session = TunshellSession::new(config.clone(), false, bridge_tx.clone()); - thread::spawn(move || tunshell_session.start()); + let tunshell_client = TunshellClient::new(bridge_tx.clone()); + thread::spawn(move || tunshell_client.start()); let file_downloader = FileDownloader::new(config.clone(), bridge_tx.clone())?; thread::spawn(move || file_downloader.start());