Skip to content

Commit

Permalink
Custom thread Handle for sub-thread services
Browse files Browse the repository at this point in the history
  • Loading branch information
nkostoulas committed Dec 11, 2019
1 parent 8a2e291 commit 98faa22
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 13 deletions.
9 changes: 2 additions & 7 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::sync::{Arc, Mutex};
use std::{thread, time};

use bitcoin::hashes::{hex::FromHex, sha256d};
use futures::sync::oneshot;

use crate::challenger::ChallengeResponse;
use crate::config::Config;
Expand Down Expand Up @@ -73,9 +72,7 @@ pub fn run_request<T: Service, K: ClientChain, D: Storage>(
let (verify_tx, verify_rx): (Sender<ChallengeResponse>, Receiver<ChallengeResponse>) = channel();

// start listener along with a oneshot channel to send shutdown message
let (thread_tx, thread_rx) = oneshot::channel();
let verify_handle =
::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx, thread_rx);
let listener_handle = ::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx);

// run challenge request storing expected responses
::challenger::run_challenge_request(
Expand All @@ -90,9 +87,7 @@ pub fn run_request<T: Service, K: ClientChain, D: Storage>(
time::Duration::from_secs(config.block_time / 2),
)?;

// stop listener service
thread_tx.send(()).expect("thread_tx send failed");
verify_handle.join().expect("verify_handle join failed");
listener_handle.stop(); // try stop listener service

return Ok(Some(shared_challenge.lock().unwrap().request.txid));
}
Expand Down
2 changes: 2 additions & 0 deletions src/interfaces/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ pub trait Storage {

/// Database implementation of Storage trait
pub struct MongoStorage {
/// mongo db connection instance
db: Mutex<Database>,
/// db config for reconnecting
config: StorageConfig,
}

Expand Down
17 changes: 11 additions & 6 deletions src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use serde_json::{self, Value};
use crate::challenger::{ChallengeResponse, ChallengeState};
use crate::error::Result;
use crate::interfaces::bid::Bid;
use crate::util::handler::Handle;

/// Messsage type for challenge proofs sent by guardnodes
#[derive(Debug)]
Expand Down Expand Up @@ -152,8 +153,7 @@ pub fn run_listener(
listener_host: &String,
challenge: Arc<Mutex<ChallengeState>>,
ch_resp: Sender<ChallengeResponse>,
ch_recv: oneshot::Receiver<()>,
) -> thread::JoinHandle<()> {
) -> Handle {
let addr: Vec<_> = listener_host
.to_socket_addrs()
.expect("Unable to resolve domain")
Expand All @@ -165,14 +165,19 @@ pub fn run_listener(
service_fn(move |req: Request<Body>| handle(req, challenge.clone(), challenge_resp.clone()))
};

let (tx, rx) = oneshot::channel();
let server = Server::bind(&addr[0])
.serve(listener_service)
.with_graceful_shutdown(ch_recv)
.with_graceful_shutdown(rx)
.map_err(|e| error!("listener error: {}", e));

thread::spawn(move || {
rt::run(server);
})
Handle::new(
tx,
thread::spawn(move || {
rt::run(server);
}),
"LISTENER",
)
}

#[cfg(test)]
Expand Down
34 changes: 34 additions & 0 deletions src/util/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! # Handler
//!
//! Error/kill handler for sub threads and services
use std::thread;

use futures::sync::oneshot;

/// Handler struct responsible for sending a stop signal to a service and
/// joining a thread back to the main thread
pub struct Handle<'a> {
/// Channel to send kill signal
tx: oneshot::Sender<()>,
/// Service thread handler
thread: thread::JoinHandle<()>,
/// Service name
name: &'a str,
}

impl<'a> Handle<'a> {
/// Return new handle instance
pub fn new(tx: oneshot::Sender<()>, thread: thread::JoinHandle<()>, name: &str) -> Handle {
Handle { tx, thread, name }
}

/// Handle sending a stop signal to the service and joining the service
/// thread
pub fn stop(self) {
self.tx
.send(())
.expect(&format!("failed sending shutdown signal to {}", self.name));
self.thread.join().expect(&format!("{} join failed", self.name));
}
}
1 change: 1 addition & 0 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub mod checks;
pub mod doc_format;
pub mod handler;
pub mod ocean;
#[cfg(test)]
pub mod testing;

0 comments on commit 98faa22

Please sign in to comment.