Skip to content

Commit

Permalink
Remove now extraneous time-steps reduction
Browse files Browse the repository at this point in the history
  • Loading branch information
juntyr committed May 27, 2024
1 parent 9ac90c4 commit 72121b0
Show file tree
Hide file tree
Showing 19 changed files with 37 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,8 @@ pub fn simulate<

proxy.local_partition().report_progress_sync(0_u64);

let (global_time, global_steps) = proxy
.local_partition()
.reduce_global_time_steps(max_time, total_steps);
let local_time = max_time;
let local_steps = total_steps;

(Status::Done, global_time, global_steps, lineages)
(Status::Done, local_time, local_steps, lineages)
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,8 @@ pub fn simulate<

proxy.local_partition().report_progress_sync(0_u64);

let (global_time, global_steps) = proxy
.local_partition()
.reduce_global_time_steps(max_time, total_steps);
let local_time = max_time;
let local_steps = total_steps;

(Status::Done, global_time, global_steps, lineages)
(Status::Done, local_time, local_steps, lineages)
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ pub fn simulate<
);

let status = Status::paused(local_partition.reduce_vote_any(!slow_lineages.is_empty()));
let (global_time, global_steps) =
local_partition.reduce_global_time_steps(max_time, total_steps);
let local_time = max_time;
let local_steps = total_steps;
let lineages = slow_lineages.into_iter().map(|(lineage, _)| lineage);

(status, global_time, global_steps, lineages)
(status, local_time, local_steps, lineages)
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,8 @@ pub fn simulate<

local_partition.report_progress_sync(0_u64);

let (global_time, global_steps) = local_partition.reduce_global_time_steps(
simulation.active_lineage_sampler().get_last_event_time(),
total_steps,
);
let local_time = simulation.active_lineage_sampler().get_last_event_time();
let local_steps = total_steps;

(Status::Done, global_time, global_steps)
(Status::Done, local_time, local_steps)
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,8 @@ pub fn simulate<

local_partition.report_progress_sync(0_u64);

let (global_time, global_steps) = local_partition.reduce_global_time_steps(
simulation.active_lineage_sampler().get_last_event_time(),
total_steps,
);
let local_time = simulation.active_lineage_sampler().get_last_event_time();
let local_steps = total_steps;

(Status::Done, global_time, global_steps)
(Status::Done, local_time, local_steps)
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ pub fn simulate<
local_partition
.reduce_vote_any(simulation.active_lineage_sampler().number_active_lineages() > 0),
);
let (global_time, global_steps) = local_partition.reduce_global_time_steps(time, steps);
let local_time = time;
let local_steps = steps;

(status, global_time, global_steps)
(status, local_time, local_steps)
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,8 @@ pub fn simulate<

proxy.local_partition().report_progress_sync(0_u64);

let (global_time, global_steps) = proxy.local_partition().reduce_global_time_steps(
simulation.active_lineage_sampler().get_last_event_time(),
total_steps,
);
let local_time = simulation.active_lineage_sampler().get_last_event_time();
let local_steps = total_steps;

(Status::Done, global_time, global_steps)
(Status::Done, local_time, local_steps)
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,8 @@ pub fn simulate<

local_partition.report_progress_sync(0_u64);

let (global_time, global_steps) = local_partition.reduce_global_time_steps(
simulation.active_lineage_sampler().get_last_event_time(),
total_steps,
);
let local_time = simulation.active_lineage_sampler().get_last_event_time();
let local_steps = total_steps;

(Status::Done, global_time, global_steps)
(Status::Done, local_time, local_steps)
}
8 changes: 1 addition & 7 deletions necsim/partitioning/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use necsim_core::{
lineage::MigratingLineage,
reporter::{boolean::Boolean, Reporter},
};
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_core_bond::PositiveF64;

pub mod context;
pub mod iterator;
Expand Down Expand Up @@ -86,12 +86,6 @@ pub trait LocalPartition<'p, R: Reporter>: Sized {

fn wait_for_termination(&mut self) -> ControlFlow<(), ()>;

fn reduce_global_time_steps(
&mut self,
local_time: NonNegativeF64,
local_steps: u64,
) -> (NonNegativeF64, u64);

