From d4dd86459c8b188bdbf7c489889d1d0d55e0ab20 Mon Sep 17 00:00:00 2001 From: nkostoulas Date: Thu, 12 Dec 2019 16:17:13 +0000 Subject: [PATCH] Graceful shutdown on Payments service error --- src/coordinator.rs | 9 +++++++-- src/listener.rs | 1 + src/payments.rs | 9 ++++++--- src/util/handler.rs | 26 ++++++++++++++++++++++++-- 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/src/coordinator.rs b/src/coordinator.rs index 694096c..df9696c 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -26,7 +26,7 @@ pub fn run(config: Config) -> Result<()> { let api_handler = ::api::run_api_server(&config.api, storage.clone()); let (req_send, req_recv): (Sender, Receiver) = channel(); - let payments_handler = ::payments::run_payments(config.clientchain.clone(), storage.clone(), req_recv)?; + let mut payments_handler = ::payments::run_payments(config.clientchain.clone(), storage.clone(), req_recv)?; // This loop runs continuously fetching and running challenge requests, // generating challenge responses and fails on any errors that occur @@ -44,12 +44,17 @@ pub fn run(config: Config) -> Result<()> { thread::sleep(time::Duration::from_secs(config.block_time)) } Err(err) => { - api_handler.close(); // try closing the api rpc server + api_handler.close(); // try closing the api server payments_handler.stop(); // try closing the payments service return Err(err); } } + if payments_handler.got_err() { + break; + } } + api_handler.close(); // try closing the api server + Ok(()) } /// Run request method attemps to fetch a challenge request and run it diff --git a/src/listener.rs b/src/listener.rs index e8c4ee1..1b77fbc 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -173,6 +173,7 @@ pub fn run_listener( Handle::new( tx, + None, thread::spawn(move || { rt::run(server); }), diff --git a/src/payments.rs b/src/payments.rs index 6a86d38..9a005db 100644 --- a/src/payments.rs +++ b/src/payments.rs @@ -197,11 +197,11 @@ impl Payments { if bids.len() > 0 { if let Some(resp) = self.storage.get_response(request.txid)? { let fees_amount = calculate_fees(request, &self.client)?; - info! {"total service request fees: {}", fees_amount}; + info! {"total service fees: {}", fees_amount}; let bid_payment_amount = calculate_bid_payment(&fees_amount, request.fee_percentage.into(), bids.len() as u64)?; - info! {"num bids: {} fee per bid: {} ({}%)", bids.len(), bid_payment_amount, request.fee_percentage}; - + info! {"num bids: {}", bids.len()}; + info! {"fees per bid: {} ({}%)", bid_payment_amount, request.fee_percentage}; self.process_bid_payments(&mut bids, &bid_payment_amount, &resp)?; if self.do_payment { payment_complete = self.complete_bid_payments(&mut bids)? @@ -311,11 +311,14 @@ pub fn run_payments<'a>( ) -> Result> { let payments = Payments::new(clientchain_config, storage)?; let (tx, rx) = oneshot::channel(); + let (err_tx, err_rx) = oneshot::channel(); Ok(Handle::new( tx, + Some(err_rx), thread::spawn(move || { if let Err(err) = payments.do_request_payments(req_recv, rx) { error! {"payments error: {}", err}; + err_tx.send(()).expect("failed sending error signal"); } }), "PAYMENTS", diff --git a/src/util/handler.rs b/src/util/handler.rs index fa92b46..34ea8d1 100644 --- a/src/util/handler.rs +++ b/src/util/handler.rs @@ -11,6 +11,8 @@ use futures::sync::oneshot; pub struct Handle<'a> { /// Channel to send kill signal tx: oneshot::Sender<()>, + /// Channel to receive error from service + err_rx: Option>, /// Service thread handler thread: thread::JoinHandle<()>, /// Service name @@ -19,8 +21,28 @@ pub struct Handle<'a> { impl<'a> Handle<'a> { /// Return new handle instance - pub fn new(tx: oneshot::Sender<()>, thread: thread::JoinHandle<()>, name: &str) -> Handle { - Handle { tx, thread, name } + pub fn new( + tx: oneshot::Sender<()>, + err_rx: Option>, + thread: thread::JoinHandle<()>, + name: &str, + ) -> Handle { + Handle { + tx, + err_rx, + thread, + name, + } + } + + /// Check if an err signal has been received in the error receiver channel + pub fn got_err(&mut self) -> bool { + if let Some(rcv) = &mut self.err_rx { + if rcv.try_recv().expect("").is_some() { + return true; + } + } + false } /// Handle sending a stop signal to the service and joining the service