Skip to content

Commit

Permalink
#1554: DR: add versions
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander committed Oct 14, 2021
1 parent b43eaa6 commit 3e93131
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 42 deletions.
6 changes: 3 additions & 3 deletions examples/hello_world/hello_world.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ static void hello_world(HelloMsg* msg) {
fmt::print("{}: Hello from node {} (start)\n", this_node, msg->from);
auto han_id = msg->handle_id_;
vt::datarep::Reader<std::vector<double>> my_reader{han_id};
my_reader.fetch();
my_reader.fetch(0);
vt::theSched()->runSchedulerWhile([&]{ return not my_reader.isReady(); });
auto const& vec = my_reader.get();
auto const& vec = my_reader.get(0);
for (auto&& elm : vec) {
vt_print(gen, "elm={}\n", elm);
}
Expand All @@ -83,7 +83,7 @@ int main(int argc, char** argv) {
for (int i = 0; i < 10; i++) {
my_vec.push_back(i*10);
}
auto my_dr = vt::theDR()->makeHandle(std::move(my_vec));
auto my_dr = vt::theDR()->makeHandle(0, std::move(my_vec));
auto msg = vt::makeMessage<HelloMsg>(this_node, my_dr.getHandleID());
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg);
}
Expand Down
2 changes: 2 additions & 0 deletions src/vt/configs/types/types_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ using ComponentIDType = uint32_t;
using ThreadIDType = uint64_t;
/// Used to hold a data replication handle
using DataRepIDType = uint64_t;
/// Used to hold a version of a data replication handle
using DataVersionType = int64_t;

// Action types for attaching a closure to a runtime function
/// Used for generically store an action to perform
Expand Down
24 changes: 17 additions & 7 deletions src/vt/datarep/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,32 @@ namespace vt { namespace datarep {

struct DataStoreBase {
virtual ~DataStoreBase() = default;
virtual void const* get() const = 0;
virtual void const* get(DataVersionType version) const = 0;
virtual bool hasVersion(DataVersionType version) const = 0;
};

template <typename T>
struct DataStore final : DataStoreBase {

explicit DataStore(std::unique_ptr<T> data)
: cache_(std::move(data))
{ }
explicit DataStore(
bool in_is_master, DataVersionType version, std::shared_ptr<T> data
) : is_master(in_is_master)
{
cache_[version] = data;
}

void const* get(DataVersionType version) const override {
auto iter = cache_.find(version);
return static_cast<void const*>(iter->second.get());
}

void const* get() const override {
return static_cast<void const*>(cache_.get());
bool hasVersion(DataVersionType version) const override {
return cache_.find(version) != cache_.end();
}

private:
std::unique_ptr<T> cache_ = nullptr;
bool is_master = false;
std::unordered_map<DataVersionType, std::shared_ptr<T>> cache_ = {};
};

}} /* end namespace vt::datarep */
Expand Down
9 changes: 5 additions & 4 deletions src/vt/datarep/dr.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ struct DataRequestMsg;
} /* end namespace detail */