fn report_progress_sync(&mut self, remaining: u64);

fn finalise_reporting(self);
Expand Down
15 changes: 1 addition & 14 deletions necsim/partitioning/monolithic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use necsim_core::{
lineage::MigratingLineage,
reporter::{boolean::True, Reporter},
};
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_core_bond::PositiveF64;

use necsim_partitioning_core::{
context::ReporterContext,
Expand Down Expand Up @@ -164,19 +164,6 @@ impl<'p, R: Reporter> LocalPartition<'p, R> for MonolithicLocalPartition<R> {
}
}

fn reduce_global_time_steps(
&mut self,
local_time: NonNegativeF64,
local_steps: u64,
) -> (NonNegativeF64, u64) {
match self {
Self::Live(partition) => partition.reduce_global_time_steps(local_time, local_steps),
Self::Recorded(partition) => {
partition.reduce_global_time_steps(local_time, local_steps)
},
}
}

fn report_progress_sync(&mut self, remaining: u64) {
match self {
Self::Live(partition) => partition.report_progress_sync(remaining),
Expand Down
10 changes: 1 addition & 9 deletions necsim/partitioning/monolithic/src/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use necsim_core::{
lineage::MigratingLineage,
reporter::{boolean::True, FilteredReporter, Reporter},
};
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_core_bond::PositiveF64;

use necsim_partitioning_core::{
context::ReporterContext, iterator::ImmigrantPopIterator, partition::Partition, LocalPartition,
Expand Down Expand Up @@ -87,14 +87,6 @@ impl<'p, R: Reporter> LocalPartition<'p, R> for LiveMonolithicLocalPartition<R>
}
}

fn reduce_global_time_steps(
&mut self,
local_time: NonNegativeF64,
local_steps: u64,
) -> (NonNegativeF64, u64) {
(local_time, local_steps)
}

fn report_progress_sync(&mut self, remaining: u64) {
self.reporter.report_progress(&remaining.into());
}
Expand Down
10 changes: 1 addition & 9 deletions necsim/partitioning/monolithic/src/recorded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use necsim_core::{
FilteredReporter, Reporter,
},
};
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_core_bond::PositiveF64;

use necsim_impls_std::event_log::recorder::EventLogRecorder;

Expand Down Expand Up @@ -97,14 +97,6 @@ impl<'p, R: Reporter> LocalPartition<'p, R> for RecordedMonolithicLocalPartition
}
}

fn reduce_global_time_steps(
&mut self,
local_time: NonNegativeF64,
local_steps: u64,
) -> (NonNegativeF64, u64) {
(local_time, local_steps)
}

fn report_progress_sync(&mut self, remaining: u64) {
self.reporter.report_progress(&remaining.into());
}
Expand Down
26 changes: 1 addition & 25 deletions necsim/partitioning/mpi/src/partition/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use mpi::{
};

use necsim_core::lineage::MigratingLineage;
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_core_bond::PositiveF64;

use necsim_partitioning_core::{
iterator::ImmigrantPopIterator,
Expand Down Expand Up @@ -292,28 +292,4 @@ impl<'p> MpiCommonPartition<'p> {
ControlFlow::Break(())
}
}

#[must_use]
pub fn reduce_global_time_steps(
&mut self,
local_time: NonNegativeF64,
local_steps: u64,
) -> (NonNegativeF64, u64) {
let mut global_time_max = 0.0_f64;
let mut global_steps_sum = 0_u64;

self.world.all_reduce_into(
&local_time.get(),
&mut global_time_max,
SystemOperation::max(),
);
self.world
.all_reduce_into(&local_steps, &mut global_steps_sum, SystemOperation::sum());

// Safety: `global_time_max` is the max of multiple `NonNegativeF64`s
// communicated through MPI
let global_time_max = unsafe { NonNegativeF64::new_unchecked(global_time_max) };

(global_time_max, global_steps_sum)
}
}
15 changes: 1 addition & 14 deletions necsim/partitioning/mpi/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use necsim_core::{
lineage::MigratingLineage,
reporter::{boolean::False, Reporter},
};
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_core_bond::PositiveF64;

