From 209c859ad47f9e19c776e66625d7a7b1b5842ccd Mon Sep 17 00:00:00 2001 From: Wojtek Bednarzak Date: Sun, 11 Mar 2018 21:31:43 +0000 Subject: [PATCH] Add file state store. This adds the code for saving the state as a file. --- master/src/state/file.rs | 156 +++++++++++++++++++++++++++++++++++++++ master/src/state/mod.rs | 54 ++++++++++---- 2 files changed, 196 insertions(+), 14 deletions(-) create mode 100644 master/src/state/file.rs diff --git a/master/src/state/file.rs b/master/src/state/file.rs new file mode 100644 index 0000000..c24e12e --- /dev/null +++ b/master/src/state/file.rs @@ -0,0 +1,156 @@ +use std::fs; +use std::fs::File; +use std::io::Write; +use std::path::PathBuf; + +use protobuf; +use protobuf::Message; + +use super::*; + +const JOB_DIR: &str = "jobs"; +const SCHEDULERS_DIR: &str = "schedulers"; +const JOB_SAVE_FILE: &str = "request"; +const TASKS_DIR: &str = "tasks"; +const PENDING_MAP_DIR: &str = "pending_map_tasks"; +const PENDING_REDUCE_DIR: &str = "pending_reduce_tasks"; + +/// State store that saves its data in the filesystem under `path`. +pub struct FileStore { + path: PathBuf, +} + +impl FileStore { + /// Creates a new file-backed state store, creating the jobs directory under `path` if it is missing. + pub fn new(path: &PathBuf) -> Result { + fs::create_dir_all(path.join(JOB_DIR)).context(StateErrorKind::JobsFolderCreationFailed)?; + Ok(FileStore { path: path.clone() }) + } + + /// Prepares the directory layout for a job. If `job_dir_path` already exists nothing is + /// created and `Ok(())` is returned; otherwise the tasks, pending-map and pending-reduce 
+ /// subdirectories are created beneath it. If any of those creations fails, the matching + /// [`StateError`] is returned. + fn prepare_job_directory(&self, job_dir_path: &PathBuf) -> Result<(), StateError> { + if job_dir_path.exists() { + return Ok(()); + } + fs::create_dir_all(job_dir_path.join(TASKS_DIR)) + .context(StateErrorKind::TasksFolderCreationFailed)?; + fs::create_dir_all(job_dir_path.join(PENDING_MAP_DIR)) + .context(StateErrorKind::MapTasksFolderCreationFailed)?; + fs::create_dir_all(job_dir_path.join(PENDING_REDUCE_DIR)) + .context(StateErrorKind::ReduceTasksFolderCreationFailed)?; + Ok(()) + } + + /// Builds the path of the directory that holds the job with the given `job_id`. + fn job_dir_path(&self, job_id: &str) -> PathBuf { + self.path.join(JOB_DIR).join(job_id) + } + + // TODO: Add to [`State`] trait. + /// Removes the job's directory and everything inside it from the jobs folder. + fn remove_job(&self, job_id: &str) -> Result<(), StateError> { + Ok(fs::remove_dir_all(self.job_dir_path(job_id)) + .context(StateErrorKind::JobsFolderRemoveFailed)?) + } + + /// Lists the names of the entries in the job's pending directory for the given task kind. + fn list_pending_tasks(&self, job_id: &str, kind: TaskKind) -> Result, StateError> { + let job_dir_path = self.job_dir_path(job_id); + + let mut pending_dir_path = job_dir_path.clone(); + match kind { + TaskKind::MAP => pending_dir_path.push(PENDING_MAP_DIR), + TaskKind::REDUCE => pending_dir_path.push(PENDING_REDUCE_DIR), + } + + fs::read_dir(&pending_dir_path) + .context(StateErrorKind::PendingTasksListFailed)? + .map(|entry| { + // NOTE(review): `into_string()` fails for file names that are not valid Unicode, so + // this `unwrap` panics on such names instead of returning a [`StateError`]. + Ok(entry + .context(StateErrorKind::GenericIOError)? + .file_name() + .into_string() + .unwrap()) + }) + .collect::, StateError>>() + } + + /// Reads and deserialises the task file of every pending task of the given kind. 
+ fn pending_tasks_data(&self, job_id: &str, kind: TaskKind) -> Result, StateError> { + let tasks_dir_path = self.job_dir_path(job_id).join(TASKS_DIR); + + self.list_pending_tasks(job_id, kind)? + .iter() + .map(|task_id| { + let mut f = File::open(tasks_dir_path.join(task_id)) + .context(StateErrorKind::TaskFileOpenFailed)?; + Ok(protobuf::core::parse_from_reader::(&mut f) + .context(StateErrorKind::TaskDeserialisationFailed)?) + }) + .collect::, StateError>>() + } +} + +impl State for FileStore { + fn save_job(&self, job: &Job) -> Result<(), StateError> { + let job_dir_path = self.job_dir_path(job.get_id()); + if let Err(err) = self.prepare_job_directory(&job_dir_path) { + self.remove_job(job.get_id())?; + return Err(err); + } + + let serialized = job.write_to_bytes() + .context(StateErrorKind::JobSerialisationFailed)?; + Ok(File::create(&job_dir_path.join(JOB_SAVE_FILE)) + .context(StateErrorKind::JobWriteFailed)? + .write_all(&serialized) + .context(StateErrorKind::JobWriteFailed)?) + } + + fn save_task(&self, task: &Task) -> Result<(), StateError> { + let job_dir_path = self.job_dir_path(task.get_job_id()); + let task_id = task.get_id(); + + let serialized = task.write_to_bytes() + .context(StateErrorKind::TaskSerialisationFailed)?; + File::create(&job_dir_path.join(TASKS_DIR).join(task_id)) + .context(StateErrorKind::TaskWriteFailed)? + .write_all(&serialized) + .context(StateErrorKind::TaskWriteFailed)?; + + // Mark the task as pending by creating a file named after its id in the matching pending dir. + let mut pending_file_path = job_dir_path.clone(); + match task.get_kind() { + TaskKind::MAP => pending_file_path.push(PENDING_MAP_DIR), + TaskKind::REDUCE => pending_file_path.push(PENDING_REDUCE_DIR), + } + pending_file_path.push(task_id); + + File::create(pending_file_path) + .context(StateErrorKind::PendingTaskWriteFailed)? 
+ .write_all(&task_id.as_bytes()) + .context(StateErrorKind::PendingTaskWriteFailed)?; + Ok(()) + } + + fn pending_map_tasks(&self, job: &Job) -> Result, StateError> { + self.pending_tasks_data(job.get_id().into(), TaskKind::MAP) + } + + fn pending_reduce_tasks(&self, job: &Job) -> Result, StateError> { + self.pending_tasks_data(job.get_id().into(), TaskKind::REDUCE) + } + + fn map_done(&self, job: &Job) -> Box> { + unimplemented!() + } + + fn reduce_done(&self, job: &Job) -> Box> { + unimplemented!() + } +} diff --git a/master/src/state/mod.rs b/master/src/state/mod.rs index 70c7a24..e11b9b7 100644 --- a/master/src/state/mod.rs +++ b/master/src/state/mod.rs @@ -1,40 +1,66 @@ +mod file; + +use self::file::FileStore; + use std::fmt; use std::fmt::Display; use failure::*; use futures::Future; -use cerberus_proto::datatypes::{Job, Task}; +use cerberus_proto::datatypes::{Job, Task, TaskKind}; #[allow(doc_markdown)] /// Interface for creating connections to state stores, such as etcd or TiKV etc. pub trait State { - /// Marks a specific job as owned by a certain scheduler. - fn mark_job_scheduler(job: &Job) -> Result<(), StateError>; - /// Get a list of unfinished jobs - fn get_unfinished_jobs() -> Result, StateError>; - /// List of jobs assigned to current scheduler - fn list_scheduler_jobs() -> Result, StateError>; /// Serialize the job and save it in the state store so it can be loaded later. - fn save_job(job: &Job) -> Result<(), StateError>; + fn save_job(&self, job: &Job) -> Result<(), StateError>; /// Adds a task to the list of tasks and add it to pending - fn save_task(task: &Task) -> Result<(), StateError>; + fn save_task(&self, task: &Task) -> Result<(), StateError>; /// List of pending map tasks for a specific job. - fn pending_map_tasks(job: &Job) -> Result, StateError>; + fn pending_map_tasks(&self, job: &Job) -> Result, StateError>; /// List of pending reduce tasks. 
- fn pending_reduce_tasks(job: &Job) -> Result, StateError>; + fn pending_reduce_tasks(&self, job: &Job) -> Result, StateError>; /// Returns a future when all map tasks are done. - fn map_done(job: &Job) -> Future; + fn map_done(&self, job: &Job) -> Box>; /// Returns a future when all reduce tasks are done. - fn reduce_done(job: &Job) -> Future; + fn reduce_done(&self, job: &Job) -> Box>; } #[derive(Copy, Clone, Eq, PartialEq, Debug, Fail)] pub enum StateErrorKind { #[fail(display = "Failed to connect to state store server.")] ConnectionFailed, - #[fail(display = "Failed to serialize the job proto.")] + #[fail(display = "Failed precondition")] + PreconditionFailed, + #[fail(display = "Unable to create required jobs folder")] + JobsFolderCreationFailed, + #[fail(display = "Unable to remove jobs folder")] + JobsFolderRemoveFailed, + #[fail(display = "Unable to create required tasks folder")] + TasksFolderCreationFailed, + #[fail(display = "Unable to create required map tasks folder")] + MapTasksFolderCreationFailed, + #[fail(display = "Unable to create required reduce tasks folder")] + ReduceTasksFolderCreationFailed, + #[fail(display = "Unable to list pending tasks")] + PendingTasksListFailed, + #[fail(display = "An unknown I/O error has occurred.")] + GenericIOError, + #[fail(display = "Failed to serialise the job proto.")] JobSerialisationFailed, + #[fail(display = "Failed to serialise the task proto.")] + TaskSerialisationFailed, + #[fail(display = "Failed to deserialise the task proto.")] + TaskDeserialisationFailed, + #[fail(display = "Failed to write task")] + JobWriteFailed, + #[fail(display = "Unable to open task file")] + TaskFileOpenFailed, + #[fail(display = "Failed to write task")] + TaskWriteFailed, + #[fail(display = "Failed to create pending task")] + PendingTaskWriteFailed, #[fail(display = "Failed operation.")] OperationFailed, }