Skip to content

Commit

Permalink
#1554: DR: add index dimension everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander committed Oct 14, 2021
1 parent 4b0625c commit e5d03df
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 41 deletions.
55 changes: 35 additions & 20 deletions src/vt/datarep/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,49 +51,64 @@ namespace vt { namespace datarep {

struct DataStoreBase {
virtual ~DataStoreBase() = default;
virtual void const* get(DataVersionType version) const = 0;
virtual bool hasVersion(DataVersionType version) const = 0;
virtual void unpublishVersion(DataVersionType version) = 0;
};

template <typename T>
template <typename T, typename IndexT>
struct DataStore final : DataStoreBase {
using VersionMapType = std::unordered_map<DataVersionType, std::shared_ptr<T>>;

explicit DataStore(
bool in_is_master, DataVersionType version, std::shared_ptr<T> data
bool in_is_master, IndexT idx, DataVersionType version,
std::shared_ptr<T> data
) : is_master(in_is_master)
{
cache_[version] = data;
cache_[idx][version] = data;
}

void const* get(DataVersionType version) const override {
auto iter = cache_.find(version);
return static_cast<void const*>(iter->second.get());
void const* get(IndexT idx, DataVersionType version) const {
auto iter = cache_.find(idx);
return static_cast<void const*>(iter->second.find(version)->second.get());
}

std::shared_ptr<T> getSharedPtr(DataVersionType version) const {
auto iter = cache_.find(version);
return iter->second;
std::shared_ptr<T> getSharedPtr(IndexT idx, DataVersionType version) const {
auto iter = cache_.find(idx);
if (iter != cache_.end()) {
auto iter2 = iter->second.find(version);
if (iter2 != iter->second.end()) {
return iter2->second;
}
}
return nullptr;
}

void publishVersion(DataVersionType version, std::shared_ptr<T> data) {
cache_[version] = data;
void publishVersion(
IndexT idx, DataVersionType version, std::shared_ptr<T> data
) {
cache_[idx][version] = data;
}

void unpublishVersion(DataVersionType version) override {
auto iter = cache_.find(version);
void unpublishVersion(IndexT idx, DataVersionType version) {
auto iter = cache_.find(idx);
if (iter != cache_.end()) {
cache_.erase(iter);
auto iter2 = iter->second.find(version);
if (iter2 != iter->second.end()) {
iter2->second.erase(iter2);
}
if (iter->second.size() == 0) {
cache_.erase(iter);
}
}
}

bool hasVersion(DataVersionType version) const override {
return cache_.find(version) != cache_.end();
bool hasVersion(IndexT idx, DataVersionType version) const {
auto iter = cache_.find(idx);
return iter != cache_.end() and
iter->second.find(version) != iter->second.end();
}

private:
bool is_master = false;
std::unordered_map<DataVersionType, std::shared_ptr<T>> cache_ = {};
std::unordered_map<IndexT, VersionMapType> cache_ = {};
};

}} /* end namespace vt::datarep */
Expand Down
56 changes: 35 additions & 21 deletions src/vt/datarep/dr.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void DataReplicator::publishVersion(
) {
auto handle = dr_base.getHandleID();
auto tag = dr_base.getTag();
auto idx = dr_base.getIndex();
auto id = detail::DataIdentifier{handle, tag};
vt_debug_print(
normal, gen,
Expand All @@ -110,14 +111,14 @@ void DataReplicator::publishVersion(
std::piecewise_construct,
std::forward_as_tuple(id),
std::forward_as_tuple(
std::make_unique<DataStore<T>>(
true, version, std::make_shared<T>(std::forward<T>(data))
std::make_unique<DataStore<T, IndexT>>(
true, idx, version, std::make_shared<T>(std::forward<T>(data))
)
)
);
} else {
auto ds = static_cast<DataStore<T>*>(iter->second.get());
ds->publishVersion(version, std::make_shared<T>(std::forward<T>(data)));
auto ds = static_cast<DataStore<T, IndexT>*>(iter->second.get());
ds->publishVersion(idx, version, std::make_shared<T>(std::forward<T>(data)));
}
}

Expand All @@ -127,6 +128,7 @@ void DataReplicator::unpublishVersion(
) {
auto handle = dr_base.getHandleID();
auto tag = dr_base.getTag();
auto idx = dr_base.getIndex();
auto id = detail::DataIdentifier{handle, tag};
vt_debug_print(
normal, gen,
Expand All @@ -135,7 +137,8 @@ void DataReplicator::unpublishVersion(
);
auto iter = local_store_.find(id);
vtAssert(iter != local_store_.end(), "Handle must exist");
iter->second->unpublishVersion(version);
auto ds = static_cast<DataStore<T, IndexT>*>(iter->second.get());
ds->unpublishVersion(idx, version);
}

template <typename T>
Expand All @@ -161,11 +164,17 @@ bool DataReplicator::requestData(
detail::DR_Base<IndexT> dr_base, DataVersionType version,
detail::ReaderBase* reader
) {
auto handle = dr_base.getHandleID();
auto tag = dr_base.getTag();
auto id = detail::DataIdentifier{handle, tag};
auto const handle = dr_base.getHandleID();
auto const tag = dr_base.getTag();
auto const idx = dr_base.getIndex();
auto const id = detail::DataIdentifier{handle, tag};
auto iter = local_store_.find(id);
if (iter != local_store_.end() && iter->second->hasVersion(version)) {
if (
iter != local_store_.end() and
static_cast<DataStore<T, IndexT>*>(iter->second.get())->hasVersion(
idx, version
)
) {
vt_debug_print(
normal, gen,
"requestData: handle_id={} found locally\n", handle
Expand All @@ -174,9 +183,9 @@ bool DataReplicator::requestData(
// deliver to the Reader
// nothing to do data is here
if (reader) {
auto ds = static_cast<DataStore<T>*>(iter->second.get());
auto tr = static_cast<Reader<T>*>(reader);
tr->data_ = ds->getSharedPtr(version);
auto ds = static_cast<DataStore<T, IndexT>*>(iter->second.get());
auto tr = static_cast<Reader<T, IndexT>*>(reader);
tr->data_ = ds->getSharedPtr(idx, version);
tr->ready_ = true;
}
return true;
Expand Down Expand Up @@ -213,16 +222,18 @@ template <typename T, typename IndexT>
T const& DataReplicator::getDataRef(
detail::DR_Base<IndexT> dr_base, DataVersionType version
) const {
auto handle = dr_base.getHandleID();
auto tag = dr_base.getTag();
auto id = detail::DataIdentifier{handle, tag};
auto const handle = dr_base.getHandleID();
auto const tag = dr_base.getTag();
auto const idx = dr_base.getIndex();
auto const id = detail::DataIdentifier{handle, tag};
auto iter = local_store_.find(id);
vtAssert(iter != local_store_.end(), "Must exist at this point");
vt_debug_print(
normal, gen,
"getDataRef: handle_id={}, version={}\n", handle, version
"getDataRef: handle_id={}, version={}, idx={}\n", handle, version, idx
);
return *static_cast<T const*>(iter->second->get(version));
auto ds = static_cast<DataStore<T, IndexT>*>(iter->second.get());
return *static_cast<T const*>(ds->get(idx, version));
}

template <typename T, typename IndexT>
Expand All @@ -232,6 +243,7 @@ void DataReplicator::dataIncomingHandler(
auto const dr_base = msg->dr_base_;
auto const handle = dr_base.getHandleID();
auto const tag = dr_base.getTag();
auto const idx = dr_base.getIndex();
auto const id = detail::DataIdentifier{handle, tag};
auto const version = msg->version_;
vt_debug_print(
Expand All @@ -244,18 +256,20 @@ void DataReplicator::dataIncomingHandler(
std::piecewise_construct,
std::forward_as_tuple(id),
std::forward_as_tuple(
std::make_unique<DataStore<T>>(
false, version, std::make_shared<T>(std::move(*msg->data_.get()))
std::make_unique<DataStore<T, IndexT>>(
false, idx, version, std::make_shared<T>(std::move(*msg->data_.get()))
)
)
);
auto ds = static_cast<DataStore<T>*>(local_store_.find(id)->second.get());
auto ds = static_cast<DataStore<T, IndexT>*>(
local_store_.find(id)->second.get()
);
// Inform that the data is ready
auto witer = waiting_.find(id);
if (witer != waiting_.end()) {
for (auto&& elm : witer->second) {
auto tr = static_cast<Reader<T>*>(elm);
tr->data_ = ds->getSharedPtr(version);
tr->data_ = ds->getSharedPtr(idx, version);
tr->ready_ = true;
}
waiting_.erase(witer);
Expand Down

0 comments on commit e5d03df

Please sign in to comment.