Skip to content

Commit

Permalink
Merge pull request #92 from commerceblock/develop
Browse files Browse the repository at this point in the history
Release 0.4.6 - Child thread handling
  • Loading branch information
Nikos Kostoulas authored Dec 13, 2019
2 parents 8a2e291 + c545143 commit 0da5e51
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 64 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
2. Robustness improvements
3. Dockerize request script and fixes
4. Complete payments
5. Payment key secrets
6. Child thread handling improvements

# 0.3.x

Expand Down
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "coordinator"
version = "0.4.4"
version = "0.4.6"
authors = ["nkostoulas <[email protected]>"]
description = "Guardnode Coordinator implementation for the Commerceblock Covalence system"
homepage = "https://github.com/commerceblock"
Expand All @@ -16,7 +16,7 @@ config = "0.9"
serde = { version = "1.0", features = ["derive"] }
serde_json="1.0"
mongodb = "0.3.11"
jsonrpc-http-server = "11.0"
jsonrpc-http-server = "14.0.5"
rust-ocean = { git = "https://github.com/commerceblock/rust-ocean"}
ocean-rpc = { git = "https://github.com/commerceblock/rust-ocean-rpc"}
bitcoin = { version = "0.20", features = [ "use-serde" ] }
13 changes: 7 additions & 6 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use base64::decode as b64decode;
use bitcoin::hashes::sha256d;
use hyper::{Body, Request, StatusCode};
use jsonrpc_http_server::jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value};
use jsonrpc_http_server::{hyper::header, AccessControlAllowOrigin, DomainsValidation, Response, ServerBuilder};
use jsonrpc_http_server::{
hyper::header, AccessControlAllowOrigin, CloseHandle, DomainsValidation, Response, ServerBuilder,
};
use serde::{Deserialize, Serialize};

