Skip to content

Commit

Permalink
Merge pull request #95 from commerceblock/develop
Browse files Browse the repository at this point in the history
Release 0.4.7 - Listener server improvements
  • Loading branch information
Nikos Kostoulas authored Jan 31, 2020
2 parents 0da5e51 + d4ceb60 commit fff92a6
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "coordinator"
version = "0.4.6"
version = "0.4.7"
authors = ["nkostoulas <[email protected]>"]
description = "Guardnode Coordinator implementation for the Commerceblock Covalence system"
homepage = "https://github.com/commerceblock"
Expand Down
2 changes: 1 addition & 1 deletion config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# block_time = 60

# Host address that the listener binds to and receives guardnode requests
listener_host = "127.0.0.1:9999"
listener_host = "127.0.0.1:9998"

[api]
host = "localhost:3333"
Expand Down
2 changes: 1 addition & 1 deletion examples/hyperclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn main() {
sig.serialize_der().to_hex()
);

let uri: hyper::Uri = "http://localhost:9999/challengeproof".parse().unwrap();
let uri: hyper::Uri = "http://localhost:9998/challengeproof".parse().unwrap();
let mut req = Request::new(Body::from(data));
*req.method_mut() = Method::POST;
*req.uri_mut() = uri.clone();
Expand Down
24 changes: 12 additions & 12 deletions src/challenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ fn get_challenge_response(
pub fn run_challenge_request<T: Service, K: ClientChain, D: Storage>(
service: &T,
clientchain: &K,
challenge_state: Arc<Mutex<ChallengeState>>,
challenge_state: Arc<Mutex<Option<ChallengeState>>>,
verify_rx: &Receiver<ChallengeResponse>,
storage: Arc<D>,
verify_duration: time::Duration,
challenge_duration: time::Duration,
challenge_frequency: u64,
refresh_delay: time::Duration,
) -> Result<()> {
let request = challenge_state.lock().unwrap().request.clone(); // clone as const and drop mutex
let request = challenge_state.lock().unwrap().as_ref().unwrap().request.clone(); // clone as const and drop mutex
let mut response = storage.get_response(request.txid)?.unwrap_or(Response::new());
info! {"Running challenge request: {:?}", request.txid};
let mut prev_challenge_height: u64 = 0;
Expand All @@ -119,10 +119,10 @@ pub fn run_challenge_request<T: Service, K: ClientChain, D: Storage>(

info! {"sending challenge..."}
let challenge_hash = clientchain.send_challenge()?;
challenge_state.lock().unwrap().latest_challenge = Some(challenge_hash);
challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = Some(challenge_hash);

if let Err(e) = verify_challenge(&challenge_hash, clientchain, verify_duration) {
challenge_state.lock().unwrap().latest_challenge = None; // stop receiving responses
challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = None; // stop receiving responses
return Err(e);
}

Expand All @@ -133,7 +133,7 @@ pub fn run_challenge_request<T: Service, K: ClientChain, D: Storage>(
challenge_duration,
)?);
storage.save_response(request.txid, &response)?;
challenge_state.lock().unwrap().latest_challenge = None; // stop receiving responses
challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = None; // stop receiving responses
prev_challenge_height = challenge_height; // update prev height
}
info! {"Challenge request ended"}
Expand Down Expand Up @@ -518,7 +518,7 @@ mod tests {
let res = run_challenge_request(
&service,
&clientchain,
Arc::new(Mutex::new(challenge_state.clone())),
Arc::new(Mutex::new(Some(challenge_state.clone()))),
&vrx,
storage.clone(),
time::Duration::from_millis(10),
Expand Down Expand Up @@ -552,7 +552,7 @@ mod tests {
let res = run_challenge_request(
&service,
&clientchain,
Arc::new(Mutex::new(challenge_state.clone())),
Arc::new(Mutex::new(Some(challenge_state.clone()))),
&vrx,
storage.clone(),
time::Duration::from_millis(10),
Expand Down Expand Up @@ -594,7 +594,7 @@ mod tests {
assert!(run_challenge_request(
&service,
&clientchain,
Arc::new(Mutex::new(challenge_state)),
Arc::new(Mutex::new(Some(challenge_state))),
&vrx,
storage.clone(),
time::Duration::from_millis(10),
Expand All @@ -613,7 +613,7 @@ mod tests {
assert!(run_challenge_request(
&service,
&clientchain,
Arc::new(Mutex::new(challenge_state)),
Arc::new(Mutex::new(Some(challenge_state))),
&vrx,
storage.clone(),
time::Duration::from_millis(10),
Expand All @@ -633,7 +633,7 @@ mod tests {
assert!(run_challenge_request(
&service,
&clientchain,
Arc::new(Mutex::new(challenge_state)),
Arc::new(Mutex::new(Some(challenge_state))),
&vrx,
Arc::new(storage_err),
time::Duration::from_millis(10),
Expand All @@ -655,7 +655,7 @@ mod tests {
let res = run_challenge_request(
&service,
&clientchain,
Arc::new(Mutex::new(challenge_state)),
Arc::new(Mutex::new(Some(challenge_state))),
&vrx,
storage.clone(),
time::Duration::from_millis(10),
Expand Down Expand Up @@ -683,7 +683,7 @@ mod tests {
let res = run_challenge_request(
&service,
&clientchain,
Arc::new(Mutex::new(challenge_state)),
Arc::new(Mutex::new(Some(challenge_state))),
&vrx,
storage.clone(),
time::Duration::from_millis(10),
Expand Down
43 changes: 29 additions & 14 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{thread, time};

use bitcoin::hashes::{hex::FromHex, sha256d};

use crate::challenger::ChallengeResponse;
use crate::challenger::{ChallengeResponse, ChallengeState};
use crate::config::Config;
use crate::error::Result;
use crate::interfaces::clientchain::{ClientChain, RpcClientChain};
Expand All @@ -28,10 +28,26 @@ pub fn run(config: Config) -> Result<()> {
let (req_send, req_recv): (Sender<sha256d::Hash>, Receiver<sha256d::Hash>) = channel();
let mut payments_handler = ::payments::run_payments(config.clientchain.clone(), storage.clone(), req_recv)?;

// create a challenge state mutex to share between challenger and listener.
// initially None
let shared_challenge = Arc::new(Mutex::new(None));
// and a channel for sending responses from listener to challenger
let (verify_tx, verify_rx): (Sender<ChallengeResponse>, Receiver<ChallengeResponse>) = channel();
// start listener along with a oneshot channel to send shutdown message
let listener_handle = ::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx);

// This loop runs continuously fetching and running challenge requests,
// generating challenge responses and fails on any errors that occur
loop {
match run_request(&config, &service, &clientchain, storage.clone(), genesis_hash) {
match run_request(
&config,
&service,
&clientchain,
storage.clone(),
shared_challenge.clone(),
&verify_rx,
genesis_hash,
) {
Ok(res) => {
if let Some(request_id) = res {
// if challenge request succeeds print responses
Expand All @@ -40,12 +56,16 @@ pub fn run(config: Config) -> Result<()> {
let resp = storage.get_response(request_id)?.unwrap();
info! {"{}", serde_json::to_string_pretty(&resp).unwrap()};
}
// Reset challenge state to None.
*shared_challenge.lock().unwrap() = None;

info! {"Sleeping for {} sec...", config.block_time}
thread::sleep(time::Duration::from_secs(config.block_time))
}
Err(err) => {
api_handler.close(); // try closing the api server
payments_handler.stop(); // try closing the payments service
listener_handle.stop(); // try stop listener service
return Err(err);
}
}
Expand All @@ -54,6 +74,7 @@ pub fn run(config: Config) -> Result<()> {
}
}
api_handler.close(); // try closing the api server
listener_handle.stop(); // try stop listener service
Ok(())
}

Expand All @@ -65,6 +86,8 @@ pub fn run_request<T: Service, K: ClientChain, D: Storage>(
service: &T,
clientchain: &K,
storage: Arc<D>,
shared_challenge: Arc<Mutex<Option<ChallengeState>>>,
verify_rx: &Receiver<ChallengeResponse>,
genesis_hash: sha256d::Hash,
) -> Result<Option<sha256d::Hash>> {
match ::challenger::fetch_next(service, &genesis_hash)? {
Expand All @@ -80,13 +103,8 @@ pub fn run_request<T: Service, K: ClientChain, D: Storage>(
config.clientchain.block_time,
)?;

// create a challenge state mutex to share between challenger and listener
let shared_challenge = Arc::new(Mutex::new(challenge));
// and a channel for sending responses from listener to challenger
let (verify_tx, verify_rx): (Sender<ChallengeResponse>, Receiver<ChallengeResponse>) = channel();

// start listener along with a oneshot channel to send shutdown message
let listener_handle = ::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx);
// modify challenge state for the new challenge request
*shared_challenge.lock().unwrap() = Some(challenge);

// run challenge request storing expected responses
match ::challenger::run_challenge_request(
Expand All @@ -100,11 +118,8 @@ pub fn run_request<T: Service, K: ClientChain, D: Storage>(
config.challenge_frequency,
time::Duration::from_secs(config.block_time / 2),
) {
Ok(()) => return Ok(Some(shared_challenge.lock().unwrap().request.txid)),
Err(err) => {
listener_handle.stop(); // try stop listener service
Err(err)
}
Ok(()) => return Ok(Some(shared_challenge.lock().unwrap().as_ref().unwrap().request.txid)),
Err(err) => Err(err),
}
}
None => Ok(None),
Expand Down
57 changes: 31 additions & 26 deletions src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl ChallengeProof {
/// challenger to receive
fn handle_challengeproof(
req: Request<Body>,
challenge: Arc<Mutex<ChallengeState>>,
challenge: Arc<Mutex<Option<ChallengeState>>>,
challenge_resp: Sender<ChallengeResponse>,
) -> impl Future<Item = Response<Body>, Error = hyper::Error> + Send {
let resp = req.into_body().concat2().map(move |body| {
Expand All @@ -82,27 +82,32 @@ fn handle_challengeproof(
// parse challenge proof from json
Ok(proof) => {
// check for an active challenge
let challenge_lock = challenge.lock().unwrap();
if let Some(h) = challenge_lock.latest_challenge {
// check challenge proof bid exists
if !challenge_lock.bids.contains(&proof.bid) {
return response(StatusCode::BAD_REQUEST, "bad-bid".to_owned());
let ch_lock = challenge.lock().unwrap();
if let Some(ch) = ch_lock.as_ref() {
if let Some(h) = ch.latest_challenge {
// check challenge proof bid exists
if !ch.bids.contains(&proof.bid) {
return response(StatusCode::BAD_REQUEST, "bad-bid".to_owned());
}
// drop lock immediately
std::mem::drop(ch_lock);
// check challenge proof hash is correct
if proof.hash != h {
return response(StatusCode::BAD_REQUEST, "bad-hash".to_owned());
}
// check challenge proof sig is correct
if let Err(e) = ChallengeProof::verify(&proof) {
return response(StatusCode::BAD_REQUEST, format!("bad-sig: {}", e));
}
// send successful response to challenger
challenge_resp
.send(ChallengeResponse(proof.hash, proof.bid.clone()))
.unwrap();
return response(StatusCode::OK, String::new());
}
} else {
// drop lock immediately
std::mem::drop(challenge_lock);
// check challenge proof hash is correct
if proof.hash != h {
return response(StatusCode::BAD_REQUEST, "bad-hash".to_owned());
}
// check challenge proof sig is correct
if let Err(e) = ChallengeProof::verify(&proof) {
return response(StatusCode::BAD_REQUEST, format!("bad-sig: {}", e));
}
// send successful response to challenger
challenge_resp
.send(ChallengeResponse(proof.hash, proof.bid.clone()))
.unwrap();
return response(StatusCode::OK, String::new());
std::mem::drop(ch_lock);
}
response(StatusCode::BAD_REQUEST, format!("no-active-challenge"))
}
Expand All @@ -118,7 +123,7 @@ fn handle_challengeproof(
/// and to the /challengeproof POST uri for receiving challenges from guardnodes
fn handle(
req: Request<Body>,
challenge: Arc<Mutex<ChallengeState>>,
challenge: Arc<Mutex<Option<ChallengeState>>>,
challenge_resp: Sender<ChallengeResponse>,
) -> impl Future<Item = Response<Body>, Error = hyper::Error> + Send {
let resp = match (req.method(), req.uri().path()) {
Expand Down Expand Up @@ -151,7 +156,7 @@ fn response(status: StatusCode, message: String) -> Response<Body> {
/// of the coordinator
pub fn run_listener(
listener_host: &String,
challenge: Arc<Mutex<ChallengeState>>,
challenge: Arc<Mutex<Option<ChallengeState>>>,
ch_resp: Sender<ChallengeResponse>,
) -> Handle {
let addr: Vec<_> = listener_host
Expand Down Expand Up @@ -304,7 +309,7 @@ mod tests {
let _challenge_state = gen_challenge_state_with_challenge(&gen_dummy_hash(3), &chl_hash);
let bid_txid = _challenge_state.bids.iter().next().unwrap().txid;
let bid_pubkey = _challenge_state.bids.iter().next().unwrap().pubkey;
let challenge_state = Arc::new(Mutex::new(_challenge_state));
let challenge_state = Arc::new(Mutex::new(Some(_challenge_state)));

// Request get /
let data = "";
Expand Down Expand Up @@ -444,7 +449,7 @@ mod tests {
let _challenge_state = gen_challenge_state_with_challenge(&gen_dummy_hash(1), &chl_hash);
let bid_txid = _challenge_state.bids.iter().next().unwrap().txid;
let bid_pubkey = _challenge_state.bids.iter().next().unwrap().pubkey;
let challenge_state = Arc::new(Mutex::new(_challenge_state));
let challenge_state = Arc::new(Mutex::new(Some(_challenge_state)));

// Request body data empty
let data = "";
Expand Down Expand Up @@ -523,7 +528,7 @@ mod tests {
assert!(resp_rx.try_recv() == Err(TryRecvError::Empty)); // check receiver empty

// No active challenge (hash is None) so request rejected
challenge_state.lock().unwrap().latest_challenge = None;
challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = None;
let data = r#"
{
"txid": "0000000000000000000000000000000000000000000000000000000000000000",
Expand All @@ -543,7 +548,7 @@ mod tests {
.wait()
})
.wait();
challenge_state.lock().unwrap().latest_challenge = Some(chl_hash);
challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = Some(chl_hash);
assert!(resp_rx.try_recv() == Err(TryRecvError::Empty)); // check receiver empty

// Invalid bid on request body (txid does not exist)
Expand Down

0 comments on commit fff92a6

Please sign in to comment.