Skip to content

Commit

Permalink
refactor: TunshellSession ~> TunshellClient (#273)
Browse files Browse the repository at this point in the history
refactor: `TunshellClient`
  • Loading branch information
Devdutt Shenoi authored Aug 23, 2023
1 parent 67dff9f commit a586c1d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 49 deletions.
88 changes: 42 additions & 46 deletions uplink/src/collector/tunshell.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
use std::sync::Arc;

use log::error;
use serde::{Deserialize, Serialize};
use tokio_compat_02::FutureExt;
use tunshell_client::{Client, ClientMode, Config, HostShell};

use crate::{
base::{self, bridge::BridgeTx, ActionRoute},
ActionResponse,
base::{bridge::BridgeTx, ActionRoute},
Action, ActionResponse,
};

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed to deserialize keys. Error = {0}")]
Serde(#[from] serde_json::Error),
#[error("TunshellClient client Error = {0}")]
TunshellClient(#[from] anyhow::Error),
#[error("TunshellClient exited with unexpected status: {0}")]
UnexpectedStatus(u8),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Keys {
session: String,
relay: String,
encryption: String,
}

pub struct TunshellSession {
_config: Arc<base::Config>,
echo_stdout: bool,
#[derive(Debug, Clone)]
pub struct TunshellClient {
bridge: BridgeTx,
}

impl TunshellSession {
pub fn new(config: Arc<base::Config>, echo_stdout: bool, bridge: BridgeTx) -> Self {
Self { _config: config, echo_stdout, bridge }
impl TunshellClient {
pub fn new(bridge: BridgeTx) -> Self {
Self { bridge }
}

fn config(&self, keys: Keys) -> Config {
Expand All @@ -37,7 +44,7 @@ impl TunshellSession {
443,
&keys.encryption,
true,
self.echo_stdout,
false,
)
}

Expand All @@ -47,45 +54,34 @@ impl TunshellSession {
let actions_rx = self.bridge.register_action_route(route).await;

while let Ok(action) = actions_rx.recv_async().await {
let action_id = action.action_id.clone();

// println!("{:?}", keys);
let keys = match serde_json::from_str(&action.payload) {
Ok(k) => k,
Err(e) => {
error!("Failed to deserialize keys. Error = {:?}", e);
let status = ActionResponse::failure(&action_id, "corruptkeys".to_owned());
self.bridge.send_action_response(status).await;
continue;
let session = self.clone();
//TODO(RT): Findout why this is spawned. We want to send other action's with shell?
tokio::spawn(async move {
if let Err(e) = session.session(&action).await {
error!("{}", e.to_string());
let status = ActionResponse::failure(&action.action_id, e.to_string());
session.bridge.send_action_response(status).await;
}
};
});
}
}

let mut client = Client::new(self.config(keys), HostShell::new().unwrap());
let status_tx = self.bridge.clone();
async fn session(&self, action: &Action) -> Result<(), Error> {
let action_id = action.action_id.clone();

//TODO(RT): Findout why this is spawned. We want to send other action's with shell?
tokio::spawn(async move {
let response = ActionResponse::progress(&action_id, "ShellSpawned", 90);
status_tx.send_action_response(response).await;
// println!("{:?}", keys);
let keys = serde_json::from_str(&action.payload)?;
let mut client = Client::new(self.config(keys), HostShell::new().unwrap());

match client.start_session().compat().await {
Ok(status) => {
if status != 0 {
let response = ActionResponse::failure(&action_id, status.to_string());
status_tx.send_action_response(response).await;
} else {
log::info!("tunshell exited with status: {}", status);
let response = ActionResponse::success(&action_id);
status_tx.send_action_response(response).await;
}
}
Err(e) => {
log::warn!("tunshell client error: {}", e);
let response = ActionResponse::failure(&action_id, e.to_string());
status_tx.send_action_response(response).await;
}
};
});
let response = ActionResponse::progress(&action_id, "ShellSpawned", 90);
self.bridge.send_action_response(response).await;

let status = client.start_session().compat().await?;
if status != 0 {
Err(Error::UnexpectedStatus(status))
} else {
log::info!("Tunshell session ended successfully");
Ok(())
}
}
}
6 changes: 3 additions & 3 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use collector::installer::OTAInstaller;
use collector::process::ProcessHandler;
use collector::script_runner::ScriptRunner;
use collector::systemstats::StatCollector;
use collector::tunshell::TunshellSession;
use collector::tunshell::TunshellClient;
use flume::{bounded, Receiver, RecvError, Sender};
use log::error;

Expand Down Expand Up @@ -392,8 +392,8 @@ impl Uplink {
})
});

let tunshell_session = TunshellSession::new(config.clone(), false, bridge_tx.clone());
thread::spawn(move || tunshell_session.start());
let tunshell_client = TunshellClient::new(bridge_tx.clone());
thread::spawn(move || tunshell_client.start());

let file_downloader = FileDownloader::new(config.clone(), bridge_tx.clone())?;
thread::spawn(move || file_downloader.start());
Expand Down

0 comments on commit a586c1d

Please sign in to comment.