From 3eb49daf202e23875dc6dae3ae6c47341f6a44b0 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Tue, 5 Oct 2021 15:24:13 -0700 Subject: [PATCH] #1554: DR: add index dimension everywhere --- src/vt/datarep/datastore.h | 55 +++++++++++++++++++++++-------------- src/vt/datarep/dr.impl.h | 56 ++++++++++++++++++++++++-------------- 2 files changed, 70 insertions(+), 41 deletions(-) diff --git a/src/vt/datarep/datastore.h b/src/vt/datarep/datastore.h index dee6e752e4..d0a2370fc8 100644 --- a/src/vt/datarep/datastore.h +++ b/src/vt/datarep/datastore.h @@ -51,49 +51,64 @@ namespace vt { namespace datarep { struct DataStoreBase { virtual ~DataStoreBase() = default; - virtual void const* get(DataVersionType version) const = 0; - virtual bool hasVersion(DataVersionType version) const = 0; - virtual void unpublishVersion(DataVersionType version) = 0; }; -template +template struct DataStore final : DataStoreBase { + using VersionMapType = std::unordered_map>; explicit DataStore( - bool in_is_master, DataVersionType version, std::shared_ptr data + bool in_is_master, IndexT idx, DataVersionType version, + std::shared_ptr data ) : is_master(in_is_master) { - cache_[version] = data; + cache_[idx][version] = data; } - void const* get(DataVersionType version) const override { - auto iter = cache_.find(version); - return static_cast(iter->second.get()); + void const* get(IndexT idx, DataVersionType version) const { + auto iter = cache_.find(idx); + return static_cast(iter->second.find(version)->second.get()); } - std::shared_ptr getSharedPtr(DataVersionType version) const { - auto iter = cache_.find(version); - return iter->second; + std::shared_ptr getSharedPtr(IndexT idx, DataVersionType version) const { + auto iter = cache_.find(idx); + if (iter != cache_.end()) { + auto iter2 = iter->second.find(version); + if (iter2 != iter->second.end()) { + return iter2->second; + } + } + return nullptr; } - void publishVersion(DataVersionType version, std::shared_ptr data) { - cache_[version] = data; + void publishVersion( + IndexT idx, DataVersionType version, std::shared_ptr data + ) { + cache_[idx][version] = data; } - void unpublishVersion(DataVersionType version) override { - auto iter = cache_.find(version); + void unpublishVersion(IndexT idx, DataVersionType version) { + auto iter = cache_.find(idx); if (iter != cache_.end()) { - cache_.erase(iter); + auto iter2 = iter->second.find(version); + if (iter2 != iter->second.end()) { + iter2->second.erase(iter2); + } + if (iter->second.size() == 0) { + cache_.erase(iter); + } } } - bool hasVersion(DataVersionType version) const override { - return cache_.find(version) != cache_.end(); + bool hasVersion(IndexT idx, DataVersionType version) const { + auto iter = cache_.find(idx); + return iter != cache_.end() and + iter->second.find(version) != iter->second.end(); } private: bool is_master = false; - std::unordered_map> cache_ = {}; + std::unordered_map cache_ = {}; }; }} /* end namespace vt::datarep */ diff --git a/src/vt/datarep/dr.impl.h b/src/vt/datarep/dr.impl.h index d7af089cfb..26e9161afc 100644 --- a/src/vt/datarep/dr.impl.h +++ b/src/vt/datarep/dr.impl.h @@ -98,6 +98,7 @@ void DataReplicator::publishVersion( ) { auto handle = dr_base.getHandleID(); auto tag = dr_base.getTag(); + auto idx = dr_base.getIndex(); auto id = detail::DataIdentifier{handle, tag}; vt_debug_print( normal, gen, @@ -110,14 +111,14 @@ void DataReplicator::publishVersion( std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple( - std::make_unique>( - true, version, std::make_shared(std::forward(data)) + std::make_unique>( + true, idx, version, std::make_shared(std::forward(data)) ) ) ); } else { - auto ds = static_cast*>(iter->second.get()); - ds->publishVersion(version, std::make_shared(std::forward(data))); + auto ds = static_cast*>(iter->second.get()); + ds->publishVersion(idx, version, std::make_shared(std::forward(data))); } } @@ -127,6 +128,7 @@ void DataReplicator::unpublishVersion( ) { auto handle = dr_base.getHandleID(); auto tag = dr_base.getTag(); + auto idx = dr_base.getIndex(); auto id = detail::DataIdentifier{handle, tag}; vt_debug_print( normal, gen, @@ -135,7 +137,8 @@ void DataReplicator::unpublishVersion( ); auto iter = local_store_.find(id); vtAssert(iter != local_store_.end(), "Handle must exist"); - iter->second->unpublishVersion(version); + auto ds = static_cast*>(iter->second.get()); + ds->unpublishVersion(idx, version); } template @@ -161,11 +164,17 @@ bool DataReplicator::requestData( detail::DR_Base dr_base, DataVersionType version, detail::ReaderBase* reader ) { - auto handle = dr_base.getHandleID(); - auto tag = dr_base.getTag(); - auto id = detail::DataIdentifier{handle, tag}; + auto const handle = dr_base.getHandleID(); + auto const tag = dr_base.getTag(); + auto const idx = dr_base.getIndex(); + auto const id = detail::DataIdentifier{handle, tag}; auto iter = local_store_.find(id); - if (iter != local_store_.end() && iter->second->hasVersion(version)) { + if ( + iter != local_store_.end() and + static_cast*>(iter->second.get())->hasVersion( + idx, version + ) + ) { vt_debug_print( normal, gen, "requestData: handle_id={} found locally\n", handle @@ -174,9 +183,9 @@ bool DataReplicator::requestData( // deliver to the Reader // nothing to do data is here if (reader) { - auto ds = static_cast*>(iter->second.get()); - auto tr = static_cast*>(reader); - tr->data_ = ds->getSharedPtr(version); + auto ds = static_cast*>(iter->second.get()); + auto tr = static_cast*>(reader); + tr->data_ = ds->getSharedPtr(idx, version); tr->ready_ = true; } return true; @@ -213,16 +222,18 @@ template T const& DataReplicator::getDataRef( detail::DR_Base dr_base, DataVersionType version ) const { - auto handle = dr_base.getHandleID(); - auto tag = dr_base.getTag(); - auto id = detail::DataIdentifier{handle, tag}; + auto const handle = dr_base.getHandleID(); + auto const tag = dr_base.getTag(); + auto const idx = dr_base.getIndex(); + auto const id = detail::DataIdentifier{handle, tag}; auto iter = local_store_.find(id); vtAssert(iter != local_store_.end(), "Must exist at this point"); vt_debug_print( normal, gen, - "getDataRef: handle_id={}, version={}\n", handle, version + "getDataRef: handle_id={}, version={}, idx={}\n", handle, version, idx ); - return *static_cast(iter->second->get(version)); + auto ds = static_cast*>(iter->second.get()); + return *static_cast(ds->get(idx, version)); } template @@ -232,6 +243,7 @@ void DataReplicator::dataIncomingHandler( auto const dr_base = msg->dr_base_; auto const handle = dr_base.getHandleID(); auto const tag = dr_base.getTag(); + auto const idx = dr_base.getIndex(); auto const id = detail::DataIdentifier{handle, tag}; auto const version = msg->version_; vt_debug_print( @@ -244,18 +256,20 @@ void DataReplicator::dataIncomingHandler( std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple( - std::make_unique>( - false, version, std::make_shared(std::move(*msg->data_.get())) + std::make_unique>( + false, idx, version, std::make_shared(std::move(*msg->data_.get())) ) ) ); - auto ds = static_cast*>(local_store_.find(id)->second.get()); + auto ds = static_cast*>( + local_store_.find(id)->second.get() + ); // Inform that the data is ready auto witer = waiting_.find(id); if (witer != waiting_.end()) { for (auto&& elm : witer->second) { auto tr = static_cast*>(elm); - tr->data_ = ds->getSharedPtr(version); + tr->data_ = ds->getSharedPtr(idx, version); tr->ready_ = true; } waiting_.erase(witer);