Skip to content

Commit

Permalink
Fix event log config for MPI
Browse files Browse the repository at this point in the history
  • Loading branch information
juntyr committed Jun 2, 2024
1 parent aa36dc9 commit 121e0dc
Show file tree
Hide file tree
Showing 16 changed files with 201 additions and 59 deletions.
168 changes: 141 additions & 27 deletions necsim/impls/std/src/event_log/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// limitations under the License.

use std::{
borrow::Cow,
convert::TryFrom,
fmt,
fs::{self, OpenOptions},
io::BufWriter,
mem::ManuallyDrop,
num::NonZeroUsize,
path::{Component, Path, PathBuf},
};
Expand All @@ -31,8 +33,6 @@ use necsim_core::event::{DispersalEvent, PackedEvent, SpeciationEvent};
use super::EventLogHeader;

#[allow(clippy::module_name_repetitions)]
#[derive(Deserialize)]
#[serde(try_from = "EventLogRecorderRaw")]
pub struct EventLogRecorder {
segment_capacity: NonZeroUsize,
directory: PathBuf,
Expand All @@ -43,24 +43,6 @@ pub struct EventLogRecorder {
record_dispersal: bool,
}

impl TryFrom<EventLogRecorderRaw> for EventLogRecorder {
type Error = Error;

fn try_from(raw: EventLogRecorderRaw) -> Result<Self, Self::Error> {
Self::try_new(&raw.directory, raw.capacity)
}
}

impl Serialize for EventLogRecorder {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
EventLogRecorderRaw {
directory: self.directory.clone(),
capacity: self.segment_capacity,
}
.serialize(serializer)
}
}

