Skip to content

Commit

Permalink
feat(rust): handle ctrlc signal on foreground nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
phillyphil91 authored and adrianbenavides committed Apr 2, 2023
1 parent e899dee commit a111ea9
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 62 deletions.
84 changes: 47 additions & 37 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_command/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ ockam_multiaddr = { path = "../ockam_multiaddr", version = "0.17.0", features =
ockam_vault = { path = "../ockam_vault", version = "^0.73.0", features = ["storage", "aws", "rustcrypto"] }
ockam_core = { path = "../ockam_core", version = "^0.77.0" }
ockam_identity = { path = "../ockam_identity", version = "^0.71.0" }
ctrlc = { version = "3.2.5" , features = ["termination"] }

[dev-dependencies]
assert_cmd = "2"
Expand Down
65 changes: 40 additions & 25 deletions implementations/rust/ockam/ockam_command/src/node/create.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use clap::Args;
use colorful::Colorful;
use ockam_identity::PublicIdentity;
use ockam_multiaddr::MultiAddr;
use ockam_vault::Vault;
use rand::prelude::random;
use tokio::io::AsyncBufReadExt;
use tokio::time::{sleep, Duration};

use anyhow::{anyhow, Context as _};
use std::io::{self, Read};
use std::{
net::{IpAddr, SocketAddr},
path::PathBuf,
Expand Down Expand Up @@ -214,7 +215,7 @@ fn create_foreground_node(opts: &CommandGlobalOpts, cmd: &CreateCommand) -> crat

async fn run_foreground_node(
mut ctx: Context,
(opts, cmd, addr): (CommandGlobalOpts, CreateCommand, SocketAddr),
(mut opts, cmd, addr): (CommandGlobalOpts, CreateCommand, SocketAddr),
) -> crate::Result<()> {
let cfg = &opts.config;
let node_name = parse_node_name(&cmd.node_name)?;
Expand Down Expand Up @@ -351,8 +352,44 @@ async fn run_foreground_node(
}
}

// Create a channel for communicating back to the main thread
let (tx, mut rx) = tokio::sync::mpsc::channel(2);

// Register a handler for SIGINT, SIGTERM, SIGHUP
let tx_clone = tx.clone();
let mut opts_clone = opts.clone();
ctrlc::set_handler(move || {
let _ = tx_clone.blocking_send(());
let _ = opts_clone
.shell
.write_line(format!("{} Ctrl+C signal received", "!".light_yellow()).as_str());
})
.expect("Error setting Ctrl+C handler");

// Spawn a thread to monitor STDIN for EOF
if cmd.exit_on_eof {
stop_node_on_eof(&mut ctx, &opts, &node_name).await?;
let tx_clone = tx.clone();
let mut opts_clone = opts.clone();
std::thread::spawn(move || {
let mut buffer = Vec::new();
let mut handle = io::stdin().lock();
handle
.read_to_end(&mut buffer)
.expect("Error reading from stdin");
let _ = tx_clone.blocking_send(());
let _ = opts_clone
.shell
.write_line(format!("{} EOF received", "!".light_yellow()).as_str());
});
}

// Shutdown on SIGINT, SIGTERM, SIGHUP or EOF
if rx.recv().await.is_some() {
opts.state.nodes.get(&node_name)?.kill_process(false)?;
ctx.stop().await?;
opts.shell
.write_line(format!("{}Node stopped successfully", "✔︎".light_green()).as_str())
.unwrap();
}

Ok(())
Expand All @@ -373,28 +410,6 @@ pub fn load_pre_trusted_identities(cmd: &CreateCommand) -> Result<Option<PreTrus
Ok(pre_trusted_identities)
}

// Read STDIN until EOF is encountered and then stop the node
async fn stop_node_on_eof(
ctx: &mut Context,
opts: &CommandGlobalOpts,
node_name: &str,
) -> crate::Result<()> {
let reader = tokio::io::BufReader::new(tokio::io::stdin());
let mut lines = reader.lines();

loop {
match lines.next_line().await {
Ok(Some(_)) => (),
Ok(None) => break,
Err(_) => unreachable!(),
}
}

ctx.stop().await?;
opts.state.nodes.get(node_name)?.kill_process(false)?;
Ok(())
}

async fn start_services(
ctx: &Context,
tcp: &TcpTransport,
Expand Down

0 comments on commit a111ea9

Please sign in to comment.