diff --git a/Cargo.lock b/Cargo.lock index 0ade0fc..43e4c3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,6 +215,14 @@ name = "futures" version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-timer" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "getopts" version = "0.2.17" @@ -576,6 +584,7 @@ name = "strymon_coordinator" version = "0.1.0" dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", "strymon_communication 0.1.2", @@ -798,15 +807,6 @@ dependencies = [ "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tokio-timer" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "topology_generator" version = "0.1.1" @@ -814,13 +814,13 @@ dependencies = [ "abomonation 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "abomonation_derive 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "strymon_communication 0.1.2", "strymon_job 0.1.2", "timely 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "typename 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -947,6 +947,7 @@ dependencies = [ "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "0bab5b5e94f5c31fc764ba5dd9ad16568aae5d4825538c01d6bca680c9bf94a7" +"checksum futures-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a5cedfe9b6dc756220782cc1ba5bcb1fa091cdcba155e40d3556159c3db58043" "checksum getopts 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)" = "b900c08c1939860ce8b54dc6a89e26e00c04c380fd0e09796799bd7f12861e05" "checksum iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b6e8b9c2247fcf6c6a1151f1156932be5606c9fd6f55a2d7f9fc1cb29386b2f7" "checksum itoa 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8324a32baf01e2ae060e9de58ed0bc2320c9a2833491ee36cd3b4c414de4db8c" @@ -1001,7 +1002,6 @@ dependencies = [ "checksum tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "514aae203178929dbf03318ad7c683126672d4d96eccb77b29603d33c9e25743" "checksum tokio-process 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2e76e0cd21a4ae5362697e85f98aa5d26c88f09ce9fc367b57c0643ba0b022c2" "checksum tokio-signal 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "57c4031b97651d28c87a0a071e1c2809d70609d3120ce285b302eb7d52c96906" -"checksum tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6131e780037787ff1b3f8aad9da83bca02438b72277850dd6ad0d455e0e20efc" "checksum typename 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07c6494e04fa9e31404425680954aac69080337a6e9a946b475578962674482f" "checksum typename_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9be293eee17e33e38c7f0c9ab50b4b8a3a41599bff6325fba57427713a7a3af1" "checksum unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bf3a113775714a22dcb774d8ea3655c53a32debae63a063acc00a91cc586245f" diff --git a/apps/topology-generator/Cargo.toml b/apps/topology-generator/Cargo.toml index 97138fb..b305c47 100644 --- a/apps/topology-generator/Cargo.toml +++ b/apps/topology-generator/Cargo.toml @@ -12,7 +12,7 @@ serde = "1.0" serde_derive = "1.0" typename = "0.1" futures = "0.1" -tokio-timer = "0.1" +futures-timer = "0.1" [dependencies.strymon_job] path = "../../src/strymon_job" diff --git a/apps/topology-generator/src/bin/main.rs b/apps/topology-generator/src/bin/main.rs index 19421fd..2a359b6 100644 --- a/apps/topology-generator/src/bin/main.rs +++ b/apps/topology-generator/src/bin/main.rs @@ -8,7 +8,7 @@ extern crate rand; extern crate futures; -extern crate tokio_timer; +extern crate futures_timer; extern crate strymon_job; extern crate strymon_communication; @@ -21,9 +21,9 @@ use std::time::Duration; use futures::future::Future; use futures::stream::{Stream, FuturesUnordered}; +use futures_timer::Delay; use rand::{ThreadRng, Rng}; -use tokio_timer::Timer; use timely::dataflow::operators::{Input, Probe}; @@ -97,7 +97,6 @@ struct TopologyServer { pending: FuturesUnordered, Error=io::Error>>>, server: Service, topo: TopologyManager, - timer: Timer, } enum Event { @@ -109,7 +108,6 @@ impl TopologyServer { fn new(topo: Topology, server: Service) -> Self { TopologyServer { pending: FuturesUnordered::new(), - timer: Timer::default(), server: server, topo: TopologyManager::new(topo), } @@ -133,10 +131,8 @@ impl TopologyServer { .map(|&e| (Entity::Connection(e), -1)) .collect(); - let restore = self.timer - .sleep(Duration::from_secs(FAULT_DURATION_SECS)) - .and_then(move |_| Ok(removed)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)); + let restore = Delay::new(Duration::from_secs(FAULT_DURATION_SECS)) + .and_then(move |()| Ok(removed)); resp.respond(Ok(())); self.pending.push(Box::new(restore)); diff --git a/src/strymon_coordinator/Cargo.toml b/src/strymon_coordinator/Cargo.toml index 7241a7e..c297ce5 100644 --- a/src/strymon_coordinator/Cargo.toml +++ b/src/strymon_coordinator/Cargo.toml @@ -6,6 +6,7 @@ authors = ["Sebastian Wicki "] [dependencies] log = "0.4" futures = "0.1" +futures-timer = "0.1" rand = "0.3" tokio-core = "0.1" diff --git a/src/strymon_coordinator/src/handler.rs b/src/strymon_coordinator/src/handler.rs index b8b9e41..6a23eff 100644 --- a/src/strymon_coordinator/src/handler.rs +++ b/src/strymon_coordinator/src/handler.rs @@ -20,11 +20,12 @@ use std::collections::{BTreeMap, HashMap, VecDeque}; use std::collections::btree_map::Entry; use std::rc::{Rc, Weak}; use std::cell::RefCell; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{self, Future}; use futures::stream::{self, Stream}; use futures::unsync::oneshot::{channel, Sender}; +use futures_timer::ext::FutureExt; use tokio_core::reactor::Handle; use rand; @@ -41,6 +42,8 @@ use catalog::Catalog; use super::util::Generator; +const JOB_SUBMISSION_TIMEOUT_SECS: u64 = 30; + /// The connection and available `timely_communication` ports of an registered executor. struct ExecutorResources { tx: Outgoing, @@ -276,7 +279,6 @@ impl Coordinator { self.reactor.spawn(response); } - // TODO(swicki) add a timeout that triggers SpawnFailed here debug!("add pending submission for {:?}", job.id); let (tx, rx) = channel(); let state = JobState::Spawning { @@ -293,7 +295,15 @@ impl Coordinator { }; self.jobs.insert(jobid, job_resources); - Box::new(rx.then(|res| res.expect("submission canceled?!"))) + let response = rx + .then(|res| res.unwrap_or(Err(SubmissionError::Other))) + .timeout(Duration::from_secs(JOB_SUBMISSION_TIMEOUT_SECS)) + .map_err(move |err| { + handle.borrow_mut().cancel_submission(jobid, err.clone()); + err + }); + + Box::new(response) } /// Cancels a pending submission, informing already available workers to shut down. diff --git a/src/strymon_coordinator/src/lib.rs b/src/strymon_coordinator/src/lib.rs index e96fdd4..387c6a6 100644 --- a/src/strymon_coordinator/src/lib.rs +++ b/src/strymon_coordinator/src/lib.rs @@ -63,6 +63,7 @@ extern crate log; extern crate rand; extern crate futures; +extern crate futures_timer; extern crate tokio_core; extern crate strymon_rpc; diff --git a/src/strymon_rpc/src/coordinator/mod.rs b/src/strymon_rpc/src/coordinator/mod.rs index fb71698..870dda1 100644 --- a/src/strymon_rpc/src/coordinator/mod.rs +++ b/src/strymon_rpc/src/coordinator/mod.rs @@ -11,6 +11,8 @@ pub mod catalog; +use std::io; + use num_traits::FromPrimitive; use strymon_model::*; @@ -80,6 +82,16 @@ pub enum SubmissionError { ExecutorUnreachable, /// An executor reported an error while spawning. SpawnError(::executor::SpawnError), + /// The coordinator timed out waiting for worker groups to arrive + TimedOut, + /// An unknown error occured. + Other, +} + +impl From for SubmissionError { + fn from(_: io::Error) -> SubmissionError { + SubmissionError::TimedOut + } } impl Request for Submission {