struct DataReplicator : runtime::component::Component<DataReplicator> {

std::string name() override { return "DataReplicator"; }

static std::unique_ptr<DataReplicator> construct();
Expand All @@ -81,7 +80,7 @@ struct DataReplicator : runtime::component::Component<DataReplicator> {
Reader<T> makeReader(DataRepIDType handle);

template <typename T>
DR<T> makeHandle(T&& data);
DR<T> makeHandle(DataVersionType version, T&& data);

template <typename T>
void migrateHandle(DR<T>& handle, vt::NodeType migrated_to);
Expand All @@ -93,10 +92,12 @@ struct DataReplicator : runtime::component::Component<DataReplicator> {
void unregisterHandle(DataRepIDType handle_id);

template <typename T>
bool requestData(DataRepIDType handle_id, bool* ready_ptr);
bool requestData(
DataVersionType version, DataRepIDType handle_id, bool* ready_ptr
);

template <typename T>
T const& getDataRef(DataRepIDType handle_id) const;
T const& getDataRef(DataVersionType version, DataRepIDType handle_id) const;

private:
template <typename T>
Expand Down
44 changes: 28 additions & 16 deletions src/vt/datarep/dr.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,21 @@ Reader<T> DataReplicator::makeReader(DataRepIDType handle) {
}

template <typename T>
DR<T> DataReplicator::makeHandle(T&& data) {
DR<T> DataReplicator::makeHandle(DataVersionType version, T&& data) {
auto const handle_id = registerHandle<T>();
vt_debug_print(
normal, gen,
"makeHandle: handle_id={}\n",
handle_id
"makeHandle: handle_id={}, version={}\n",
handle_id, version
);
theLocMan()->dataRep->registerEntity(handle_id, theContext()->getNode());
local_store_.emplace(
std::piecewise_construct,
std::forward_as_tuple(handle_id),
std::forward_as_tuple(
std::make_unique<DataStore<T>>(std::make_unique<T>(std::forward<T>(data)))
std::make_unique<DataStore<T>>(
true, version, std::make_shared<T>(std::forward<T>(data))
)
)
);
return DR<T>{typename DR<T>::DR_TAG_CONSTRUCT{}, handle_id};
Expand All @@ -99,9 +101,11 @@ void DataReplicator::unregisterHandle(DataRepIDType handle_id) {
}

template <typename T>
bool DataReplicator::requestData(DataRepIDType handle_id, bool* ready_ptr) {
bool DataReplicator::requestData(
DataVersionType version, DataRepIDType handle_id, bool* ready_ptr
) {
auto iter = local_store_.find(handle_id);
if (iter != local_store_.end()) {
if (iter != local_store_.end() && iter->second->hasVersion(version)) {
vt_debug_print(
normal, gen,
"requestData: handle_id={} found locally\n", handle_id
Expand All @@ -122,7 +126,7 @@ bool DataReplicator::requestData(DataRepIDType handle_id, bool* ready_ptr) {

using MsgType = detail::DataRequestMsg<T>;
auto const this_node = theContext()->getNode();
auto msg = makeMessage<MsgType>(this_node, handle_id);
auto msg = makeMessage<MsgType>(this_node, handle_id, version);
theLocMan()->dataRep->routeMsgHandler<MsgType, staticRequestHandler<T>>(
handle_id, getHomeNode(handle_id), msg.get()
);
Expand All @@ -143,29 +147,36 @@ template <typename T>
}

template <typename T>
T const& DataReplicator::getDataRef(DataRepIDType handle_id) const {
T const& DataReplicator::getDataRef(
DataVersionType version, DataRepIDType handle_id
) const {
auto iter = local_store_.find(handle_id);
vtAssert(iter != local_store_.end(), "Must exist at this point");
vt_debug_print(
normal, gen,
"getDataRef: handle_id={}\n", handle_id
"getDataRef: handle_id={}, version={}\n", handle_id, version
);
return *static_cast<T const*>(iter->second->get());
return *static_cast<T const*>(iter->second->get(version));
}

template <typename T>
void DataReplicator::dataIncomingHandler(detail::DataResponseMsg<T>* msg) {
auto const han_id = msg->handle_id_;
auto const version = msg->version_;
vt_debug_print(
normal, gen,
"dataIncomingHandler: han_id={}\n", han_id
"dataIncomingHandler: han_id={}, version={}\n", han_id, version
);
auto iter = local_store_.find(han_id);
vtAssert(iter == local_store_.end(), "Must not exist if requested");
local_store_.emplace(
std::piecewise_construct,
std::forward_as_tuple(han_id),
std::forward_as_tuple(std::make_unique<DataStore<T>>(std::move(msg->data_)))
std::forward_as_tuple(
std::make_unique<DataStore<T>>(
false, version, std::make_shared<T>(std::move(*msg->data_.get()))
)
)
);
// Inform that the data is ready
auto witer = waiting_.find(han_id);
Expand All @@ -181,17 +192,18 @@ template <typename T>
void DataReplicator::dataRequestHandler(detail::DataRequestMsg<T>* msg) {
auto const requestor = msg->requestor_;
auto const handle_id = msg->handle_id_;
auto const found = requestData<T>(handle_id, nullptr);
auto const version = msg->version_;
auto const found = requestData<T>(version, handle_id, nullptr);
vt_debug_print(
normal, gen,
"dataRequestHandle: handle_id={}, requestor={}, found={}\n",
handle_id, requestor, found
"dataRequestHandle: handle_id={}, requestor={}, version={}, found={}\n",
handle_id, requestor, version, found
);
if (found) {
auto proxy = objgroup::proxy::Proxy<DataReplicator>{proxy_};
proxy[requestor].template send<
detail::DataResponseMsg<T>, &DataReplicator::dataIncomingHandler<T>
>(handle_id, getDataRef<T>(handle_id));
>(handle_id, getDataRef<T>(version, handle_id), version);
}
}

Expand Down
20 changes: 14 additions & 6 deletions src/vt/datarep/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,17 @@ struct DataRequestMsg : LocationRoutedMsg<DataRepIDType, vt::Message> {
using MessageParentType = vt::Message;
vt_msg_serialize_prohibited();

DataRequestMsg(NodeType in_requestor, DataRepIDType in_handle_id)
: requestor_(in_requestor),
handle_id_(in_handle_id)
DataRequestMsg(
NodeType in_requestor, DataRepIDType in_handle_id,
DataVersionType in_version
) : requestor_(in_requestor),
handle_id_(in_handle_id),
version_(in_version)
{ }

NodeType requestor_ = uninitialized_destination;
DataRepIDType handle_id_ = no_datarep;
DataVersionType version_ = -1;
};

template <typename T>
Expand All @@ -68,20 +72,24 @@ struct DataResponseMsg : vt::Message {
vt_msg_serialize_if_needed_by_parent_or_type1(T);

DataResponseMsg() = default; // for serializer
DataResponseMsg(DataRepIDType in_handle_id, T const& data)
: handle_id_(in_handle_id),
data_(std::make_unique<T>(data))
DataResponseMsg(
DataRepIDType in_handle_id, T const& data, DataVersionType in_version
) : handle_id_(in_handle_id),
data_(std::make_unique<T>(data)),
version_(in_version)
{ }

template <typename SerializerT>
void serialize(SerializerT& s) {
MessageParentType::serialize(s);
s | handle_id_;
s | data_;
s | version_;
}

DataRepIDType handle_id_ = no_datarep;
std::unique_ptr<T> data_;
DataVersionType version_ = -1;
};


Expand Down
4 changes: 2 additions & 2 deletions src/vt/datarep/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ struct Reader {
public:
bool isReady() const { return ready_; }

void fetch();
void fetch(DataVersionType version);

T const& get() const;
T const& get(DataVersionType version) const;

private:
DataRepIDType handle_ = no_datarep;
Expand Down
8 changes: 4 additions & 4 deletions src/vt/datarep/reader.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@
namespace vt { namespace datarep {

template <typename T>
void Reader<T>::fetch() {
theDR()->requestData<T>(handle_, &ready_);
void Reader<T>::fetch(DataVersionType version) {
theDR()->requestData<T>(version, handle_, &ready_);
}

template <typename T>
T const& Reader<T>::get() const {
T const& Reader<T>::get(DataVersionType version) const {
vtAssert(ready_, "Data must be ready to get it");
return theDR()->getDataRef<T>(handle_);
return theDR()->getDataRef<T>(version, handle_);
}

}} /* end namespace vt::datarep */
Expand Down

0 comments on commit 3e93131

Please sign in to comment.