Skip to content

Commit

Permalink
#1554: DR: more versioning
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander committed Oct 19, 2021
1 parent 4f99dd1 commit acc1a4d
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 10 deletions.
18 changes: 18 additions & 0 deletions src/vt/datarep/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@
#define INCLUDED_VT_DATAREP_DATASTORE_H

#include <memory>
#include <unordered_map>

namespace vt { namespace datarep {

struct DataStoreBase {
virtual ~DataStoreBase() = default;
virtual void const* get(DataVersionType version) const = 0;
virtual bool hasVersion(DataVersionType version) const = 0;
virtual void unpublishVersion(DataVersionType version) = 0;
};

template <typename T>
Expand All @@ -69,6 +71,22 @@ struct DataStore final : DataStoreBase {
return static_cast<void const*>(iter->second.get());
}

std::shared_ptr<T> getSharedPtr(DataVersionType version) const {
auto iter = cache_.find(version);
return iter->second;
}

void publishVersion(DataVersionType version, std::shared_ptr<T> data) {
cache_[version] = data;
}

void unpublishVersion(DataVersionType version) override {
auto iter = cache_.find(version);
if (iter != cache_.end()) {
cache_.erase(iter);
}
}

bool hasVersion(DataVersionType version) const override {
return cache_.find(version) != cache_.end();
}
Expand Down
10 changes: 8 additions & 2 deletions src/vt/datarep/dr.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ struct DataReplicator : runtime::component::Component<DataReplicator> {
template <typename T>
DR<T> makeHandle(DataVersionType version, T&& data);

template <typename T>
void publishVersion(DataRepIDType handle, DataVersionType version, T&& data);

template <typename T>
void unpublishVersion(DataRepIDType handle, DataVersionType version);

template <typename T>
void migrateHandle(DR<T>& handle, vt::NodeType migrated_to);

Expand All @@ -93,7 +99,7 @@ struct DataReplicator : runtime::component::Component<DataReplicator> {

template <typename T>
bool requestData(
DataVersionType version, DataRepIDType handle_id, bool* ready_ptr
DataVersionType version, DataRepIDType handle_id, ReaderBase* reader
);

template <typename T>
Expand All @@ -117,7 +123,7 @@ struct DataReplicator : runtime::component::Component<DataReplicator> {
DataRepIDType identifier_ = 1;
ObjGroupProxyType proxy_ = no_obj_group;
std::unordered_map<DataRepIDType, std::unique_ptr<DataStoreBase>> local_store_;
std::unordered_map<DataRepIDType, std::vector<bool*>> waiting_;
std::unordered_map<DataRepIDType, std::vector<ReaderBase*>> waiting_;
};

}} /* end namespace vt::datarep */
Expand Down
46 changes: 41 additions & 5 deletions src/vt/datarep/dr.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "vt/topos/location/manager.h"
#include "vt/datarep/dr.h"
#include "vt/datarep/msg.h"
#include "vt/datarep/datastore.h"
#include "vt/objgroup/manager.h"

namespace vt { namespace datarep {
Expand Down Expand Up @@ -82,6 +83,35 @@ DR<T> DataReplicator::makeHandle(DataVersionType version, T&& data) {
return DR<T>{typename DR<T>::DR_TAG_CONSTRUCT{}, handle_id};
}

template <typename T>
void DataReplicator::publishVersion(
DataRepIDType handle, DataVersionType version, T&& data
) {
vt_debug_print(
normal, gen,
"publishVersion handle_id={}, version={}\n",
handle, version
);
auto iter = local_store_.find(handle);
vtAssert(iter != local_store_.end(), "Handle must exist");
auto ds = static_cast<DataStore<T>*>(iter->second.get());
ds->publishVersion(version, std::make_shared<T>(std::forward<T>(data)));
}

template <typename T>
void DataReplicator::unpublishVersion(
DataRepIDType handle, DataVersionType version
) {
vt_debug_print(
normal, gen,
"unpublishVersion handle_id={}, version={}\n",
handle, version
);
auto iter = local_store_.find(handle);
vtAssert(iter != local_store_.end(), "Handle must exist");
iter->second->unpublishVersion(version);
}

template <typename T>
void DataReplicator::migrateHandle(DR<T>& handle, vt::NodeType migrated_to) {
theLocMan()->dataRep->entityEmigrated(handle.handle_, migrated_to);
Expand All @@ -102,7 +132,7 @@ void DataReplicator::unregisterHandle(DataRepIDType handle_id) {

template <typename T>
bool DataReplicator::requestData(
DataVersionType version, DataRepIDType handle_id, bool* ready_ptr
DataVersionType version, DataRepIDType handle_id, ReaderBase* reader
) {
auto iter = local_store_.find(handle_id);
if (iter != local_store_.end() && iter->second->hasVersion(version)) {
Expand All @@ -113,16 +143,19 @@ bool DataReplicator::requestData(
// found in cache
// deliver to the Reader
// nothing to do data is here
if (ready_ptr) {
*ready_ptr = true;
if (reader) {
auto ds = static_cast<DataStore<T>*>(iter->second.get());
auto tr = static_cast<Reader<T>*>(reader);
tr->data_ = ds->getSharedPtr(version);
tr->ready_ = true;
}
return true;
} else {
vt_debug_print(
normal, gen,
"requestData: handle_id={} remote request\n", handle_id
);
waiting_[handle_id].push_back(ready_ptr);
waiting_[handle_id].push_back(reader);

using MsgType = detail::DataRequestMsg<T>;
auto const this_node = theContext()->getNode();
Expand Down Expand Up @@ -178,11 +211,14 @@ void DataReplicator::dataIncomingHandler(detail::DataResponseMsg<T>* msg) {
)
)
);
auto ds = static_cast<DataStore<T>*>(local_store_.find(han_id)->second.get());
// Inform that the data is ready
auto witer = waiting_.find(han_id);
if (witer != waiting_.end()) {
for (auto&& elm : witer->second) {
*elm = true;
auto tr = static_cast<Reader<T>*>(elm);
tr->data_ = ds->getSharedPtr(version);
tr->ready_ = true;
}
waiting_.erase(witer);
}
Expand Down
5 changes: 5 additions & 0 deletions src/vt/datarep/handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ struct DR {

DataRepIDType getHandleID() const { return handle_; }

template <typename U>
void publish(DataVersionType version, U&& data);

void unpublish(DataVersionType version);

private:
struct DR_TAG_CONSTRUCT {};

Expand Down
12 changes: 12 additions & 0 deletions src/vt/datarep/handle.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ DR<T>::~DR() {
}
}

template <typename T>
template <typename U>
void DR<T>::publish(DataVersionType version, U&& data) {
theDR()->publishVersion(handle_, version, std::forward<U>(data));
}

template <typename T>
void DR<T>::unpublish(DataVersionType version) {

}


}} /* end namespace vt::datarep */

#endif /*INCLUDED_VT_DATAREP_HANDLE_IMPL_H*/
11 changes: 10 additions & 1 deletion src/vt/datarep/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@

namespace vt { namespace datarep {

struct ReaderBase {};

template <typename T>
struct Reader {
struct Reader : ReaderBase {
explicit Reader(DataRepIDType handle_id)
: handle_(handle_id)
{ }
Reader(Reader const&) = default;
Reader(Reader&&) = default;
Reader& operator=(Reader const&) = default;

public:
bool isReady() const { return ready_; }
Expand All @@ -63,8 +68,12 @@ struct Reader {
T const& get(DataVersionType version) const;

private:
friend struct DataReplicator;

DataRepIDType handle_ = no_datarep;
DataVersionType version_ = -1;
bool ready_ = false;
std::shared_ptr<T> data_ = nullptr;
};

}} /* end namespace vt::datarep */
Expand Down
5 changes: 3 additions & 2 deletions src/vt/datarep/reader.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ namespace vt { namespace datarep {

template <typename T>
void Reader<T>::fetch(DataVersionType version) {
theDR()->requestData<T>(version, handle_, &ready_);
theDR()->requestData<T>(version, handle_, this);
}

template <typename T>
T const& Reader<T>::get(DataVersionType version) const {
vtAssert(ready_, "Data must be ready to get it");
return theDR()->getDataRef<T>(version, handle_);
vtAssert(data_ != nullptr, "Must have data");
return *data_;
}

}} /* end namespace vt::datarep */
Expand Down

0 comments on commit acc1a4d

Please sign in to comment.