Skip to content

Commit

Permalink
Added worker state.
Browse files Browse the repository at this point in the history
This currently contains one method which manages updating the task and
removing the pending tasks.
  • Loading branch information
VoyTechnology committed Mar 13, 2018
1 parent 31d2305 commit 641ef84
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 0 deletions.
1 change: 1 addition & 0 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod broker;
mod master_interface;
mod operations;
mod server;
mod state;
mod parser;
mod worker_interface;

Expand Down
137 changes: 137 additions & 0 deletions worker/src/state/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::fs;
use std::io::Write;
use std::path::PathBuf;

use protobuf::Message;

use cerberus_proto::datatypes::{Task, TaskKind, TaskStatus};
use super::*;

// TODO: Make sure they are consistent or merge them with the master side of the library.
const JOBS_DIR: &str = "jobs";
const TASKS_DIR: &str = "tasks";
const PENDING_MAP_DIR: &str = "pending_map_tasks";
const PENDING_REDUCE_DIR: &str = "pending_reduce_tasks";

pub struct FileStore {
path: PathBuf,
}

impl FileStore {
pub fn new(path: &PathBuf) -> Result<Self, StateError> {
Ok(FileStore { path: path.clone() })
}

/// Creates the path to the job from the given `job_id`.
fn job_dir_path(&self, job_id: &str) -> PathBuf {
self.path.join(JOBS_DIR).join(job_id)
}
}

impl State for FileStore {
fn save_progress(&self, task: &Task) -> Result<(), StateError> {
let job_dir_path = self.job_dir_path(task.get_job_id());
let task_id = task.get_id();

let task_file_path = job_dir_path.join(TASKS_DIR).join(&task_id);
if !task_file_path.exists() {
return Err(StateErrorKind::MissingTask.into());
}

let mut pending_file_path = job_dir_path.clone();
match task.get_kind() {
TaskKind::MAP => pending_file_path.push(PENDING_MAP_DIR),
TaskKind::REDUCE => pending_file_path.push(PENDING_REDUCE_DIR),
}
pending_file_path.push(&task_id);
if !pending_file_path.exists() {
return Err(StateErrorKind::MissingPendingTask.into());
}

let serialized = task.write_to_bytes()
.context(StateErrorKind::TaskSerialisationFailed)?;
fs::OpenOptions::new()
.write(true)
.open(task_file_path)
.context(StateErrorKind::TaskWriteFailed)?
.write_all(&serialized)
.context(StateErrorKind::TaskWriteFailed)?;

// Remove the pending task if its done.
if task.get_status() == TaskStatus::TASK_DONE {
fs::remove_file(pending_file_path).context(StateErrorKind::RemovingPendingTaskFailed)?;
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use std::env::temp_dir;
use std::fs;
use std::fs::File;
use super::*;
use super::State;

fn setup(test_path: &PathBuf, task_id: &str) {
fs::create_dir_all(&test_path).unwrap();

let test_tasks_path = test_path.join(TASKS_DIR);
fs::create_dir_all(&test_tasks_path).unwrap();
// File content doesn't matter, we don't really care about it.
File::create(test_tasks_path.join(&task_id))
.unwrap()
.write_all(task_id.as_bytes())
.unwrap();

let test_pending_tasks_path = test_path.join(PENDING_MAP_DIR);
fs::create_dir_all(&test_pending_tasks_path).unwrap();
// File content doesn't matter
File::create(test_pending_tasks_path.join(&task_id))
.unwrap()
.write_all(task_id.clone().as_bytes())
.unwrap();
}

fn check_map_exists(test_path: &PathBuf, task_id: &str) -> bool {
test_path.join(PENDING_MAP_DIR).join(task_id).exists()
}

fn cleanup(test_dir: &PathBuf) {
fs::remove_dir_all(test_dir).unwrap();
}

#[test]
fn test_file_store() {
let test_dir = temp_dir().join("heracles_test").join("state");

let st = FileStore::new(&test_dir).unwrap();

let task_id = "test_task";
let job_id = "test_job";

let test_job_path = st.job_dir_path(job_id.clone());
assert_eq!(
"/tmp/heracles_test/state/jobs/test_job",
test_job_path.to_str().unwrap()
);
setup(&test_job_path, task_id.clone());

let mut test_task = Task::new();
test_task.set_id(task_id.clone().into());
test_task.set_job_id(job_id.clone().into());
test_task.set_kind(TaskKind::MAP);
test_task.set_status(TaskStatus::TASK_IN_PROGRESS);
st.save_progress(&test_task).unwrap();

assert_eq!(true, check_map_exists(&test_job_path, task_id.clone()));

test_task.set_status(TaskStatus::TASK_DONE);
st.save_progress(&test_task).unwrap();

assert_eq!(false, check_map_exists(&test_job_path, task_id.clone()));

cleanup(&test_dir);
}
}
76 changes: 76 additions & 0 deletions worker/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
mod file;

use self::file::FileStore;

use std::fmt;
use std::fmt::Display;

use failure::*;

use cerberus_proto::datatypes::Task;

#[allow(doc_markdown)]
pub trait State {
/// Updates the task status. If the task.Status is marked as DONE, the task is also removed
/// from the list of pending tasks.
fn save_progress(&self, task: &Task) -> Result<(), StateError>;
}

#[derive(Copy, Clone, Eq, PartialEq, Debug, Fail)]
pub enum StateErrorKind {
#[fail(display = "Failed to connect to state store server.")]
ConnectionFailed,
#[fail(display = "Task is missing in the state store")]
MissingTask,
#[fail(display = "Pending task is missing from the state store")]
MissingPendingTask,
#[fail(display = "Failed to serialise the task proto.")]
TaskSerialisationFailed,
#[fail(display = "Failed to deserialise the task proto.")]
TaskDeserialisationFailed,
#[fail(display = "Failed to write task")]
TaskWriteFailed,
#[fail(display = "Failed to remove pending task")]
RemovingPendingTaskFailed,
}

#[derive(Debug)]
pub struct StateError {
inner: Context<StateErrorKind>,
}

impl StateError {
pub fn kind(&self) -> StateErrorKind {
*self.inner.get_context()
}
}

impl Fail for StateError {
fn cause(&self) -> Option<&Fail> {
self.inner.cause()
}

fn backtrace(&self) -> Option<&Backtrace> {
self.inner.backtrace()
}
}

impl Display for StateError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(&self.inner, f)
}
}

impl From<StateErrorKind> for StateError {
fn from(kind: StateErrorKind) -> StateError {
StateError {
inner: Context::new(kind),
}
}
}

impl From<Context<StateErrorKind>> for StateError {
fn from(inner: Context<StateErrorKind>) -> StateError {
StateError { inner }
}
}

0 comments on commit 641ef84

Please sign in to comment.