use necsim_partitioning_core::{
iterator::ImmigrantPopIterator, partition::Partition, LocalPartition, MigrationMode,
Expand Down Expand Up @@ -95,19 +95,6 @@ impl<'p, R: Reporter> LocalPartition<'p, R> for MpiLocalPartition<'p, R> {
}
}

fn reduce_global_time_steps(
&mut self,
local_time: NonNegativeF64,
local_steps: u64,
) -> (NonNegativeF64, u64) {
match self {
Self::Root(partition) => partition.reduce_global_time_steps(local_time, local_steps),
Self::Parallel(partition) => {
partition.reduce_global_time_steps(local_time, local_steps)
},
}
}

fn report_progress_sync(&mut self, remaining: u64) {
match self {
Self::Root(partition) => partition.report_progress_sync(remaining),
Expand Down
11 changes: 1 addition & 10 deletions necsim/partitioning/mpi/src/partition/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use necsim_core::{
Reporter,
},
};
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_core_bond::PositiveF64;

use necsim_impls_std::event_log::recorder::EventLogRecorder;
use necsim_partitioning_core::{
Expand Down Expand Up @@ -127,15 +127,6 @@ impl<'p, R: Reporter> LocalPartition<'p, R> for MpiParallelPartition<'p, R> {
self.common.wait_for_termination()
}

fn reduce_global_time_steps(
&mut self,
local_time: NonNegativeF64,
local_steps: u64,
) -> (NonNegativeF64, u64) {
self.common
.reduce_global_time_steps(local_time, local_steps)
}

fn report_progress_sync(&mut self, remaining: u64) {
let root_process = self
.common
Expand Down
10 changes: 1 addition & 9 deletions necsim/partitioning/mpi/src/partition/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use necsim_core::{
FilteredReporter, Reporter,
},
};
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_core_bond::PositiveF64;

use necsim_impls_std::event_log::recorder::EventLogRecorder;
use necsim_partitioning_core::{
Expand Down Expand Up @@ -152,14 +152,6 @@ impl<'p, R: Reporter> LocalPartition<'p, R> for MpiRootPartition<'p, R> {
result
}

fn reduce_global_time_steps(
&mut self,
local_time: NonNegativeF64,
local_steps: u64,
) -> (NonNegativeF64, u64) {
self.reduce_global_time_steps(local_time, local_steps)
}

fn report_progress_sync(&mut self, remaining: u64) {
let root_process = self
.common
Expand Down
7 changes: 1 addition & 6 deletions necsim/partitioning/threads/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{

use anyhow::Context;
use humantime_serde::re::humantime::format_duration;
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_core_bond::PositiveF64;
use serde::{ser::SerializeStruct, Deserialize, Deserializer, Serialize, Serializer};
use thiserror::Error;

Expand Down Expand Up @@ -129,7 +129,6 @@ impl Partitioning for ThreadsPartitioning {
self.size
}

#[allow(clippy::too_many_lines)]
/// # Errors
///
/// Returns `MissingEventLog` if the local partition is non-monolithic and
Expand Down Expand Up @@ -165,8 +164,6 @@ impl Partitioning for ThreadsPartitioning {

let vote_any = Vote::new(self.size.get() as usize);
let vote_min_time = Vote::new_with_dummy(self.size.get() as usize, (PositiveF64::one(), 0));
let vote_time_steps =
Vote::new_with_dummy(self.size.get() as usize, (NonNegativeF64::zero(), 0));
let vote_termination =
AsyncVote::new_with_dummy(self.size.get() as usize, ControlFlow::Continue(()));

Expand Down Expand Up @@ -200,7 +197,6 @@ impl Partitioning for ThreadsPartitioning {
let result = std::thread::scope(|scope| {
let vote_any = &vote_any;
let vote_min_time = &vote_min_time;
let vote_time_steps = &vote_time_steps;
let vote_termination = &vote_termination;
let emigration_channels = emigration_channels.as_slice();
let sync_barrier = &sync_barrier;
Expand All @@ -219,7 +215,6 @@ impl Partitioning for ThreadsPartitioning {
partition,
vote_any,
vote_min_time,
vote_time_steps,
vote_termination,
emigration_channels,
immigration_channel,
Expand Down
Loading

0 comments on commit 72121b0

Please sign in to comment.