From fb393daddf42bbd67e04c36ce1c24efec0f5ccae Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Tue, 3 May 2022 15:37:42 -0700 Subject: [PATCH 1/6] #1129: tests: add TDD tests for subphase management --- tests/unit/phase/test_subphase_management.cc | 735 +++++++++++++++++++ 1 file changed, 735 insertions(+) create mode 100644 tests/unit/phase/test_subphase_management.cc diff --git a/tests/unit/phase/test_subphase_management.cc b/tests/unit/phase/test_subphase_management.cc new file mode 100644 index 0000000000..bffa0b08fb --- /dev/null +++ b/tests/unit/phase/test_subphase_management.cc @@ -0,0 +1,735 @@ +/* +//@HEADER +// ***************************************************************************** +// +// test_subphase_management.cc +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#include + +#include "test_parallel_harness.h" +#include "test_helpers.h" + +#include +#include +#include + +#if vt_check_enabled(lblite) + +namespace vt { namespace tests { namespace unit { namespace subphase { + +static constexpr int const num_elms = 32; +static constexpr int const num_phases = 3; + +struct MyCol : vt::Collection { + MyCol() = default; + + explicit MyCol(checkpoint::SERIALIZE_CONSTRUCT_TAG) {} +}; + +struct MyMsg : vt::CollectionMessage { + explicit MyMsg(vt::PhaseType expected_subphase) { + expected_subphase_ = expected_subphase; + } + + vt::PhaseType expected_subphase_ = vt::no_lb_phase; +}; + +void colHandler(MyMsg* msg, MyCol* col) { + EXPECT_EQ(vt::thePhase()->getCurrentSubphase(), msg->expected_subphase_); +} + +struct MyObjgrp { + MyObjgrp() = default; + MyObjgrp(const MyObjgrp& obj) = delete; + MyObjgrp& operator=(const MyObjgrp& obj) = delete; + MyObjgrp(MyObjgrp&&) noexcept = default; + MyObjgrp& operator=(MyObjgrp&& obj) noexcept = default; + ~MyObjgrp() = default; + + void handler(MyMsg* msg) { } +}; + +using TestSubphaseManagement = TestParallelHarness; + +TEST_F(TestSubphaseManagement, test_no_subphases) { + auto range = vt::Index1D(num_elms); + + auto this_node = theContext()->getNode(); + + auto o_proxy = vt::theObjGroup()->makeCollective(); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runInEpochCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + + runInEpochCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + + runInEpochCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_subphase_collective) { + auto range = vt::Index1D(num_elms); + + auto this_node = theContext()->getNode(); + + auto o_proxy = vt::theObjGroup()->makeCollective(); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_subphase_collective_with_non_1) { + auto range = vt::Index1D(num_elms); + + auto this_node = theContext()->getNode(); + + auto o_proxy = vt::theObjGroup()->makeCollective(); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + runInEpochCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_subphase_collective_with_non_2) { + auto range = vt::Index1D(num_elms); + + auto this_node = theContext()->getNode(); + + auto o_proxy = vt::theObjGroup()->makeCollective(); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + runInEpochCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + c_proxy.broadcast(expected_subphase); + } + }); + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_subphase_collective_nested_with_non) { + auto range = vt::Index1D(num_elms); + + auto this_node = theContext()->getNode(); + + auto o_proxy = vt::theObjGroup()->makeCollective(); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + runInEpochCollective([&]{ + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + }); + + runSubphaseCollective([&]{ + if (this_node == 0) { + c_proxy.broadcast(expected_subphase); + o_proxy.broadcast(expected_subphase); + } + }); + ++expected_subphase; + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_chainset_no_subphases) { + auto range = vt::Index1D(num_elms); + + auto o_proxy = vt::theObjGroup()->makeCollective(); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + std::unique_ptr> chains + = std::make_unique>(c_proxy); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runInEpochCollective([&]{ + chains->nextStepCollective("first", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + + chains->nextStepCollective("second", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + + chains->nextStepCollective("third", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + + chains->phaseDone(); + }); + + runInEpochCollective([&]{ + o_proxy.broadcast(expected_subphase); + }); + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_chainset_subphases_1) { + auto range = vt::Index1D(num_elms); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + std::unique_ptr> chains + = std::make_unique>(c_proxy); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runInEpochCollective([&]{ + chains->nextStepCollectiveSubphase("first", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->nextStepCollectiveSubphase("second", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->nextStepCollectiveSubphase("third", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->phaseDone(); + }); + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_chainset_subphases_2) { + auto range = vt::Index1D(num_elms); + + auto o_proxy = vt::theObjGroup()->makeCollective(); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + std::unique_ptr> chains + = std::make_unique>(c_proxy); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runInEpochCollective([&]{ + o_proxy.broadcast(expected_subphase); + }); + + runInEpochCollective([&]{ + chains->nextStepCollectiveSubphase("first", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->nextStepCollectiveSubphase("second", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->nextStepCollectiveSubphase("third", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->phaseDone(); + }); + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_chainset_subphases_3) { + auto range = vt::Index1D(num_elms); + + auto o_proxy = vt::theObjGroup()->makeCollective(); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + std::unique_ptr> chains + = std::make_unique>(c_proxy); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runInEpochCollective([&]{ + chains->nextStepCollectiveSubphase("first", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->nextStepCollectiveSubphase("second", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->nextStepCollectiveSubphase("third", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->phaseDone(); + }); + + runInEpochCollective([&]{ + o_proxy.broadcast(expected_subphase); + }); + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_chainset_subphases_with_rooted) { + auto range = vt::Index1D(num_elms); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + std::unique_ptr> chains + = std::make_unique>(c_proxy); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runInEpochCollective([&]{ + chains->nextStepCollectiveSubphase("first", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->nextStepCollective("second", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + + chains->nextStepCollectiveSubphase("third", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->phaseDone(); + }); + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +TEST_F(TestSubphaseManagement, test_collective_and_chainset_subphases) { + auto range = vt::Index1D(num_elms); + + auto o_proxy = vt::theObjGroup()->makeCollective(); + + auto c_proxy = vt::makeCollection() + .bounds(range) + .bulkInsert() + .wait(); + + std::unique_ptr> chains + = std::make_unique>(c_proxy); + + PhaseType n_subphases = 0; + for (int phase = 0; phase < num_phases; phase++) { + PhaseType expected_subphase = 0; + + runInEpochCollective([&]{ + chains->nextStepCollectiveSubphase("first", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->nextStepCollectiveSubphase("second", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->phaseDone(); + }); + + runSubphaseCollective([&]{ + o_proxy.broadcast(expected_subphase); + }); + ++expected_subphase; + + runInEpochCollective([&]{ + chains->nextStepCollectiveSubphase("third", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->nextStepCollectiveSubphase("fourth", [=](vt::Index1D idx) { + return c_proxy(idx).template send(expected_subphase); + }); + ++expected_subphase; + + chains->phaseDone(); + }); + + if (phase == 0) { + n_subphases = expected_subphase + 1; + } else { + EXPECT_EQ(n_subphases, expected_subphase + 1); + } + + // Go to the next phase. + vt::thePhase()->nextPhaseCollective(); + + auto lbdh = theNodeLBData()->getLBData(); + ASSERT_TRUE(lbdh->node_data_.find(phase) != lbdh->node_data_.end()); + auto &phase_data = lbdh->node_data_.at(phase); + for (auto &obj_data : phase_data) { + EXPECT_EQ(obj_data.second.subphase_loads.size(), n_subphases); + } + } +} + +}}}} // end namespace vt::tests::unit::subphase + +#endif /*vt_check_enabled(lblite)*/ From f5c9d8ff036a579537d4047b7164a4b6a58a059c Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 13 May 2022 14:48:17 -0700 Subject: [PATCH 2/6] #1129: examples: update lb_iter subphase usage --- examples/collection/lb_iter.cc | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/examples/collection/lb_iter.cc b/examples/collection/lb_iter.cc index 980f94bf49..d306918529 100644 --- a/examples/collection/lb_iter.cc +++ b/examples/collection/lb_iter.cc @@ -52,14 +52,13 @@ struct IterCol : vt::Collection { struct IterMsg : vt::CollectionMessage { IterMsg() = default; IterMsg( - int64_t const in_work_amt, int64_t const in_iter, int64_t const subphase + int64_t const in_work_amt, int64_t const in_iter ) - : iter_(in_iter), work_amt_(in_work_amt), subphase_(subphase) + : iter_(in_iter), work_amt_(in_work_amt) { } int64_t iter_ = 0; int64_t work_amt_ = 0; - int64_t subphase_ = 0; }; void iterWork(IterMsg* msg); @@ -77,7 +76,6 @@ struct IterCol : vt::Collection { static double weight = 1.0f; void IterCol::iterWork(IterMsg* msg) { - this->lb_data_.setSubPhase(msg->subphase_); double val = 0.1f; double val2 = 0.4f * msg->work_amt_; auto const idx = getIndex().x(); @@ -130,14 +128,14 @@ int main(int argc, char** argv) { for (int i = 0; i < num_iter; i++) { auto cur_time = vt::timing::getCurrentTime(); - vt::runInEpochCollective([=]{ - proxy.broadcastCollective(10, i, 0); + vt::runSubphaseCollective([=]{ + proxy.broadcastCollective(10, i); }); - vt::runInEpochCollective([=]{ - proxy.broadcastCollective(5, i, 1); + vt::runSubphaseCollective([=]{ + proxy.broadcastCollective(5, i); }); - vt::runInEpochCollective([=]{ - proxy.broadcastCollective(15, i, 2); + vt::runSubphaseCollective([=]{ + proxy.broadcastCollective(15, i); }); auto total_time = vt::timing::getCurrentTime() - cur_time; From be708b65dc487932b363ceedcf596b56b40f2b6d Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 13 May 2022 14:50:01 -0700 Subject: [PATCH 3/6] #1129: phase and elm: centralize subphases --- src/vt/elm/elm_lb_data.cc | 29 ++++++++++++++--------------- src/vt/elm/elm_lb_data.h | 4 ---- src/vt/phase/phase_manager.cc | 1 + src/vt/phase/phase_manager.h | 14 ++++++++++++++ 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/vt/elm/elm_lb_data.cc b/src/vt/elm/elm_lb_data.cc index 3496f48b19..1d1e11385d 100644 --- a/src/vt/elm/elm_lb_data.cc +++ b/src/vt/elm/elm_lb_data.cc @@ -45,6 +45,7 @@ #define INCLUDED_VT_ELM_ELM_LB_DATA_CC #include "vt/elm/elm_lb_data.h" +#include "vt/phase/phase_manager.h" #include "vt/config.h" @@ -87,17 +88,19 @@ void ElementLBData::sendToEntity( } void ElementLBData::sendComm(elm::CommKey key, double bytes) { + auto cur_subphase = thePhase()->getCurrentSubphase(); phase_comm_[cur_phase_][key].sendMsg(bytes); - subphase_comm_[cur_phase_].resize(cur_subphase_ + 1); - subphase_comm_[cur_phase_].at(cur_subphase_)[key].sendMsg(bytes); + subphase_comm_[cur_phase_].resize(cur_subphase + 1); + subphase_comm_[cur_phase_].at(cur_subphase)[key].sendMsg(bytes); } void ElementLBData::recvComm( elm::CommKey key, double bytes ) { + auto cur_subphase = thePhase()->getCurrentSubphase(); phase_comm_[cur_phase_][key].receiveMsg(bytes); - subphase_comm_[cur_phase_].resize(cur_subphase_ + 1); - subphase_comm_[cur_phase_].at(cur_subphase_)[key].receiveMsg(bytes); + subphase_comm_[cur_phase_].resize(cur_subphase + 1); + subphase_comm_[cur_phase_].at(cur_subphase)[key].receiveMsg(bytes); } void ElementLBData::recvObjData( @@ -127,8 +130,9 @@ void ElementLBData::recvToNode( void ElementLBData::addTime(TimeTypeWrapper const& time) { phase_timings_[cur_phase_] += time.seconds(); - subphase_timings_[cur_phase_].resize(cur_subphase_ + 1); - subphase_timings_[cur_phase_].at(cur_subphase_) += time.seconds(); + auto cur_subphase = thePhase()->getCurrentSubphase(); + subphase_timings_[cur_phase_].resize(cur_subphase + 1); + subphase_timings_[cur_phase_].at(cur_subphase) += time.seconds(); vt_debug_print( verbose,lb, @@ -199,6 +203,8 @@ TimeType ElementLBData::getLoad(PhaseType phase, SubphaseType subphase) const { } std::vector const& ElementLBData::getSubphaseTimes(PhaseType phase) { + auto cur_subphase = thePhase()->getCurrentSubphase(); + subphase_timings_[phase].resize(cur_subphase + 1); return subphase_timings_[phase]; } @@ -216,6 +222,8 @@ ElementLBData::getComm(PhaseType const& phase) { } std::vector const& ElementLBData::getSubphaseComm(PhaseType phase) { + auto cur_subphase = thePhase()->getCurrentSubphase(); + subphase_comm_[phase].resize(cur_subphase + 1); auto const& subphase_comm = subphase_comm_[phase]; vt_debug_print( @@ -227,15 +235,6 @@ std::vector const& ElementLBData::getSubphaseComm(PhaseType phase) return subphase_comm; } -void ElementLBData::setSubPhase(SubphaseType subphase) { - vtAssert(subphase < no_subphase, "subphase must be less than sentinel"); - cur_subphase_ = subphase; -} - -SubphaseType ElementLBData::getSubPhase() const { - return cur_subphase_; -} - void ElementLBData::releaseLBDataFromUnneededPhases(PhaseType phase, unsigned int look_back) { if (phase >= look_back) { phase_timings_.erase(phase - look_back); diff --git a/src/vt/elm/elm_lb_data.h b/src/vt/elm/elm_lb_data.h index 554856f8d1..79701adf7d 100644 --- a/src/vt/elm/elm_lb_data.h +++ b/src/vt/elm/elm_lb_data.h @@ -90,8 +90,6 @@ struct ElementLBData { CommMapType const& getComm(PhaseType const& phase); std::vector const& getSubphaseComm(PhaseType phase); std::vector const& getSubphaseTimes(PhaseType phase); - void setSubPhase(SubphaseType subphase); - SubphaseType getSubPhase() const; // these are just for unit testing std::size_t getLoadPhaseCount() const; @@ -106,7 +104,6 @@ struct ElementLBData { s | cur_phase_; s | phase_timings_; s | phase_comm_; - s | cur_subphase_; s | subphase_timings_; s | subphase_comm_; } @@ -129,7 +126,6 @@ struct ElementLBData { std::unordered_map phase_timings_ = {}; std::unordered_map phase_comm_ = {}; - SubphaseType cur_subphase_ = 0; std::unordered_map> subphase_timings_ = {}; std::unordered_map> subphase_comm_ = {}; }; diff --git a/src/vt/phase/phase_manager.cc b/src/vt/phase/phase_manager.cc index 8c875dab18..59a6bc8408 100644 --- a/src/vt/phase/phase_manager.cc +++ b/src/vt/phase/phase_manager.cc @@ -160,6 +160,7 @@ void PhaseManager::nextPhaseCollective() { runHooks(PhaseHook::EndPostMigration); cur_phase_++; + cur_subphase_ = 0; vt_debug_print( normal, phase, diff --git a/src/vt/phase/phase_manager.h b/src/vt/phase/phase_manager.h index 1a8b2a019e..bfa3cbd6f8 100644 --- a/src/vt/phase/phase_manager.h +++ b/src/vt/phase/phase_manager.h @@ -105,6 +105,18 @@ struct PhaseManager : runtime::component::Component { */ PhaseType getCurrentPhase() const { return cur_phase_; } + /** + * \brief Get the current subphase + * + * \return the current subphase + */ + SubphaseType getCurrentSubphase() const { return cur_subphase_; } + + /** + * \brief Advance subphase + */ + void advanceSubphase() { ++cur_subphase_; } + /** * \brief Collectively register a phase hook that triggers depending on the * type of hook @@ -200,6 +212,7 @@ struct PhaseManager : runtime::component::Component { template void serialize(SerializerT& s) { s | cur_phase_ + | cur_subphase_ | proxy_ | collective_hooks_ | rooted_hooks_ @@ -213,6 +226,7 @@ struct PhaseManager : runtime::component::Component { private: PhaseType cur_phase_ = 0; /**< Current phase */ + SubphaseType cur_subphase_ = 0; /**< Current subphase */ ObjGroupProxyType proxy_ = no_obj_group; /**< Objgroup proxy */ HookIDMapType collective_hooks_; /**< Collective regisstered hooks */ HookIDMapType rooted_hooks_; /**< Rooted regisstered hooks */ From b780c297b752a482a4e0c9f71658f0547f5eff18 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 13 May 2022 14:50:38 -0700 Subject: [PATCH 4/6] #1129: chainset: add subphases to collection chain set --- src/vt/messaging/collection_chain_set.h | 49 +++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/vt/messaging/collection_chain_set.h b/src/vt/messaging/collection_chain_set.h index 93829e1bbd..e898ba0e49 100644 --- a/src/vt/messaging/collection_chain_set.h +++ b/src/vt/messaging/collection_chain_set.h @@ -256,6 +256,55 @@ class CollectionChainSet final { return nextStepCollective("", step_action); } + /** + * \brief The next collective step to execute for each index that is added + * to the CollectionChainSet on each node. + * + * Should be used for steps with internal recursive communication and global + * inter-dependence. Creates a global (on the communicator), collective epoch + * to track all the casually related messages and collectively wait for + * termination of all of the recursive sends. Advances the subphase at + * termination. + * + * \param[in] label Label for the epoch created for debugging + * \param[in] step_action the next step to execute, returning a \c PendingSend + */ + void nextStepCollectiveSubphase( + std::string const& label, std::function step_action) { + auto epoch = theTerm()->makeEpochCollective(label); + + theTerm()->addActionEpoch(epoch, [=]{ + thePhase()->advanceSubphase(); + }); + + vt::theMsg()->pushEpoch(epoch); + + for (auto& entry : chains_) { + auto& idx = entry.first; + auto& chain = entry.second; + chain.add(epoch, step_action(idx)); + } + + vt::theMsg()->popEpoch(epoch); + theTerm()->finishedEpoch(epoch); + } + + /** + * \brief The next collective step to execute for each index that is added + * to the CollectionChainSet on each node. + * + * Should be used for steps with internal recursive communication and global + * inter-dependence. Creates a global (on the communicator), collective epoch + * to track all the casually related messages and collectively wait for + * termination of all of the recursive sends. Advances the subphase at + * termination. + * + * \param[in] step_action the next step to execute, returning a \c PendingSend + */ + void nextStepCollectiveSubphase(std::function step_action) { + return nextStepCollectiveSubphase("", step_action); + } + /** * \brief The next collective step of both CollectionChainSets * to execute over all shared indices of the CollectionChainSets over all From ff1bfc2c5c1786097260357426b84ce6f2d23631 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 13 May 2022 14:51:07 -0700 Subject: [PATCH 5/6] #1129: scheduler: add subphase collectives --- src/vt/scheduler/scheduler.h | 6 ++++++ src/vt/scheduler/scheduler.impl.h | 15 +++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/src/vt/scheduler/scheduler.h b/src/vt/scheduler/scheduler.h index 373829ce1a..3b824c7e20 100644 --- a/src/vt/scheduler/scheduler.h +++ b/src/vt/scheduler/scheduler.h @@ -76,6 +76,12 @@ void runInEpochCollective(Callable&& fn); template void runInEpochCollective(std::string const& label, Callable&& fn); +template +void runSubphaseCollective(Callable&& fn); + +template +void runSubphaseCollective(std::string const& label, Callable&& fn); + namespace messaging { template diff --git a/src/vt/scheduler/scheduler.impl.h b/src/vt/scheduler/scheduler.impl.h index 1e0c9b7520..6486c61815 100644 --- a/src/vt/scheduler/scheduler.impl.h +++ b/src/vt/scheduler/scheduler.impl.h @@ -47,6 +47,7 @@ #include "vt/config.h" #include "vt/messaging/active.h" #include "vt/termination/termination.h" +#include "vt/phase/phase_manager.h" namespace vt { @@ -72,6 +73,20 @@ void runInEpochCollective(std::string const& label, Callable&& fn) { runInEpoch(ep, std::forward(fn)); } +template +void runSubphaseCollective(Callable&& fn) { + runSubphaseCollective("UNLABELED", std::forward(fn)); +} + +template +void runSubphaseCollective(std::string const& label, Callable&& fn) { + auto ep = theTerm()->makeEpochCollective(label); + theTerm()->addActionEpoch(ep, [=]{ + thePhase()->advanceSubphase(); + }); + runInEpoch(ep, std::forward(fn)); +} + template void runInEpochRooted(Callable&& fn) { runInEpochRooted("UNLABELED", std::forward(fn)); From 256a5fc79cf1af0f2d7d81cedbc85d42255f31eb Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 13 May 2022 15:12:31 -0700 Subject: [PATCH 6/6] #1129: elm: centralize phase management --- src/vt/elm/elm_lb_data.cc | 47 ++++++++----------- src/vt/elm/elm_lb_data.h | 6 +-- src/vt/messaging/active.cc | 4 +- src/vt/objgroup/manager.cc | 5 +- .../vrt/collection/balance/col_lb_data.impl.h | 12 +++-- src/vt/vrt/collection/balance/node_lb_data.cc | 5 +- src/vt/vrt/collection/balance/node_lb_data.h | 1 + src/vt/vrt/collection/manager.impl.h | 2 - tests/unit/phase/test_phase_insertions.cc | 2 +- 9 files changed, 39 insertions(+), 45 deletions(-) diff --git a/src/vt/elm/elm_lb_data.cc b/src/vt/elm/elm_lb_data.cc index 1d1e11385d..9aaeaa9c77 100644 --- a/src/vt/elm/elm_lb_data.cc +++ b/src/vt/elm/elm_lb_data.cc @@ -88,19 +88,21 @@ void ElementLBData::sendToEntity( } void ElementLBData::sendComm(elm::CommKey key, double bytes) { + auto cur_phase = thePhase()->getCurrentPhase(); auto cur_subphase = thePhase()->getCurrentSubphase(); - phase_comm_[cur_phase_][key].sendMsg(bytes); - subphase_comm_[cur_phase_].resize(cur_subphase + 1); - subphase_comm_[cur_phase_].at(cur_subphase)[key].sendMsg(bytes); + phase_comm_[cur_phase][key].sendMsg(bytes); + subphase_comm_[cur_phase].resize(cur_subphase + 1); + subphase_comm_[cur_phase].at(cur_subphase)[key].sendMsg(bytes); } void ElementLBData::recvComm( elm::CommKey key, double bytes ) { + auto cur_phase = thePhase()->getCurrentPhase(); auto cur_subphase = thePhase()->getCurrentSubphase(); - phase_comm_[cur_phase_][key].receiveMsg(bytes); - subphase_comm_[cur_phase_].resize(cur_subphase + 1); - subphase_comm_[cur_phase_].at(cur_subphase)[key].receiveMsg(bytes); + phase_comm_[cur_phase][key].receiveMsg(bytes); + subphase_comm_[cur_phase].resize(cur_subphase + 1); + subphase_comm_[cur_phase].at(cur_subphase)[key].receiveMsg(bytes); } void ElementLBData::recvObjData( @@ -128,43 +130,34 @@ void ElementLBData::recvToNode( } void ElementLBData::addTime(TimeTypeWrapper const& time) { - phase_timings_[cur_phase_] += time.seconds(); + auto cur_phase = thePhase()->getCurrentPhase(); + phase_timings_[cur_phase] += time.seconds(); auto cur_subphase = thePhase()->getCurrentSubphase(); - subphase_timings_[cur_phase_].resize(cur_subphase + 1); - subphase_timings_[cur_phase_].at(cur_subphase) += time.seconds(); + subphase_timings_[cur_phase].resize(cur_subphase + 1); + subphase_timings_[cur_phase].at(cur_subphase) += time.seconds(); vt_debug_print( verbose,lb, "ElementLBData: addTime: time={}, cur_load={}\n", time, - TimeTypeWrapper(phase_timings_[cur_phase_]) + TimeTypeWrapper(phase_timings_[cur_phase]) ); } -void ElementLBData::updatePhase(PhaseType const& inc) { +void ElementLBData::updatePhase(PhaseType const& cur_phase) { vt_debug_print( verbose, lb, - "ElementLBData: updatePhase: cur_phase_={}, inc={}\n", - cur_phase_, inc + "ElementLBData: updatePhase: new_phase={}\n", + cur_phase ); - cur_phase_ += inc; - // Access all table entries for current phase, to ensure presence even // if they're left empty - phase_timings_[cur_phase_]; - subphase_timings_[cur_phase_]; - phase_comm_[cur_phase_]; - subphase_comm_[cur_phase_]; -} - -void ElementLBData::resetPhase() { - cur_phase_ = fst_lb_phase; -} - -PhaseType ElementLBData::getPhase() const { - return cur_phase_; + phase_timings_[cur_phase]; + subphase_timings_[cur_phase]; + phase_comm_[cur_phase]; + subphase_comm_[cur_phase]; } TimeType ElementLBData::getLoad(PhaseType const& phase) const { diff --git a/src/vt/elm/elm_lb_data.h b/src/vt/elm/elm_lb_data.h index 79701adf7d..2648090f06 100644 --- a/src/vt/elm/elm_lb_data.h +++ b/src/vt/elm/elm_lb_data.h @@ -81,9 +81,7 @@ struct ElementLBData { NodeType to, ElementIDStruct from_perm, double bytes, bool bcast ); - void updatePhase(PhaseType const& inc = 1); - void resetPhase(); - PhaseType getPhase() const; + void updatePhase(PhaseType const& phase); TimeType getLoad(PhaseType const& phase) const; TimeType getLoad(PhaseType phase, SubphaseType subphase) const; @@ -101,7 +99,6 @@ struct ElementLBData { void serialize(Serializer& s) { s | cur_time_started_; s | cur_time_; - s | cur_phase_; s | phase_timings_; s | phase_comm_; s | subphase_timings_; @@ -122,7 +119,6 @@ struct ElementLBData { protected: bool cur_time_started_ = false; TimeType cur_time_ = 0.0; - PhaseType cur_phase_ = fst_lb_phase; std::unordered_map phase_timings_ = {}; std::unordered_map phase_comm_ = {}; diff --git a/src/vt/messaging/active.cc b/src/vt/messaging/active.cc index 46f908dc9b..8533cff0c5 100644 --- a/src/vt/messaging/active.cc +++ b/src/vt/messaging/active.cc @@ -160,8 +160,10 @@ void ActiveMessenger::startup() { #if vt_check_enabled(lblite) // Hook to collect LB data about objgroups thePhase()->registerHookCollective(phase::PhaseHook::End, [this]{ + auto const phase = thePhase()->getCurrentPhase(); theNodeLBData()->addNodeLBData( - bare_handler_dummy_elm_id_for_lb_data_, &bare_handler_lb_data_, nullptr + bare_handler_dummy_elm_id_for_lb_data_, &bare_handler_lb_data_, nullptr, + phase ); }); #endif diff --git a/src/vt/objgroup/manager.cc b/src/vt/objgroup/manager.cc index 3865333d7c..1068882eeb 100644 --- a/src/vt/objgroup/manager.cc +++ b/src/vt/objgroup/manager.cc @@ -60,6 +60,7 @@ void ObjGroupManager::startup() { #if vt_check_enabled(lblite) // Hook to collect LB data about objgroups thePhase()->registerHookCollective(phase::PhaseHook::End, []{ + auto const phase = thePhase()->getCurrentPhase(); auto& objs = theObjGroup()->objs_; for (auto&& obj : objs) { auto holder = obj.second.get(); @@ -68,7 +69,9 @@ void ObjGroupManager::startup() { auto proxy = elm::ElmIDBits::getObjGroupProxy(elm_id.id, false); vtAssertExpr(proxy == obj.first); theNodeLBData()->registerObjGroupInfo(elm_id, obj.first); - theNodeLBData()->addNodeLBData(elm_id, &holder->getLBData(), nullptr); + theNodeLBData()->addNodeLBData( + elm_id, &holder->getLBData(), nullptr, phase + ); } } }); diff --git a/src/vt/vrt/collection/balance/col_lb_data.impl.h b/src/vt/vrt/collection/balance/col_lb_data.impl.h index 271939ced9..13f4a26b48 100644 --- a/src/vt/vrt/collection/balance/col_lb_data.impl.h +++ b/src/vt/vrt/collection/balance/col_lb_data.impl.h @@ -61,20 +61,22 @@ namespace vt { namespace vrt { namespace collection { namespace balance { template /*static*/ void CollectionLBData::syncNextPhase(CollectStatsMsg* msg, ColT* col) { - auto& lb_data = col->lb_data_; + auto const phase = thePhase()->getCurrentPhase(); vt_debug_print( normal, lb, - "ElementLBData: syncNextPhase ({}) (idx={}): lb_data.getPhase()={}, " + "ElementLBData: syncNextPhase ({}) (idx={}): phase()={}, " "msg->getPhase()={}\n", - print_ptr(col), col->getIndex(), lb_data.getPhase(), msg->getPhase() + print_ptr(col), col->getIndex(), phase, msg->getPhase() ); - vtAssert(lb_data.getPhase() == msg->getPhase(), "Phases must match"); + vtAssert(phase == msg->getPhase(), "Phases must match"); auto const proxy = col->getProxy(); auto const subphase = getFocusedSubPhase(proxy); - theNodeLBData()->addNodeLBData(col->elm_id_, &col->lb_data_, col, subphase); + theNodeLBData()->addNodeLBData( + col->elm_id_, &col->lb_data_, col, phase, subphase + ); std::vector idx; for (index::NumDimensionsType i = 0; i < col->getIndex().ndims(); i++) { diff --git a/src/vt/vrt/collection/balance/node_lb_data.cc b/src/vt/vrt/collection/balance/node_lb_data.cc index d62ad90868..6e7bed54ca 100644 --- a/src/vt/vrt/collection/balance/node_lb_data.cc +++ b/src/vt/vrt/collection/balance/node_lb_data.cc @@ -247,14 +247,13 @@ void NodeLBData::registerObjGroupInfo( void NodeLBData::addNodeLBData( ElementIDStruct id, elm::ElementLBData* in, StorableType *storable, - SubphaseType focused_subphase + PhaseType phase, SubphaseType focused_subphase ) { vt_debug_print( normal, lb, "NodeLBData::addNodeLBData: id={}\n", id ); - auto const phase = in->getPhase(); auto const& total_load = in->getLoad(phase, focused_subphase); auto &phase_data = lb_data_->node_data_[phase]; @@ -289,7 +288,7 @@ void NodeLBData::addNodeLBData( ); } - in->updatePhase(1); + in->updatePhase(phase + 1); auto model = theLBManager()->getLoadModel(); in->releaseLBDataFromUnneededPhases(phase, model->getNumPastPhasesNeeded()); diff --git a/src/vt/vrt/collection/balance/node_lb_data.h b/src/vt/vrt/collection/balance/node_lb_data.h index fdd83ebdbb..6647f8d853 100644 --- a/src/vt/vrt/collection/balance/node_lb_data.h +++ b/src/vt/vrt/collection/balance/node_lb_data.h @@ -134,6 +134,7 @@ struct NodeLBData : runtime::component::Component { */ void addNodeLBData( ElementIDStruct id, elm::ElementLBData* in, StorableType *storable, + PhaseType phase, SubphaseType focused_subphase = elm::ElementLBData::no_subphase ); diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h index 357e726297..69b44ffaa5 100644 --- a/src/vt/vrt/collection/manager.impl.h +++ b/src/vt/vrt/collection/manager.impl.h @@ -2263,7 +2263,6 @@ void CollectionManager::restoreFromFileInPlace( auto ptr = elm_holder->lookup(idx).getRawPtr(); checkpoint::deserializeInPlaceFromFile(file_name, static_cast(ptr)); - ptr->lb_data_.resetPhase(); } } @@ -2306,7 +2305,6 @@ CollectionManager::restoreFromFile( // @todo: error check the file read with bytes in directory auto col_ptr = checkpoint::deserializeFromFile(file_name); - col_ptr->lb_data_.resetPhase(); elms.emplace_back(std::make_tuple(idx, std::move(col_ptr))); } diff --git a/tests/unit/phase/test_phase_insertions.cc b/tests/unit/phase/test_phase_insertions.cc index fb83aab5ed..8e02feffb3 100644 --- a/tests/unit/phase/test_phase_insertions.cc +++ b/tests/unit/phase/test_phase_insertions.cc @@ -75,7 +75,7 @@ struct MyCol : vt::Collection { s | val; } - vt::PhaseType getPhase() { return this->getLBData().getPhase(); } + vt::PhaseType getPhase() { return vt::thePhase()->getCurrentPhase(); } double val = 0.0; };