impl Drop for EventLogRecorder {
fn drop(&mut self) {
if !self.buffer.is_empty() {
Expand All @@ -75,17 +57,17 @@ impl Drop for EventLogRecorder {
impl EventLogRecorder {
/// # Errors
///
/// Fails to construct iff `path` is not a writable directory.
pub fn try_new(path: &Path, segment_capacity: NonZeroUsize) -> Result<Self> {
if let Some(parent) = path.parent() {
/// Fails to construct iff `directory` is not a writable directory.
pub fn try_new(directory: PathBuf, segment_capacity: NonZeroUsize) -> Result<Self> {
if let Some(parent) = directory.parent() {
fs::create_dir_all(parent).with_context(|| {
format!("failed to ensure that the parent path for {path:?} exists")
format!("failed to ensure that the parent path for {directory:?} exists")
})?;
}

Self {
segment_capacity,
directory: path.to_owned(),
directory,
segment_index: 0_usize,
buffer: Vec::with_capacity(segment_capacity.get()),

Expand Down Expand Up @@ -242,11 +224,143 @@ impl fmt::Debug for EventLogRecorder {
}
}

#[allow(clippy::unsafe_derive_deserialize)]
#[derive(Debug, Deserialize)]
#[serde(try_from = "EventLogRecorderRaw")]
pub struct EventLogConfig {
directory: PathBuf,
#[serde(default = "default_event_log_recorder_segment_capacity")]
capacity: NonZeroUsize,
}

impl<'a> TryFrom<EventLogRecorderRaw<'a>> for EventLogConfig {
type Error = Error;

fn try_from(raw: EventLogRecorderRaw) -> Result<Self, Self::Error> {
Self::try_new(raw.directory.into_owned(), raw.capacity)
}
}

impl Serialize for EventLogConfig {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
EventLogRecorderRaw {
directory: Cow::Borrowed(&self.directory),
capacity: self.capacity,
}
.serialize(serializer)
}
}

impl EventLogConfig {
/// # Errors
///
/// Fails to construct iff the parent of `directory` cannot be created or
/// is not a writable directory.
pub fn try_new(directory: PathBuf, capacity: NonZeroUsize) -> Result<Self> {
Self {
directory,
capacity,
}
.create_parent_directory()
}

/// # Errors
///
/// Fails to construct iff
/// - `child` is not a valid single-component path
/// - newly creating a writable child directory fails
pub fn new_child_log(&self, child: &str) -> Result<Self> {
EventLogRecorder::check_valid_component(child)?;

Self {
directory: self.directory.join(child),
capacity: self.capacity,
}
.create_parent_directory()
}

fn create_parent_directory(mut self) -> Result<Self> {
let Some(name) = self.directory.file_name() else {
anyhow::bail!(
"{:?} does not terminate in a directory name",
self.directory
);
};

let Some(parent) = self.directory.parent() else {
return Ok(self);
};
let parent = if parent.as_os_str().is_empty() {
Path::new(".")
} else {
parent
};

fs::create_dir_all(parent).with_context(|| {
format!(
"failed to ensure that the parent path for {:?} exists",
self.directory
)
})?;

let mut directory = parent.canonicalize()?;
directory.push(name);
self.directory = directory;

let Some(parent) = self.directory.parent() else {
return Ok(self);
};

let metadata = fs::metadata(parent)?;

if !metadata.is_dir() {
return Err(anyhow::anyhow!(
"the parent path of {:?} is not a directory.",
self.directory
));
}

if metadata.permissions().readonly() {
return Err(anyhow::anyhow!(
"the parent path of {:?} is a read-only directory.",
self.directory
));
}

Ok(self)
}

#[must_use]
pub fn directory(&self) -> &Path {
&self.directory
}

/// # Errors
///
/// Fails to construct iff `self.directory()` is not a writable directory.
pub fn create(self) -> Result<EventLogRecorder> {
let this = ManuallyDrop::new(self);
// Safety: self will not be dropped and self.directory is only read once
let directory = unsafe { std::ptr::read(&this.directory) };
EventLogRecorder::try_new(directory, this.capacity)
}
}

impl Drop for EventLogConfig {
fn drop(&mut self) {
// Try to remove the directory parent if it is empty
if let Some(parent) = self.directory.parent() {
std::mem::drop(fs::remove_dir(parent));
}
}
}

#[derive(Serialize, Deserialize)]
#[serde(rename = "EventLog")]
#[serde(deny_unknown_fields)]
struct EventLogRecorderRaw {
directory: PathBuf,
struct EventLogRecorderRaw<'a> {
#[serde(borrow)]
directory: Cow<'a, Path>,
#[serde(default = "default_event_log_recorder_segment_capacity")]
capacity: NonZeroUsize,
}
Expand Down
6 changes: 3 additions & 3 deletions necsim/partitioning/monolithic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use necsim_partitioning_core::{
LocalPartition, MigrationMode, Partitioning,
};

use necsim_impls_std::event_log::recorder::EventLogRecorder;
use necsim_impls_std::event_log::recorder::EventLogConfig;

pub mod live;
pub mod recorded;
Expand Down Expand Up @@ -50,7 +50,7 @@ impl<'de> Deserialize<'de> for MonolithicPartitioning {
}

impl Partitioning for MonolithicPartitioning {
type Auxiliary = Option<EventLogRecorder>;
type Auxiliary = Option<EventLogConfig>;
type FinalisableReporter<R: Reporter> = FinalisableMonolithicReporter<R>;
type LocalPartition<R: Reporter> = MonolithicLocalPartition<R>;

Expand All @@ -73,7 +73,7 @@ impl Partitioning for MonolithicPartitioning {
MonolithicLocalPartition::Recorded(Box::new(
recorded::RecordedMonolithicLocalPartition::from_reporter_and_recorder(
reporter_context.try_build()?,
event_log,
event_log.create()?,
),
))
} else {
Expand Down
5 changes: 3 additions & 2 deletions necsim/partitioning/mpi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use necsim_core::{
},
};

use necsim_impls_std::event_log::recorder::EventLogRecorder;
use necsim_impls_std::event_log::recorder::EventLogConfig;
use necsim_partitioning_core::{
partition::PartitionSize,
reporter::{FinalisableReporter, ReporterContext},
Expand Down Expand Up @@ -165,7 +165,7 @@ impl MpiPartitioning {
}

impl Partitioning for MpiPartitioning {
type Auxiliary = Option<EventLogRecorder>;
type Auxiliary = Option<EventLogConfig>;
type FinalisableReporter<R: Reporter> = FinalisableMpiReporter<R>;
type LocalPartition<R: Reporter> = MpiLocalPartition<'static, R>;

Expand Down Expand Up @@ -200,6 +200,7 @@ impl Partitioning for MpiPartitioning {

let partition_event_log = event_log
.new_child_log(&self.world.rank().to_string())
.and_then(EventLogConfig::create)
.context(MpiLocalPartitionError::InvalidEventSubLog)?;

let mut mpi_local_global_wait = (false, false);
Expand Down
6 changes: 4 additions & 2 deletions necsim/partitioning/mpi/src/partition/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,10 @@ impl<'p> MpiCommonPartition<'p> {
{
*global_wait
} else {
// Block until any new message has been received
self.world.any_process().probe();
// This partition doesn't have any work right now but we have
// to do another round of voting, so let's yield to not busy
// wait - we'll be woken up if more work comes in
std::thread::yield_now();

true
};
Expand Down
2 changes: 1 addition & 1 deletion necsim/partitioning/mpi/src/partition/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl<'p, R: Reporter> LocalPartition<R> for MpiRootPartition<'p, R> {
fn wait_for_termination(&mut self) -> ControlFlow<(), ()> {
let result = self.common.wait_for_termination();

if result.is_break() || !self.common.has_ongoing_termination_vote() {
if !self.common.has_ongoing_termination_vote() {
// Check for pending progress updates from other partitions
let remaining = self.all_remaining[self.get_partition().rank() as usize];
self.report_progress(&remaining.into());
Expand Down
5 changes: 3 additions & 2 deletions necsim/partitioning/threads/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use necsim_core::reporter::{
FilteredReporter, Reporter,
};

use necsim_impls_std::event_log::recorder::EventLogRecorder;
use necsim_impls_std::event_log::recorder::EventLogConfig;
use necsim_partitioning_core::{
partition::PartitionSize,
reporter::{FinalisableReporter, ReporterContext},
Expand Down Expand Up @@ -120,7 +120,7 @@ impl ThreadsPartitioning {
}

impl Partitioning for ThreadsPartitioning {
type Auxiliary = Option<EventLogRecorder>;
type Auxiliary = Option<EventLogConfig>;
type FinalisableReporter<R: Reporter> = FinalisableThreadsReporter<R>;
type LocalPartition<R: Reporter> = ThreadsLocalPartition<R>;

Expand Down Expand Up @@ -180,6 +180,7 @@ impl Partitioning for ThreadsPartitioning {
.map(|partition| {
event_log
.new_child_log(&partition.rank().to_string())
.and_then(EventLogConfig::create)
.context(ThreadsLocalPartitionError::InvalidEventSubLog)
})
.collect::<Result<Vec<_>, _>>()?;
Expand Down
20 changes: 18 additions & 2 deletions rustcoalescence/src/args/config/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use necsim_core::{
cogs::{MathsCore, RngCore},
reporter::Reporter,
};
use necsim_impls_std::event_log::recorder::EventLogRecorder;
use necsim_impls_std::event_log::recorder::EventLogConfig;
use rustcoalescence_algorithms::AlgorithmDispatch;
use rustcoalescence_scenarios::Scenario;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -52,6 +52,14 @@ impl Partitioning {
}
}

#[cfg_attr(
not(any(
feature = "gillespie-algorithms",
feature = "independent-algorithm",
feature = "cuda-algorithm"
)),
allow(dead_code)
)]
pub fn get_logical_partition_size<
M: MathsCore,
G: RngCore<M>,
Expand All @@ -77,7 +85,15 @@ impl Partitioning {
}
}

pub fn will_report_live(&self, event_log: &Option<EventLogRecorder>) -> bool {
#[cfg_attr(
not(any(
feature = "gillespie-algorithms",
feature = "independent-algorithm",
feature = "cuda-algorithm"
)),
allow(dead_code)
)]
pub fn will_report_live(&self, event_log: &Option<EventLogConfig>) -> bool {
// TODO: get this information from the partitioning
match self {
Partitioning::Monolithic(_) => event_log.is_none(),
Expand Down
4 changes: 2 additions & 2 deletions rustcoalescence/src/cli/simulate/dispatch/fallback.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use necsim_core_bond::{NonNegativeF64, OpenClosedUnitF64 as PositiveUnitF64};
use necsim_impls_std::event_log::recorder::EventLogRecorder;
use necsim_impls_std::event_log::recorder::EventLogConfig;
use necsim_plugins_core::import::AnyReporterPluginVec;

use crate::{
Expand All @@ -14,7 +14,7 @@ use super::super::BufferingSimulateArgsBuilder;
#[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)]
pub(in super::super) fn dispatch(
_partitioning: Partitioning,
_event_log: Option<EventLogRecorder>,
_event_log: Option<EventLogConfig>,
_reporters: AnyReporterPluginVec,

_speciation_probability_per_generation: PositiveUnitF64,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use necsim_core::reporter::Reporter;
use necsim_core_bond::{NonNegativeF64, OpenClosedUnitF64 as PositiveUnitF64};
use necsim_impls_std::event_log::recorder::EventLogRecorder;
use necsim_impls_std::event_log::recorder::EventLogConfig;
use necsim_partitioning_core::reporter::ReporterContext;

use rustcoalescence_algorithms::AlgorithmDefaults;
Expand Down Expand Up @@ -113,7 +113,7 @@ macro_rules! match_scenario_algorithm {
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
pub(super) fn dispatch<R: Reporter, P: ReporterContext<Reporter = R>>(
partitioning: Partitioning,
event_log: Option<EventLogRecorder>,
event_log: Option<EventLogConfig>,
reporter_context: P,

speciation_probability_per_generation: PositiveUnitF64,
Expand Down
Loading

0 comments on commit 121e0dc

Please sign in to comment.