diff --git a/src/coordinator.rs b/src/coordinator.rs index 3e583e2..8d1201f 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -7,7 +7,6 @@ use std::sync::{Arc, Mutex}; use std::{thread, time}; use bitcoin::hashes::{hex::FromHex, sha256d}; -use futures::sync::oneshot; use crate::challenger::ChallengeResponse; use crate::config::Config; @@ -73,9 +72,7 @@ pub fn run_request( let (verify_tx, verify_rx): (Sender, Receiver) = channel(); // start listener along with a oneshot channel to send shutdown message - let (thread_tx, thread_rx) = oneshot::channel(); - let verify_handle = - ::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx, thread_rx); + let listener_handle = ::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx); // run challenge request storing expected responses ::challenger::run_challenge_request( @@ -90,9 +87,7 @@ pub fn run_request( time::Duration::from_secs(config.block_time / 2), )?; - // stop listener service - thread_tx.send(()).expect("thread_tx send failed"); - verify_handle.join().expect("verify_handle join failed"); + listener_handle.stop(); // try stop listener service return Ok(Some(shared_challenge.lock().unwrap().request.txid)); } diff --git a/src/interfaces/storage.rs b/src/interfaces/storage.rs index 09d1ff8..c58b350 100644 --- a/src/interfaces/storage.rs +++ b/src/interfaces/storage.rs @@ -47,7 +47,9 @@ pub trait Storage { /// Database implementation of Storage trait pub struct MongoStorage { + /// mongo db connection instance db: Mutex, + /// db config for reconnecting config: StorageConfig, } diff --git a/src/listener.rs b/src/listener.rs index 8676f09..e8c4ee1 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -21,6 +21,7 @@ use serde_json::{self, Value}; use crate::challenger::{ChallengeResponse, ChallengeState}; use crate::error::Result; use crate::interfaces::bid::Bid; +use crate::util::handler::Handle; /// Messsage type for challenge proofs sent by guardnodes #[derive(Debug)] @@ -152,8 +153,7 @@ pub fn run_listener( listener_host: &String, challenge: Arc>, ch_resp: Sender, - ch_recv: oneshot::Receiver<()>, -) -> thread::JoinHandle<()> { +) -> Handle { let addr: Vec<_> = listener_host .to_socket_addrs() .expect("Unable to resolve domain") @@ -165,14 +165,19 @@ pub fn run_listener( service_fn(move |req: Request| handle(req, challenge.clone(), challenge_resp.clone())) }; + let (tx, rx) = oneshot::channel(); let server = Server::bind(&addr[0]) .serve(listener_service) - .with_graceful_shutdown(ch_recv) + .with_graceful_shutdown(rx) .map_err(|e| error!("listener error: {}", e)); - thread::spawn(move || { - rt::run(server); - }) + Handle::new( + tx, + thread::spawn(move || { + rt::run(server); + }), + "LISTENER", + ) } #[cfg(test)] diff --git a/src/util/handler.rs b/src/util/handler.rs new file mode 100644 index 0000000..fa92b46 --- /dev/null +++ b/src/util/handler.rs @@ -0,0 +1,34 @@ +//! # Handler +//! +//! Error/kill handler for sub threads and services + +use std::thread; + +use futures::sync::oneshot; + +/// Handler struct responsible for sending a stop signal to a service and +/// joining a thread back to the main thread +pub struct Handle<'a> { + /// Channel to send kill signal + tx: oneshot::Sender<()>, + /// Service thread handler + thread: thread::JoinHandle<()>, + /// Service name + name: &'a str, +} + +impl<'a> Handle<'a> { + /// Return new handle instance + pub fn new(tx: oneshot::Sender<()>, thread: thread::JoinHandle<()>, name: &str) -> Handle { + Handle { tx, thread, name } + } + + /// Handle sending a stop signal to the service and joining the service + /// thread + pub fn stop(self) { + self.tx + .send(()) + .expect(&format!("failed sending shutdown signal to {}", self.name)); + self.thread.join().expect(&format!("{} join failed", self.name)); + } +} diff --git a/src/util/mod.rs b/src/util/mod.rs index 29b49bf..25aad58 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -4,6 +4,7 @@ pub mod checks; pub mod doc_format; +pub mod handler; pub mod ocean; #[cfg(test)] pub mod testing;