Skip to content

Commit

Permalink
Graceful shutdown on Payments service error
Browse files Browse the repository at this point in the history
  • Loading branch information
nkostoulas committed Dec 12, 2019
1 parent c953f5f commit d4dd864
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 7 deletions.
9 changes: 7 additions & 2 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub fn run(config: Config) -> Result<()> {

let api_handler = ::api::run_api_server(&config.api, storage.clone());
let (req_send, req_recv): (Sender<sha256d::Hash>, Receiver<sha256d::Hash>) = channel();
let payments_handler = ::payments::run_payments(config.clientchain.clone(), storage.clone(), req_recv)?;
let mut payments_handler = ::payments::run_payments(config.clientchain.clone(), storage.clone(), req_recv)?;

// This loop runs continuously fetching and running challenge requests,
// generating challenge responses and fails on any errors that occur
Expand All @@ -44,12 +44,17 @@ pub fn run(config: Config) -> Result<()> {
thread::sleep(time::Duration::from_secs(config.block_time))
}
Err(err) => {
api_handler.close(); // try closing the api rpc server
api_handler.close(); // try closing the api server
payments_handler.stop(); // try closing the payments service
return Err(err);
}
}
if payments_handler.got_err() {
break;
}
}
api_handler.close(); // try closing the api server
Ok(())
}

/// Run request method attemps to fetch a challenge request and run it
Expand Down
1 change: 1 addition & 0 deletions src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ pub fn run_listener(

Handle::new(
tx,
None,
thread::spawn(move || {
rt::run(server);
}),
Expand Down
9 changes: 6 additions & 3 deletions src/payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ impl Payments {
if bids.len() > 0 {
if let Some(resp) = self.storage.get_response(request.txid)? {
let fees_amount = calculate_fees(request, &self.client)?;
info! {"total service request fees: {}", fees_amount};
info! {"total service fees: {}", fees_amount};
let bid_payment_amount =
calculate_bid_payment(&fees_amount, request.fee_percentage.into(), bids.len() as u64)?;
info! {"num bids: {} fee per bid: {} ({}%)", bids.len(), bid_payment_amount, request.fee_percentage};

info! {"num bids: {}", bids.len()};
info! {"fees per bid: {} ({}%)", bid_payment_amount, request.fee_percentage};
self.process_bid_payments(&mut bids, &bid_payment_amount, &resp)?;
if self.do_payment {
payment_complete = self.complete_bid_payments(&mut bids)?
Expand Down Expand Up @@ -311,11 +311,14 @@ pub fn run_payments<'a>(
) -> Result<Handle<'a>> {
let payments = Payments::new(clientchain_config, storage)?;
let (tx, rx) = oneshot::channel();
let (err_tx, err_rx) = oneshot::channel();
Ok(Handle::new(
tx,
Some(err_rx),
thread::spawn(move || {
if let Err(err) = payments.do_request_payments(req_recv, rx) {
error! {"payments error: {}", err};
err_tx.send(()).expect("failed sending error signal");
}
}),
"PAYMENTS",
Expand Down
26 changes: 24 additions & 2 deletions src/util/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use futures::sync::oneshot;
pub struct Handle<'a> {
/// Channel to send kill signal
tx: oneshot::Sender<()>,
/// Channel to receive error from service
err_rx: Option<oneshot::Receiver<()>>,
/// Service thread handler
thread: thread::JoinHandle<()>,
/// Service name
Expand All @@ -19,8 +21,28 @@ pub struct Handle<'a> {

impl<'a> Handle<'a> {
/// Return new handle instance
pub fn new(tx: oneshot::Sender<()>, thread: thread::JoinHandle<()>, name: &str) -> Handle {
Handle { tx, thread, name }
pub fn new(
tx: oneshot::Sender<()>,
err_rx: Option<oneshot::Receiver<()>>,
thread: thread::JoinHandle<()>,
name: &str,
) -> Handle {
Handle {
tx,
err_rx,
thread,
name,
}
}

/// Check if an err signal has been received in the error receiver channel
pub fn got_err(&mut self) -> bool {
if let Some(rcv) = &mut self.err_rx {
if rcv.try_recv().expect("").is_some() {
return true;
}
}
false
}

/// Handle sending a stop signal to the service and joining the service
Expand Down

0 comments on commit d4dd864

Please sign in to comment.