Skip to content
This repository has been archived by the owner on Feb 27, 2024. It is now read-only.

Commit

Permalink
Merge pull request #46 from gandro/timeouts
Browse files Browse the repository at this point in the history
Implement job submission timeouts
  • Loading branch information
gandro authored Mar 6, 2018
2 parents a7f733a + 5de03d7 commit fcc3a4e
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 23 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/topology-generator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ serde = "1.0"
serde_derive = "1.0"
typename = "0.1"
futures = "0.1"
tokio-timer = "0.1"
futures-timer = "0.1"

[dependencies.strymon_job]
path = "../../src/strymon_job"
Expand Down
12 changes: 4 additions & 8 deletions apps/topology-generator/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

extern crate rand;
extern crate futures;
extern crate tokio_timer;
extern crate futures_timer;

extern crate strymon_job;
extern crate strymon_communication;
Expand All @@ -21,9 +21,9 @@ use std::time::Duration;

use futures::future::Future;
use futures::stream::{Stream, FuturesUnordered};
use futures_timer::Delay;

use rand::{ThreadRng, Rng};
use tokio_timer::Timer;

use timely::dataflow::operators::{Input, Probe};

Expand Down Expand Up @@ -97,7 +97,6 @@ struct TopologyServer {
pending: FuturesUnordered<Box<Future<Item=Vec<Connection>, Error=io::Error>>>,
server: Service<TopologyService>,
topo: TopologyManager,
timer: Timer,
}

enum Event {
Expand All @@ -109,7 +108,6 @@ impl TopologyServer {
fn new(topo: Topology, server: Service<TopologyService>) -> Self {
TopologyServer {
pending: FuturesUnordered::new(),
timer: Timer::default(),
server: server,
topo: TopologyManager::new(topo),
}
Expand All @@ -133,10 +131,8 @@ impl TopologyServer {
.map(|&e| (Entity::Connection(e), -1))
.collect();

let restore = self.timer
.sleep(Duration::from_secs(FAULT_DURATION_SECS))
.and_then(move |_| Ok(removed))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
let restore = Delay::new(Duration::from_secs(FAULT_DURATION_SECS))
.and_then(move |()| Ok(removed));

resp.respond(Ok(()));
self.pending.push(Box::new(restore));
Expand Down
1 change: 1 addition & 0 deletions src/strymon_coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ authors = ["Sebastian Wicki <[email protected]>"]
[dependencies]
log = "0.4"
futures = "0.1"
futures-timer = "0.1"
rand = "0.3"
tokio-core = "0.1"

Expand Down
16 changes: 13 additions & 3 deletions src/strymon_coordinator/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use std::collections::{BTreeMap, HashMap, VecDeque};
use std::collections::btree_map::Entry;
use std::rc::{Rc, Weak};
use std::cell::RefCell;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use futures::{self, Future};
use futures::stream::{self, Stream};
use futures::unsync::oneshot::{channel, Sender};
use futures_timer::ext::FutureExt;
use tokio_core::reactor::Handle;

use rand;
Expand All @@ -41,6 +42,8 @@ use catalog::Catalog;

use super::util::Generator;

/// Number of seconds a pending job submission is given before the timeout
/// applied to its response future fires.
const JOB_SUBMISSION_TIMEOUT_SECS: u64 = 30;

/// The connection and available `timely_communication` ports of a registered executor.
struct ExecutorResources {
tx: Outgoing,
Expand Down Expand Up @@ -276,7 +279,6 @@ impl Coordinator {
self.reactor.spawn(response);
}

// TODO(swicki) add a timeout that triggers SpawnFailed here
debug!("add pending submission for {:?}", job.id);
let (tx, rx) = channel();
let state = JobState::Spawning {
Expand All @@ -293,7 +295,15 @@ impl Coordinator {
};
self.jobs.insert(jobid, job_resources);

Box::new(rx.then(|res| res.expect("submission canceled?!")))
let response = rx
.then(|res| res.unwrap_or(Err(SubmissionError::Other)))
.timeout(Duration::from_secs(JOB_SUBMISSION_TIMEOUT_SECS))
.map_err(move |err| {
handle.borrow_mut().cancel_submission(jobid, err.clone());
err
});

Box::new(response)
}

/// Cancels a pending submission, informing already available workers to shut down.
Expand Down
1 change: 1 addition & 0 deletions src/strymon_coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
extern crate log;
extern crate rand;
extern crate futures;
extern crate futures_timer;
extern crate tokio_core;

extern crate strymon_rpc;
Expand Down
12 changes: 12 additions & 0 deletions src/strymon_rpc/src/coordinator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
pub mod catalog;

use std::io;

use num_traits::FromPrimitive;

use strymon_model::*;
Expand Down Expand Up @@ -80,6 +82,16 @@ pub enum SubmissionError {
ExecutorUnreachable,
/// An executor reported an error while spawning.
SpawnError(::executor::SpawnError),
/// The coordinator timed out waiting for worker groups to arrive
TimedOut,
/// An unknown error occurred.
Other,
}

impl From<io::Error> for SubmissionError {
fn from(_: io::Error) -> SubmissionError {
SubmissionError::TimedOut
}
}

impl Request<CoordinatorRPC> for Submission {
Expand Down

0 comments on commit fcc3a4e

Please sign in to comment.