Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1382: collect histogram of process loads #1873

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/vt/vrt/collection/balance/lb_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ NLOHMANN_JSON_SERIALIZE_ENUM(StatisticQuantity, {
{StatisticQuantity::imb, "imb"},
{StatisticQuantity::npr, "npr"},
{StatisticQuantity::sum, "sum"},
{StatisticQuantity::lbh, "lbh"},
})

nlohmann::json jsonifyPhaseStatistics(const StatisticMap &statistics) {
Expand Down
10 changes: 8 additions & 2 deletions src/vt/vrt/collection/balance/lb_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ LoadSummary getNodeLoads(std::shared_ptr<LoadModel> model, PhaseOffset when);
namespace lb {

enum struct StatisticQuantity : int8_t {
min, max, avg, std, var, skw, kur, car, imb, npr, sum
min, max, avg, std, var, skw, kur, car, imb, npr, sum, lbh
};

enum struct Statistic : int8_t {
Expand All @@ -188,11 +188,17 @@ enum struct Statistic : int8_t {
ObjectRatio,
// EdgeCardinality,
EdgeRatio,
LBHist,
// ExternalEdgesCardinality,
// InternalEdgesCardinality
};

using StatisticQuantityMap = std::map<StatisticQuantity, double>;
using UnionValueType = vt::adt::SafeUnion<
double,
adt::HistogramApprox<double, int64_t>
>;

using StatisticQuantityMap = std::map<StatisticQuantity, UnionValueType>;
using StatisticMap = std::unordered_map<Statistic, StatisticQuantityMap>;

nlohmann::json jsonifyPhaseStatistics(const StatisticMap &statistics);
Expand Down
2 changes: 2 additions & 0 deletions src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ void LBManager::statsHandler(StatsMsgType* msg) {
auto stdv = st.stdv();
auto skew = st.skew();
auto krte = st.krte();
auto lbhist = st.lbhist();

stats[stat][lb::StatisticQuantity::max] = max;
stats[stat][lb::StatisticQuantity::min] = min;
Expand All @@ -516,6 +517,7 @@ void LBManager::statsHandler(StatsMsgType* msg) {
stats[stat][lb::StatisticQuantity::std] = stdv;
stats[stat][lb::StatisticQuantity::skw] = skew;
stats[stat][lb::StatisticQuantity::kur] = krte;
stats[stat][lb::StatisticQuantity::lbh] = lbhist;

if (stat == rank_statistic) {
if (before_lb_stats_) {
Expand Down
17 changes: 13 additions & 4 deletions src/vt/vrt/collection/balance/stats_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ struct LoadData {
a1.sum_ += a2.sum_;
a1.P_ += a2.P_;
a1.stat_ = a2.stat_;
a1.lb_hist_.mergeIn(a2.lb_hist_);

return a1;
}
Expand Down Expand Up @@ -149,6 +150,7 @@ struct LoadData {
TimeType I() const { return avg() > 0.0 ? (max() / avg()) - 1.0f : 0.0; }
TimeType stdv() const { return std::sqrt(var()); }
int32_t npr() const { return P_; }
adt::HistogramApprox<double, int64_t> lbhist() const {return lb_hist_; }

static_assert(
std::is_same<TimeType, double>::value == true,
Expand All @@ -165,12 +167,19 @@ struct LoadData {
int32_t N_ = 0;
int32_t P_ = 0;
lb::Statistic stat_ = lb::Statistic::Rank_load_modeled;
adt::HistogramApprox<double, int64_t> lb_hist_;

template <typename SerializerT>
void serialize(SerializerT& s) {
s | max_ | sum_ | min_ | avg_ | M2_ | M3_ | M4_ |
N_ | P_ | stat_| lb_hist_;
}
};

static_assert(
vt::messaging::is_byte_copyable_t<LoadData>::value,
"Must be trivially copyable to avoid serialization."
);
// static_assert(
// vt::messaging::is_byte_copyable_t<LoadData>::value,
// "Must be trivially copyable to avoid serialization."
// );
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment can just be deleted now


struct NodeStatsMsg : SerializeRequired<
collective::ReduceTMsg<std::vector<LoadData>>,
Expand Down
170 changes: 170 additions & 0 deletions tests/unit/collection/test_lb_histogram.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
//@HEADER
// *****************************************************************************
//
// test_lb_histogram.cc
// DARMA/vt => Virtual Transport
//
// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC
// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
// Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// * Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived from this
// software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact [email protected]
//
// *****************************************************************************
//@HEADER
*/

#include <vt/vrt/collection/balance/col_lb_data.h>
#include <vt/vrt/collection/balance/model/persistence_median_last_n.h>
#include <vt/vrt/collection/balance/model/load_model.h>
#include <vt/vrt/collection/balance/lb_invoke/lb_manager.h>
#include <vt/vrt/collection/types/type_aliases.h>
#include <vt/vrt/collection/manager.h>
#include <vt/vrt/collection/balance/stats_msg.h>

#include <gtest/gtest.h>
#include "test_helpers.h"
#include "test_parallel_harness.h"

#include <memory>

namespace vt { namespace tests { namespace unit {

template <typename ColT>
struct MyMsg : vt::CollectionMessage<ColT> { };

struct TestCol : vt::Collection<TestCol,vt::Index1D> {
unsigned int prev_calls_ = 0;

unsigned int prevCalls() { return prev_calls_++; }

static void colHandler(MyMsg<TestCol>* msg, TestCol* col) {
fmt::print("format_print_debug_test_col");
auto& lb_data = col->lb_data_;
auto load_phase_count = lb_data.getLoadPhaseCount();
auto comm_phase_count = lb_data.getCommPhaseCount();
auto sp_load_phase_count = lb_data.getSubphaseLoadPhaseCount();
auto sp_comm_phase_count = lb_data.getSubphaseCommPhaseCount();

#if vt_check_enabled(lblite)
auto phase = col->prevCalls();
auto model = theLBManager()->getLoadModel();
auto phases_needed = model->getNumPastPhasesNeeded();
if (phase > phases_needed) {
// updatePhase will have caused entries to be added for the
// next phase already
EXPECT_EQ(load_phase_count, phases_needed + 1);
EXPECT_EQ(sp_load_phase_count, phases_needed + 1);
EXPECT_EQ(comm_phase_count, phases_needed + 1);
EXPECT_EQ(sp_comm_phase_count, phases_needed + 1);
} else if (phase == 0) {
EXPECT_EQ(load_phase_count, phase);
EXPECT_EQ(sp_load_phase_count, phase);
EXPECT_EQ(comm_phase_count, phase);
EXPECT_EQ(sp_comm_phase_count, phase);
} else {
// updatePhase will have caused entries to be added for the
// next phase already
EXPECT_EQ(load_phase_count, phase + 1);
EXPECT_EQ(sp_load_phase_count, phase + 1);
EXPECT_EQ(comm_phase_count, phase + 1);
EXPECT_EQ(sp_comm_phase_count, phase + 1);
}
#else
EXPECT_EQ(load_phase_count, 0);
EXPECT_EQ(sp_load_phase_count, 0);
EXPECT_EQ(comm_phase_count, 0);
EXPECT_EQ(sp_comm_phase_count, 0);
#endif
}
};

using TestLBHistogram = TestParallelHarness;

using vt::vrt::collection::balance::LoadModel;
using vt::vrt::collection::balance::PersistenceMedianLastN;

static constexpr int32_t const num_elms = 16;


//TODO
//Create a collection
//do some work (thereby calling computeStatistics)
//verify reduction of histogram is successful
//Done

TEST_F(TestLBHistogram, test_lbhistogram) {
static constexpr int const num_phases = 5;
SET_MIN_NUM_NODES_CONSTRAINT(2);

auto this_node = vt::theContext()->getNode();
auto num_nodes = static_cast<int32_t>(theContext()->getNumNodes());


// We must have more or equal number of elements than nodes for this test to
// work properly
EXPECT_GE(num_elms, vt::theContext()->getNumNodes());

auto range = vt::Index1D(num_elms);

vt::vrt::collection::CollectionProxy<TestCol> proxy;
if(this_node == 0){std::cout<<"1st I am in the test. I ran\n";}

// Construct two collections
runInEpochCollective([&]{
proxy = vt::theCollection()->constructCollective<TestCol>(
range, "test_lbhistogram"
);
});

// Get the base model, assert it's valid
auto base = theLBManager()->getBaseLoadModel();
EXPECT_NE(base, nullptr);

// Create a new model
auto persist = std::make_shared<PersistenceMedianLastN>(base, 1U);

// Set the new model
theLBManager()->setLoadModel(persist);
for (int i=0; i<num_phases; ++i) {
runInEpochCollective([&]{
// Do some work.
//this calls theLBManager()->computeStatistics(persist, false, phase, stats_cb);
proxy.broadcastCollective<MyMsg<TestCol>, &TestCol::colHandler>();
});
// Go to the next phase.
vt::thePhase()->nextPhaseCollective();
}
auto phase = vt::thePhase()->getCurrentPhase();
//TODO: access histogram and verify it's correct

}

}}} // end namespace vt::tests::unit