diff --git a/Cargo.lock b/Cargo.lock index 75efad3..4108e89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,7 +194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "coordinator" -version = "0.4.4" +version = "0.4.7" dependencies = [ "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "bitcoin 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 2e6610f..e3e7c4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "coordinator" -version = "0.4.6" +version = "0.4.7" authors = ["nkostoulas "] description = "Guardnode Coordinator implementation for the Commerceblock Covalence system" homepage = "https://github.com/commerceblock" diff --git a/config/default.toml b/config/default.toml index 2e781c3..07ba785 100644 --- a/config/default.toml +++ b/config/default.toml @@ -11,7 +11,7 @@ # block_time = 60 # Host address that the listener binds to and receives guardnode requests -listener_host = "127.0.0.1:9999" +listener_host = "127.0.0.1:9998" [api] host = "localhost:3333" diff --git a/examples/hyperclient.rs b/examples/hyperclient.rs index 48e6340..5b37e65 100644 --- a/examples/hyperclient.rs +++ b/examples/hyperclient.rs @@ -35,7 +35,7 @@ fn main() { sig.serialize_der().to_hex() ); - let uri: hyper::Uri = "http://localhost:9999/challengeproof".parse().unwrap(); + let uri: hyper::Uri = "http://localhost:9998/challengeproof".parse().unwrap(); let mut req = Request::new(Body::from(data)); *req.method_mut() = Method::POST; *req.uri_mut() = uri.clone(); diff --git a/src/challenger.rs b/src/challenger.rs index 4eb19e8..2d508f4 100644 --- a/src/challenger.rs +++ b/src/challenger.rs @@ -94,7 +94,7 @@ fn get_challenge_response( pub fn run_challenge_request( service: &T, clientchain: &K, - challenge_state: Arc>, + challenge_state: Arc>>, verify_rx: &Receiver, storage: Arc, verify_duration: time::Duration, @@ -102,7 +102,7 @@ pub fn run_challenge_request( challenge_frequency: u64, refresh_delay: time::Duration, ) -> Result<()> { - let request = challenge_state.lock().unwrap().request.clone(); // clone as const and drop mutex + let request = challenge_state.lock().unwrap().as_ref().unwrap().request.clone(); // clone as const and drop mutex let mut response = storage.get_response(request.txid)?.unwrap_or(Response::new()); info! {"Running challenge request: {:?}", request.txid}; let mut prev_challenge_height: u64 = 0; @@ -119,10 +119,10 @@ pub fn run_challenge_request( info! {"sending challenge..."} let challenge_hash = clientchain.send_challenge()?; - challenge_state.lock().unwrap().latest_challenge = Some(challenge_hash); + challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = Some(challenge_hash); if let Err(e) = verify_challenge(&challenge_hash, clientchain, verify_duration) { - challenge_state.lock().unwrap().latest_challenge = None; // stop receiving responses + challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = None; // stop receiving responses return Err(e); } @@ -133,7 +133,7 @@ pub fn run_challenge_request( challenge_duration, )?); storage.save_response(request.txid, &response)?; - challenge_state.lock().unwrap().latest_challenge = None; // stop receiving responses + challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = None; // stop receiving responses prev_challenge_height = challenge_height; // update prev height } info! {"Challenge request ended"} @@ -518,7 +518,7 @@ mod tests { let res = run_challenge_request( &service, &clientchain, - Arc::new(Mutex::new(challenge_state.clone())), + Arc::new(Mutex::new(Some(challenge_state.clone()))), &vrx, storage.clone(), time::Duration::from_millis(10), @@ -552,7 +552,7 @@ mod tests { let res = run_challenge_request( &service, &clientchain, - Arc::new(Mutex::new(challenge_state.clone())), + Arc::new(Mutex::new(Some(challenge_state.clone()))), &vrx, storage.clone(), time::Duration::from_millis(10), @@ -594,7 +594,7 @@ mod tests { assert!(run_challenge_request( &service, &clientchain, - Arc::new(Mutex::new(challenge_state)), + Arc::new(Mutex::new(Some(challenge_state))), &vrx, storage.clone(), time::Duration::from_millis(10), @@ -613,7 +613,7 @@ mod tests { assert!(run_challenge_request( &service, &clientchain, - Arc::new(Mutex::new(challenge_state)), + Arc::new(Mutex::new(Some(challenge_state))), &vrx, storage.clone(), time::Duration::from_millis(10), @@ -633,7 +633,7 @@ mod tests { assert!(run_challenge_request( &service, &clientchain, - Arc::new(Mutex::new(challenge_state)), + Arc::new(Mutex::new(Some(challenge_state))), &vrx, Arc::new(storage_err), time::Duration::from_millis(10), @@ -655,7 +655,7 @@ mod tests { let res = run_challenge_request( &service, &clientchain, - Arc::new(Mutex::new(challenge_state)), + Arc::new(Mutex::new(Some(challenge_state))), &vrx, storage.clone(), time::Duration::from_millis(10), @@ -683,7 +683,7 @@ mod tests { let res = run_challenge_request( &service, &clientchain, - Arc::new(Mutex::new(challenge_state)), + Arc::new(Mutex::new(Some(challenge_state))), &vrx, storage.clone(), time::Duration::from_millis(10), diff --git a/src/coordinator.rs b/src/coordinator.rs index df9696c..d9efead 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -8,7 +8,7 @@ use std::{thread, time}; use bitcoin::hashes::{hex::FromHex, sha256d}; -use crate::challenger::ChallengeResponse; +use crate::challenger::{ChallengeResponse, ChallengeState}; use crate::config::Config; use crate::error::Result; use crate::interfaces::clientchain::{ClientChain, RpcClientChain}; @@ -28,10 +28,26 @@ pub fn run(config: Config) -> Result<()> { let (req_send, req_recv): (Sender, Receiver) = channel(); let mut payments_handler = ::payments::run_payments(config.clientchain.clone(), storage.clone(), req_recv)?; + // create a challenge state mutex to share between challenger and listener. + // initially None + let shared_challenge = Arc::new(Mutex::new(None)); + // and a channel for sending responses from listener to challenger + let (verify_tx, verify_rx): (Sender, Receiver) = channel(); + // start listener along with a oneshot channel to send shutdown message + let listener_handle = ::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx); + // This loop runs continuously fetching and running challenge requests, // generating challenge responses and fails on any errors that occur loop { - match run_request(&config, &service, &clientchain, storage.clone(), genesis_hash) { + match run_request( + &config, + &service, + &clientchain, + storage.clone(), + shared_challenge.clone(), + &verify_rx, + genesis_hash, + ) { Ok(res) => { if let Some(request_id) = res { // if challenge request succeeds print responses @@ -40,12 +56,16 @@ pub fn run(config: Config) -> Result<()> { let resp = storage.get_response(request_id)?.unwrap(); info! {"{}", serde_json::to_string_pretty(&resp).unwrap()}; } + // Reset challenge state to None. + *shared_challenge.lock().unwrap() = None; + info! {"Sleeping for {} sec...", config.block_time} thread::sleep(time::Duration::from_secs(config.block_time)) } Err(err) => { api_handler.close(); // try closing the api server payments_handler.stop(); // try closing the payments service + listener_handle.stop(); // try stop listener service return Err(err); } } @@ -54,6 +74,7 @@ pub fn run(config: Config) -> Result<()> { } } api_handler.close(); // try closing the api server + listener_handle.stop(); // try stop listener service Ok(()) } @@ -65,6 +86,8 @@ pub fn run_request( service: &T, clientchain: &K, storage: Arc, + shared_challenge: Arc>>, + verify_rx: &Receiver, genesis_hash: sha256d::Hash, ) -> Result> { match ::challenger::fetch_next(service, &genesis_hash)? { @@ -80,13 +103,8 @@ pub fn run_request( config.clientchain.block_time, )?; - // create a challenge state mutex to share between challenger and listener - let shared_challenge = Arc::new(Mutex::new(challenge)); - // and a channel for sending responses from listener to challenger - let (verify_tx, verify_rx): (Sender, Receiver) = channel(); - - // start listener along with a oneshot channel to send shutdown message - let listener_handle = ::listener::run_listener(&config.listener_host, shared_challenge.clone(), verify_tx); + // modify challenge state for the new challenge request + *shared_challenge.lock().unwrap() = Some(challenge); // run challenge request storing expected responses match ::challenger::run_challenge_request( @@ -100,11 +118,8 @@ pub fn run_request( config.challenge_frequency, time::Duration::from_secs(config.block_time / 2), ) { - Ok(()) => return Ok(Some(shared_challenge.lock().unwrap().request.txid)), - Err(err) => { - listener_handle.stop(); // try stop listener service - Err(err) - } + Ok(()) => return Ok(Some(shared_challenge.lock().unwrap().as_ref().unwrap().request.txid)), + Err(err) => Err(err), } } None => Ok(None), diff --git a/src/listener.rs b/src/listener.rs index 1b77fbc..565acf5 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -71,7 +71,7 @@ impl ChallengeProof { /// challenger to receive fn handle_challengeproof( req: Request, - challenge: Arc>, + challenge: Arc>>, challenge_resp: Sender, ) -> impl Future, Error = hyper::Error> + Send { let resp = req.into_body().concat2().map(move |body| { @@ -82,27 +82,32 @@ fn handle_challengeproof( // parse challenge proof from json Ok(proof) => { // check for an active challenge - let challenge_lock = challenge.lock().unwrap(); - if let Some(h) = challenge_lock.latest_challenge { - // check challenge proof bid exists - if !challenge_lock.bids.contains(&proof.bid) { - return response(StatusCode::BAD_REQUEST, "bad-bid".to_owned()); + let ch_lock = challenge.lock().unwrap(); + if let Some(ch) = ch_lock.as_ref() { + if let Some(h) = ch.latest_challenge { + // check challenge proof bid exists + if !ch.bids.contains(&proof.bid) { + return response(StatusCode::BAD_REQUEST, "bad-bid".to_owned()); + } + // drop lock immediately + std::mem::drop(ch_lock); + // check challenge proof hash is correct + if proof.hash != h { + return response(StatusCode::BAD_REQUEST, "bad-hash".to_owned()); + } + // check challenge proof sig is correct + if let Err(e) = ChallengeProof::verify(&proof) { + return response(StatusCode::BAD_REQUEST, format!("bad-sig: {}", e)); + } + // send successful response to challenger + challenge_resp + .send(ChallengeResponse(proof.hash, proof.bid.clone())) + .unwrap(); + return response(StatusCode::OK, String::new()); } + } else { // drop lock immediately - std::mem::drop(challenge_lock); - // check challenge proof hash is correct - if proof.hash != h { - return response(StatusCode::BAD_REQUEST, "bad-hash".to_owned()); - } - // check challenge proof sig is correct - if let Err(e) = ChallengeProof::verify(&proof) { - return response(StatusCode::BAD_REQUEST, format!("bad-sig: {}", e)); - } - // send successful response to challenger - challenge_resp - .send(ChallengeResponse(proof.hash, proof.bid.clone())) - .unwrap(); - return response(StatusCode::OK, String::new()); + std::mem::drop(ch_lock); } response(StatusCode::BAD_REQUEST, format!("no-active-challenge")) } @@ -118,7 +123,7 @@ fn handle_challengeproof( /// and to the /challengeproof POST uri for receiving challenges from guardnodes fn handle( req: Request, - challenge: Arc>, + challenge: Arc>>, challenge_resp: Sender, ) -> impl Future, Error = hyper::Error> + Send { let resp = match (req.method(), req.uri().path()) { @@ -151,7 +156,7 @@ fn response(status: StatusCode, message: String) -> Response { /// of the coordinator pub fn run_listener( listener_host: &String, - challenge: Arc>, + challenge: Arc>>, ch_resp: Sender, ) -> Handle { let addr: Vec<_> = listener_host @@ -304,7 +309,7 @@ mod tests { let _challenge_state = gen_challenge_state_with_challenge(&gen_dummy_hash(3), &chl_hash); let bid_txid = _challenge_state.bids.iter().next().unwrap().txid; let bid_pubkey = _challenge_state.bids.iter().next().unwrap().pubkey; - let challenge_state = Arc::new(Mutex::new(_challenge_state)); + let challenge_state = Arc::new(Mutex::new(Some(_challenge_state))); // Request get / let data = ""; @@ -444,7 +449,7 @@ mod tests { let _challenge_state = gen_challenge_state_with_challenge(&gen_dummy_hash(1), &chl_hash); let bid_txid = _challenge_state.bids.iter().next().unwrap().txid; let bid_pubkey = _challenge_state.bids.iter().next().unwrap().pubkey; - let challenge_state = Arc::new(Mutex::new(_challenge_state)); + let challenge_state = Arc::new(Mutex::new(Some(_challenge_state))); // Request body data empty let data = ""; @@ -523,7 +528,7 @@ mod tests { assert!(resp_rx.try_recv() == Err(TryRecvError::Empty)); // check receiver empty // No active challenge (hash is None) so request rejected - challenge_state.lock().unwrap().latest_challenge = None; + challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = None; let data = r#" { "txid": "0000000000000000000000000000000000000000000000000000000000000000", @@ -543,7 +548,7 @@ mod tests { .wait() }) .wait(); - challenge_state.lock().unwrap().latest_challenge = Some(chl_hash); + challenge_state.lock().unwrap().as_mut().unwrap().latest_challenge = Some(chl_hash); assert!(resp_rx.try_recv() == Err(TryRecvError::Empty)); // check receiver empty // Invalid bid on request body (txid does not exist)