Skip to content

Commit

Permalink
#2240: Add unit tests for new allreduce and cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
JacobDomagala committed May 21, 2024
1 parent cbb21e0 commit 80d6712
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 61 deletions.
37 changes: 26 additions & 11 deletions src/vt/collective/reduce/allreduce/rabenseifner.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@
//@HEADER
*/


#if !defined INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_H
#define INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_H

#include "vt/config.h"
#include "vt/context/context.h"
#include "vt/messaging/message/message.h"
#include "vt/objgroup/proxy/proxy_objgroup.h"
#include "vt/registry/auto/auto_registry.h"
#include "vt/pipe/pipe_manager.h"

#include <tuple>
#include <cstdint>
Expand Down Expand Up @@ -95,7 +98,6 @@ struct Rabenseifner {
vt::objgroup::proxy::Proxy<ObjT> parentProxy, NodeType num_nodes,
Args&&... args)
: parent_proxy_(parentProxy),
val_(std::forward<Args>(args)...),
num_nodes_(num_nodes),
this_node_(vt::theContext()->getNode()),
is_even_(this_node_ % 2 == 0),
Expand All @@ -104,7 +106,15 @@ struct Rabenseifner {
nprocs_rem_(num_nodes_ - nprocs_pof2_),
gather_step_(num_steps_ - 1),
gather_mask_(nprocs_pof2_ >> 1),
finished_adjustment_part_(nprocs_rem_ == 0) {
finished_adjustment_part_(nprocs_rem_ == 0)
{
initialize(std::forward<Args>(args)...);
}

template <typename... Args>
void initialize(Args&&... args) {
val_ = DataT(std::forward<Args>(args)...);

is_part_of_adjustment_group_ = this_node_ < (2 * nprocs_rem_);
if (is_part_of_adjustment_group_) {
if (is_even_) {
Expand Down Expand Up @@ -156,6 +166,13 @@ struct Rabenseifner {
scatter_steps_recv_.resize(num_steps_, false);
}

void executeFinalHan() {

// theCB()->makeSend<finalHandler>(parent_proxy_[this_node_]).sendTuple(std::make_tuple(val_));
parent_proxy_[this_node_].template invoke<finalHandler>(val_);
completed_ = true;
}

void allreduce() {
if (nprocs_rem_) {
adjustForPowerOfTwo();
Expand All @@ -181,7 +198,7 @@ struct Rabenseifner {
}

void adjustForPowerOfTwoRightHalf(AllreduceRbnMsg<DataT>* msg) {
for (int i = 0; i < msg->val_.size(); i++) {
for (uint32_t i = 0; i < msg->val_.size(); i++) {
val_[(val_.size() / 2) + i] += msg->val_[i];
}

Expand All @@ -192,13 +209,13 @@ struct Rabenseifner {
}

void adjustForPowerOfTwoLeftHalf(AllreduceRbnMsg<DataT>* msg) {
for (int i = 0; i < msg->val_.size(); i++) {
for (uint32_t i = 0; i < msg->val_.size(); i++) {
val_[i] += msg->val_[i];
}
}

void adjustForPowerOfTwoFinalPart(AllreduceRbnMsg<DataT>* msg) {
for (int i = 0; i < msg->val_.size(); i++) {
for (uint32_t i = 0; i < msg->val_.size(); i++) {
val_[(val_.size() / 2) + i] = msg->val_[i];
}

Expand Down Expand Up @@ -243,7 +260,7 @@ struct Rabenseifner {
[](const auto val) { return val; })) {
auto& in_msg = scatter_messages_.at(step);
auto& in_val = in_msg->val_;
for (int i = 0; i < in_val.size(); i++) {
for (uint32_t i = 0; i < in_val.size(); i++) {
Op<typename DataT::value_type>()(
val_[r_index_[in_msg->step_] + i], in_val[i]);
}
Expand Down Expand Up @@ -339,7 +356,7 @@ struct Rabenseifner {
if (doRed) {
auto& in_msg = gather_messages_.at(step);
auto& in_val = in_msg->val_;
for (int i = 0; i < in_val.size(); i++) {
for (uint32_t i = 0; i < in_val.size(); i++) {
val_[s_index_[in_msg->step_] + i] = in_val[i];
}

Expand Down Expand Up @@ -417,8 +434,7 @@ struct Rabenseifner {
sendToExcludedNodes();
}

parent_proxy_[this_node_].template invoke<finalHandler>(val_);
completed_ = true;
executeFinalHan();
}

void sendToExcludedNodes() {
Expand All @@ -435,8 +451,7 @@ struct Rabenseifner {
void sendToExcludedNodesHandler(AllreduceRbnMsg<DataT>* msg) {
val_ = msg->val_;

parent_proxy_[this_node_].template invoke<finalHandler>(val_);
completed_ = true;
executeFinalHan();
}

vt::objgroup::proxy::Proxy<Rabenseifner> proxy_ = {};
Expand Down
14 changes: 10 additions & 4 deletions src/vt/collective/reduce/allreduce/recursive_doubling.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,19 @@ struct DistanceDoubling {
vt::objgroup::proxy::Proxy<ObjT> parentProxy, NodeType num_nodes,
Args&&... args)
: parent_proxy_(parentProxy),
val_(std::forward<Args>(args)...),
num_nodes_(num_nodes),
this_node_(vt::theContext()->getNode()),
is_even_(this_node_ % 2 == 0),
num_steps_(static_cast<int32_t>(log2(num_nodes_))),
nprocs_pof2_(1 << num_steps_),
nprocs_rem_(num_nodes_ - nprocs_pof2_),
finished_adjustment_part_(nprocs_rem_ == 0) {
initialize(std::forward<Args>(args)...);
}

template <typename... Args>
void initialize(Args&&... args) {
val_ = DataT(std::forward<Args>(args)...);
is_part_of_adjustment_group_ = this_node_ < (2 * nprocs_rem_);
if (is_part_of_adjustment_group_) {
if (is_even_) {
Expand Down Expand Up @@ -168,8 +173,8 @@ struct DistanceDoubling {
[](const auto val) { return val; });
}
bool isReady() {
return (is_part_of_adjustment_group_ and finished_adjustment_part_) and
step_ == 0 or
return ((is_part_of_adjustment_group_ and finished_adjustment_part_) and
step_ == 0) or
allMessagesReceived();
}

Expand Down Expand Up @@ -279,8 +284,9 @@ struct DistanceDoubling {
vt::objgroup::proxy::Proxy<ObjT> parent_proxy_ = {};

DataT val_ = {};
NodeType this_node_ = {};
NodeType num_nodes_ = {};
NodeType this_node_ = {};

bool is_even_ = false;
int32_t num_steps_ = {};
int32_t nprocs_pof2_ = {};
Expand Down
6 changes: 6 additions & 0 deletions src/vt/objgroup/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
//@HEADER
*/

#include "vt/configs/types/types_type.h"
#if !defined INCLUDED_VT_OBJGROUP_MANAGER_H
#define INCLUDED_VT_OBJGROUP_MANAGER_H

Expand Down Expand Up @@ -291,6 +292,9 @@ struct ObjGroupManager : runtime::component::Component<ObjGroupManager> {
ProxyType<ObjT> proxy, std::string const& name, std::string const& parent = ""
);

template <typename Reducer, auto f, typename ObjT, template <typename Arg> class Op, typename DataT>
ObjGroupManager::PendingSendType allreduce(ProxyType<ObjT> proxy, const DataT& data);

template <auto f, typename ObjT, template <typename Arg> class Op, typename DataT>
ObjGroupManager::PendingSendType allreduce(ProxyType<ObjT> proxy, const DataT& data);

Expand Down Expand Up @@ -504,6 +508,8 @@ ObjGroupManager::PendingSendType allreduce(ProxyType<ObjT> proxy, const DataT& d
std::unordered_map<ObjGroupProxyType, std::vector<ActionType>> pending_;
/// Map of object groups' labels
std::unordered_map<ObjGroupProxyType, std::string> labels_;

std::unordered_map<ObjGroupProxyType, ObjGroupProxyType> reducers_;
};

}} /* end namespace vt::objgroup */
Expand Down
105 changes: 61 additions & 44 deletions src/vt/objgroup/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
//@HEADER
*/

#include "vt/configs/types/types_sentinels.h"
#if !defined INCLUDED_VT_OBJGROUP_MANAGER_IMPL_H
#define INCLUDED_VT_OBJGROUP_MANAGER_IMPL_H

Expand All @@ -58,7 +59,10 @@
#include "vt/messaging/active.h"
#include "vt/elm/elm_id_bits.h"
#include "vt/messaging/message/smart_ptr.h"
#include "vt/collective/reduce/allreduce/rabenseifner.h"
#include "vt/collective/reduce/allreduce/recursive_doubling.h"
#include <utility>
#include <array>

#include <memory>

Expand Down Expand Up @@ -264,57 +268,70 @@ ObjGroupManager::PendingSendType ObjGroupManager::broadcast(MsgSharedPtr<MsgT> m
return objgroup::broadcast(msg,han);
}


// Helper trait to detect if a type is a specialization of a given variadic template
template <template <typename...> class Template, typename T>
struct is_specialization_of : std::false_type {};

template <template <typename...> class Template, typename... Args>
struct is_specialization_of<Template, Template<Args...>> : std::true_type {};

// Specialized trait for std::array
template <typename T>
struct is_std_array : std::false_type {};

template <typename T, std::size_t N>
struct is_std_array<std::array<T, N>> : std::true_type {};

// Trait to detect if a type is a standard container (std::vector or std::array in this case)
template <typename T>
struct is_std_container : std::integral_constant<bool,
is_specialization_of<std::vector, T>::value || is_std_array<T>::value> {};

template <
typename Reducer, auto f, typename ObjT, template <typename Arg> class Op, typename DataT>
ObjGroupManager::PendingSendType ObjGroupManager::allreduce(
ProxyType<ObjT> proxy, const DataT& data) {
return PendingSendType{
theTerm()->getEpoch(), [=] {
auto const this_node = vt::theContext()->getNode();
auto const num_nodes = theContext()->getNumNodes();

proxy::Proxy<Reducer> grp_proxy = {};

if (reducers_.find(proxy.getProxy()) != reducers_.end()) {
auto* obj = reinterpret_cast<Reducer*>(
objs_[reducers_[proxy.getProxy()]]->getPtr()
);
obj->initialize(data);
grp_proxy = obj->proxy_;
} else {
grp_proxy = vt::theObjGroup()->makeCollective<Reducer>(
"allreduce_rabenseifner", proxy, num_nodes, data);
grp_proxy[this_node].get()->proxy_ = grp_proxy;
}

grp_proxy[this_node].template invoke<&Reducer::allreduce>();
}};
}

template <
auto f, typename ObjT, template <typename Arg> class Op, typename DataT>
ObjGroupManager::PendingSendType
ObjGroupManager::allreduce(ProxyType<ObjT> proxy, const DataT& data) {
// check payload size and choose appropriate algorithm

auto const this_node = vt::theContext()->getNode();
auto const num_nodes = theContext()->getNumNodes();

if (num_nodes < 2) {
if (theContext()->getNumNodes() < 2) {
return PendingSendType{nullptr};
}

// using Reducer = collective::reduce::allreduce::Rabenseifner<DataT>;
// using Reducer = collective::reduce::allreduce::DistanceDoubling<DataT, Op, ObjT, f>;

return PendingSendType{theTerm()->getEpoch(), [=] {
// auto grp_proxy =
// vt::theObjGroup()->makeCollective<Reducer>("allreduce_rabenseifner");
// if constexpr (std::is_same_v<
// Reducer,
// collective::reduce::allreduce::DistanceDoubling<DataT, Op, ObjT, f>>) {
// grp_proxy[this_node].template invoke<&Reducer::initialize>(
// data, grp_proxy, proxy, num_nodes);

// grp_proxy[this_node].template invoke<&Reducer::partOne>();

// } else if constexpr (std::is_same_v<
// Reducer,
// collective::reduce::allreduce::Rabenseifner<
// DataT, Op, ObjT, f>>) {
// grp_proxy[this_node].template invoke<&Reducer::initialize>(
// data, grp_proxy, num_nodes);

// if (grp_proxy.get()->nprocs_rem_) {
// vt::runInEpochCollective(
// [=] { grp_proxy[this_node].template invoke<&Reducer::partOne>(); });
// }

// vt::runInEpochCollective(
// [=] { grp_proxy[this_node].template invoke<&Reducer::partTwo>(); });

// vt::runInEpochCollective(
// [=] { grp_proxy[this_node].template invoke<&Reducer::partThree>(); });

// if (grp_proxy.get()->nprocs_rem_) {
// vt::runInEpochCollective(
// [=] { grp_proxy[this_node].template invoke<&Reducer::partFour>(); });
// }
// }
}};
if constexpr (is_std_container<DataT>::value) {
using Reducer =
vt::collective::reduce::allreduce::Rabenseifner<DataT, Op, ObjT, f>;
return allreduce<Reducer, f, ObjT, Op>(proxy, data);
} else {
using Reducer =
vt::collective::reduce::allreduce::DistanceDoubling<DataT, Op, ObjT, f>;
return allreduce<Reducer, f, ObjT, Op>(proxy, data);
}
}

template <typename ObjT, typename MsgT, ActiveTypedFnType<MsgT> *f>
Expand Down
6 changes: 4 additions & 2 deletions tests/perf/allreduce.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,10 @@ VT_PERF_TEST(MyTest, test_allreduce_recursive_doubling) {
auto grp_proxy = vt::theObjGroup()->makeCollective<Reducer>(
"allreduce_recursive_doubling", proxy, num_nodes_, data);
grp_proxy[my_node_].get()->proxy_ = grp_proxy;
vt::runInEpochCollective(
[=] { grp_proxy[my_node_].template invoke<&Reducer::allreduce>(); });

theCollective()->barrier();
StartTimer(proxy[theContext()->getNode()].get()->timer_name_);
grp_proxy[my_node_].template invoke<&Reducer::allreduce>();
}

VT_PERF_TEST_MAIN()
43 changes: 43 additions & 0 deletions tests/unit/objgroup/test_objgroup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,49 @@ TEST_F(TestObjGroup, test_proxy_reduce) {
}
}

TEST_F(TestObjGroup, test_proxy_allreduce) {
using namespace vt::collective;

auto const my_node = vt::theContext()->getNode();

TestObjGroup::total_verify_expected_ = 0;
auto proxy = vt::theObjGroup()->makeCollective<MyObjA>("test_proxy_reduce");

vt::theCollective()->barrier();

runInEpochCollective(
[&] { proxy.allreduce_h<&MyObjA::verifyAllred<1>, PlusOp>(my_node); }
);

EXPECT_EQ(MyObjA::total_verify_expected_, 1);

runInEpochCollective(
[&] { proxy.allreduce_h<&MyObjA::verifyAllred<2>, PlusOp>(4); }
);

EXPECT_EQ(MyObjA::total_verify_expected_, 2);

runInEpochCollective(
[&] { proxy.allreduce_h<&MyObjA::verifyAllred<3>, MaxOp>(my_node); }
);

EXPECT_EQ(MyObjA::total_verify_expected_, 3);

runInEpochCollective([&] {
using Reducer =
vt::collective::reduce::allreduce::Rabenseifner<std::vector<int>, PlusOp, MyObjA, &MyObjA::verifyAllredVec>;
std::vector<int> payload(256, my_node);
theObjGroup()->allreduce<Reducer, &MyObjA::verifyAllredVec, MyObjA, PlusOp>(
proxy, payload
);
theObjGroup()->allreduce<Reducer, &MyObjA::verifyAllredVec, MyObjA, PlusOp>(
proxy, payload
);
});

EXPECT_EQ(MyObjA::total_verify_expected_, 5);
}

TEST_F(TestObjGroup, test_proxy_invoke) {
auto const& this_node = theContext()->getNode();

Expand Down
Loading

0 comments on commit 80d6712

Please sign in to comment.