use crate::config::ApiConfig;
Expand Down Expand Up @@ -148,10 +150,7 @@ fn authorize(our_auth: &str, request: &Request<Body>) -> bool {
/// Run Api RPC server for external requests that require information from the
/// coordinator. Data returned to the caller are drawn from the storage
/// interface which is shared with the main coordinator process
pub fn run_api_server<D: Storage + Send + Sync + 'static>(
config: &ApiConfig,
storage: Arc<D>,
) -> thread::JoinHandle<()> {
pub fn run_api_server<D: Storage + Send + Sync + 'static>(config: &ApiConfig, storage: Arc<D>) -> CloseHandle {
let mut io = IoHandler::default();
let storage_ref = storage.clone();
io.add_method("getrequestresponse", move |params: Params| {
Expand Down Expand Up @@ -189,7 +188,9 @@ pub fn run_api_server<D: Storage + Send + Sync + 'static>(
.start_http(&addr[0])
.expect("api error");

thread::spawn(move || server.wait())
let close_handle = server.close_handle();
let _ = thread::spawn(move || server.wait());
close_handle // handler to stop the server from the main thread
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions src/bin/main.rs → src/bin/coord.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! # Main
//! # Coord
//!
//! Main daemon entry
//! Coord main daemon entry
#[macro_use]
extern crate log;
Expand Down
55 changes: 33 additions & 22 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::sync::{Arc, Mutex};
use std::{thread, time};

use bitcoin::hashes::{hex::FromHex, sha256d};
use futures::sync::oneshot;

use crate::challenger::ChallengeResponse;
use crate::config::Config;
Expand All @@ -25,23 +24,37 @@ pub fn run(config: Config) -> Result<()> {
let storage = Arc::new(MongoStorage::new(config.storage.clone())?);
let genesis_hash = sha256d::Hash::from_hex(&config.clientchain.genesis_hash)?;

let _ = ::api::run_api_server(&config.api, storage.clone());
let api_handler = ::api::run_api_server(&config.api, storage.clone());
let (req_send, req_recv): (Sender<sha256d::Hash>, Receiver<sha256d::Hash>) = channel();
let _ = ::payments::run_payments(config.clientchain.clone(), storage.clone(), req_recv)?;
let mut payments_handler = ::payments::run_payments(config.clientchain.clone(), storage.clone(), req_recv)?;

// This loop runs continuously fetching and running challenge requests,
// generating challenge responses and fails on any errors that occur
loop {
if let Some(request_id) = run_request(&config, &service, &clientchain, storage.clone(), genesis_hash)? {
// if challenge request succeeds print responses
req_send.send(request_id).unwrap();
info! {"***** Response *****"}
let resp = storage.get_response(request_id)?.unwrap();
info! {"{}", serde_json::to_string_pretty(&resp).unwrap()};
match run_request(&config, &service, &clientchain, storage.clone(), genesis_hash) {
Ok(res) => {
if let Some(request_id) = res {
// if challenge request succeeds print responses
req_send.send(request_id).unwrap();
info! {"***** Response *****"}
let resp = storage.get_response(request_id)?.unwrap();
info! {"{}", serde_json::to_string_pretty(&resp).unwrap()};
}
info! {"Sleeping for {} sec...", config.block_time}
thread::sleep(time::Duration::from_secs(config.block_time))
}
Err(err) => {
api_handler.close(); // try closing the api server
payments_handler.stop(); // try closing the payments service
return Err(err);
}
}
if payments_handler.got_err() {
break;
}
info! {"Sleeping for {} sec...", config.block_time}
thread::sleep(time::Duration::from_secs(config.block_time))
}
api_handler.close(); // try closing the api server
Ok(())
}

/// Run request method attemps to fetch a challenge request and run it
Expand Down Expand Up @@ -73,12 +86,10 @@ pub fn run_request<T: Service, K: ClientChain, D: Storage>(
let (verify_tx, verify_rx): (Sender<ChallengeResponse>, Receiver<ChallengeResponse>) = channel();

// start listener along with a oneshot channel to send shutdown message
let (thread_tx, thread_rx) = oneshot::channel();
let verify_handle =
::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx, thread_rx);
let listener_handle = ::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx);

// run challenge request storing expected responses
::challenger::run_challenge_request(
match ::challenger::run_challenge_request(
service,
clientchain,
shared_challenge.clone(),
Expand All @@ -88,13 +99,13 @@ pub fn run_request<T: Service, K: ClientChain, D: Storage>(
time::Duration::from_secs(config.challenge_duration),
config.challenge_frequency,
time::Duration::from_secs(config.block_time / 2),
)?;

// stop listener service
thread_tx.send(()).expect("thread_tx send failed");
verify_handle.join().expect("verify_handle join failed");

return Ok(Some(shared_challenge.lock().unwrap().request.txid));
) {
Ok(()) => return Ok(Some(shared_challenge.lock().unwrap().request.txid)),
Err(err) => {
listener_handle.stop(); // try stop listener service
Err(err)
}
}
}
None => Ok(None),
}
Expand Down
2 changes: 2 additions & 0 deletions src/interfaces/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ pub trait Storage {

/// Database implementation of Storage trait
pub struct MongoStorage {
/// mongo db connection instance
db: Mutex<Database>,
/// db config for reconnecting
config: StorageConfig,
}

Expand Down
18 changes: 12 additions & 6 deletions src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use serde_json::{self, Value};
use crate::challenger::{ChallengeResponse, ChallengeState};
use crate::error::Result;
use crate::interfaces::bid::Bid;
use crate::util::handler::Handle;

/// Messsage type for challenge proofs sent by guardnodes
#[derive(Debug)]
Expand Down Expand Up @@ -152,8 +153,7 @@ pub fn run_listener(
listener_host: &String,
challenge: Arc<Mutex<ChallengeState>>,
ch_resp: Sender<ChallengeResponse>,
ch_recv: oneshot::Receiver<()>,
) -> thread::JoinHandle<()> {
) -> Handle {
let addr: Vec<_> = listener_host
.to_socket_addrs()
.expect("Unable to resolve domain")
Expand All @@ -165,14 +165,20 @@ pub fn run_listener(
service_fn(move |req: Request<Body>| handle(req, challenge.clone(), challenge_resp.clone()))
};

let (tx, rx) = oneshot::channel();
let server = Server::bind(&addr[0])
.serve(listener_service)
.with_graceful_shutdown(ch_recv)
.with_graceful_shutdown(rx)
.map_err(|e| error!("listener error: {}", e));

thread::spawn(move || {
rt::run(server);
})
Handle::new(
tx,
None,
thread::spawn(move || {
rt::run(server);
}),
"LISTENER",
)
}

#[cfg(test)]
Expand Down
54 changes: 39 additions & 15 deletions src/payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
//! TODO: Add description
use std::str::FromStr;
use std::sync::mpsc::{Receiver, RecvError};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use bitcoin::{hashes::sha256d, Amount, PublicKey};
use futures::sync::oneshot;
use ocean::{Address, AddressParams};
use ocean_rpc::{json::SendAnyToAddressResult, RpcApi};

Expand All @@ -19,7 +21,7 @@ use crate::interfaces::{
response::Response,
storage::Storage,
};
use crate::util::ocean::OceanClient;
use crate::util::{handler::Handle, ocean::OceanClient};

/// Get addr params from chain name
pub fn get_chain_addr_params(chain: &String) -> &'static AddressParams {
Expand Down Expand Up @@ -195,11 +197,11 @@ impl Payments {
if bids.len() > 0 {
if let Some(resp) = self.storage.get_response(request.txid)? {
let fees_amount = calculate_fees(request, &self.client)?;
info! {"total service request fees: {}", fees_amount};
info! {"total service fees: {}", fees_amount};
let bid_payment_amount =
calculate_bid_payment(&fees_amount, request.fee_percentage.into(), bids.len() as u64)?;
info! {"num bids: {} fee per bid: {} ({}%)", bids.len(), bid_payment_amount, request.fee_percentage};

info! {"num bids: {}", bids.len()};
info! {"fees per bid: {} ({}%)", bid_payment_amount, request.fee_percentage};
self.process_bid_payments(&mut bids, &bid_payment_amount, &resp)?;
if self.do_payment {
payment_complete = self.complete_bid_payments(&mut bids)?
Expand All @@ -220,7 +222,11 @@ impl Payments {

/// Main Request payments method; first checks for any incomplete requests
/// and then listens for new requests on the receiver channel
fn do_request_payments(&self, req_recv: Receiver<sha256d::Hash>) -> Result<()> {
fn do_request_payments(
&self,
req_recv: Receiver<sha256d::Hash>,
mut kill_recv: oneshot::Receiver<()>,
) -> Result<()> {
// Look for incomplete requests
let incomplete_requests = self.storage.get_requests(Some(false), None, None)?;
for mut req in incomplete_requests {
Expand All @@ -230,16 +236,26 @@ impl Payments {

// Wait for new requests
loop {
match req_recv.recv() {
match req_recv.recv_timeout(Duration::from_millis(100)) {
Ok(resp) => {
let mut req = self.storage.get_request(resp)?.unwrap();
info! {"New request: {}", req.txid};
let _ = self.do_request_payment(&mut req)?;
}
Err(RecvError) => {
Err(RecvTimeoutError::Timeout) => {} // ignore timeout - it's allowed
Err(RecvTimeoutError::Disconnected) => {
return Err(Error::from(CError::ReceiverDisconnected));
}
}

if kill_recv
.try_recv()
.expect("failed receiving shutdown signal")
.is_some()
{
info!("Shutting down...");
return Ok(());
}
}
}

Expand Down Expand Up @@ -288,17 +304,25 @@ impl Payments {

/// Run payments daemon in a separate thread with a Payments instance receiving
/// information on finished requests via a Receiver channel
pub fn run_payments(
pub fn run_payments<'a>(
clientchain_config: ClientChainConfig,
storage: Arc<dyn Storage + Send + Sync>,
req_recv: Receiver<sha256d::Hash>,
) -> Result<thread::JoinHandle<()>> {
) -> Result<Handle<'a>> {
let payments = Payments::new(clientchain_config, storage)?;
Ok(thread::spawn(move || {
if let Err(err) = payments.do_request_payments(req_recv) {
error! {"payments error: {}", err};
}
}))
let (tx, rx) = oneshot::channel();
let (err_tx, err_rx) = oneshot::channel();
Ok(Handle::new(
tx,
Some(err_rx),
thread::spawn(move || {
if let Err(err) = payments.do_request_payments(req_recv, rx) {
error! {"payments error: {}", err};
err_tx.send(()).expect("failed sending error signal");
}
}),
"PAYMENTS",
))
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 0da5e51

Please sign in to comment.