Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add New SelectiveMetadatAggregation method #4387

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ add_library(adios2_core
toolkit/format/bp5/BP5Deserializer.cpp
toolkit/format/bp5/BP5Deserializer.tcc
toolkit/format/bp5/BP5Serializer.cpp
toolkit/format/bp5/BP5Helper.cpp

toolkit/profiling/iochrono/Timer.cpp
toolkit/profiling/iochrono/IOChrono.cpp
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class BP5Engine
MACRO(StatsBlockSize, SizeBytes, size_t, DefaultStatsBlockSize) \
MACRO(Threads, UInt, unsigned int, 0) \
MACRO(UseOneTimeAttributes, Bool, bool, true) \
MACRO(UseSelectiveMetadataAggregation, Bool, bool, true) \
MACRO(OneLevelGatherSizeLimit, Int, int, 6000) \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OneLevelGatherSizeLimit: Size of what? To me, it first suggests a metadata size limit. Propose new names: OneLevelGatherMPISizeLimit, OneLevelGatherNprocLimit
Change could go into another PR with lines in the documentation ;-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are already the 2nd and 3rd longest parameter names in BP5, beaten only by "BeginStepPollingFrequencySecs", so I'm a little hesitant to make it longer. But they do need to be added to the docs, one way or another. Happy specifying that in the documentation? If not I probably like MPISizeLimit better than Nproc...

Probably time to clean these up a bit. I see at least a couple of params that aren't used (StatsBlockSize and CollectiveMetadata).

Copy link
Contributor

@anagainaru anagainaru Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would actually call it OneLevelGatherRanksLimit since the threshold is given by the number of ranks that are part of the gather right?

Later edit: I like Norbert's OneLevelGatherNprocLimit as well and it doesn't add more words

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like OneLevelGatherRanksLimit. I've changed to that, eliminated a couple of unused (and undocumented) parameters and added some docs on the parameters introduced here. Turns out I miscounted and UseSelectiveMetadataAggregation was longer than anything and I had to reformat the table a bit to get it to fit.

MACRO(FlattenSteps, Bool, bool, false) \
MACRO(IgnoreFlattenSteps, Bool, bool, false) \
MACRO(RemoteDataPath, String, std::string, "") \
Expand Down
220 changes: 184 additions & 36 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "BP5Writer.h"
#include "BP5Writer.tcc"
#include "adios2/toolkit/format/bp5/BP5Helper.h"

#include "adios2/common/ADIOSMacros.h"
#include "adios2/core/IO.h"
Expand Down Expand Up @@ -183,13 +184,66 @@ uint64_t BP5Writer::WriteMetadata(const std::vector<core::iovec> &MetaDataBlocks
m_FileMetadataManager.WriteFiles((char *)AttrSizeVector.data(),
sizeof(uint64_t) * AttrSizeVector.size());
MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size();
m_Profiler.Start("MetadataBlockWrite");
for (auto &b : MetaDataBlocks)
{
if (!b.iov_base)
continue;
m_FileMetadataManager.WriteFiles((char *)b.iov_base, b.iov_len);
MetaDataSize += b.iov_len;
}
m_Profiler.Stop("MetadataBlockWrite");

for (auto &b : AttributeBlocks)
{
if (!b.iov_base)
continue;
m_FileMetadataManager.WriteFiles((char *)b.iov_base, b.iov_len);
MetaDataSize += b.iov_len;
}

m_FileMetadataManager.FlushFiles();

m_MetaDataPos += MetaDataSize;
return MetaDataSize;
}

uint64_t BP5Writer::WriteMetadata(const std::vector<char> &ContigMetaData,
const std::vector<size_t> &SizeVector,
const std::vector<core::iovec> &AttributeBlocks)
{
size_t MDataTotalSize = std::accumulate(SizeVector.begin(), SizeVector.end(), size_t(0));
uint64_t MetaDataSize = 0;
std::vector<uint64_t> AttrSizeVector;

MDataTotalSize += SizeVector.size() * sizeof(size_t);
for (size_t a = 0; a < SizeVector.size(); a++)
{
if (a < AttributeBlocks.size())
{
auto &b = AttributeBlocks[a];
MDataTotalSize += sizeof(uint64_t) + b.iov_len;
AttrSizeVector.push_back(b.iov_len);
}
else
{
AttrSizeVector.push_back(0);
MDataTotalSize += sizeof(uint64_t);
}
}
MetaDataSize = 0;
m_FileMetadataManager.WriteFiles((char *)&MDataTotalSize, sizeof(uint64_t));
MetaDataSize += sizeof(uint64_t);
m_FileMetadataManager.WriteFiles((char *)SizeVector.data(),
sizeof(uint64_t) * SizeVector.size());
MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size();
m_FileMetadataManager.WriteFiles((char *)AttrSizeVector.data(),
sizeof(uint64_t) * AttrSizeVector.size());
MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size();
m_Profiler.Start("MetadataBlockWrite");
m_FileMetadataManager.WriteFiles(ContigMetaData.data(), ContigMetaData.size());
m_Profiler.Stop("MetadataBlockWrite");
MetaDataSize += ContigMetaData.size();

for (auto &b : AttributeBlocks)
{
Expand Down Expand Up @@ -534,8 +588,6 @@ void BP5Writer::ComputeDerivedVariables()
if (mvi->BlocksInfo.size() == 0)
{
computeDerived = false;
std::cout << "Variable " << itVariable->first << " not written in this step";
std::cout << " .. skip derived variable " << (*it).second->m_Name << std::endl;
break;
}
nameToVarInfo.insert({varName, std::unique_ptr<MinVarInfo>(mvi)});
Expand Down Expand Up @@ -568,41 +620,93 @@ void BP5Writer::ComputeDerivedVariables()
}
#endif

void BP5Writer::EndStep()
void BP5Writer::SelectiveAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo)
{
#ifdef ADIOS2_HAVE_DERIVED_VARIABLE
ComputeDerivedVariables();
#endif
m_BetweenStepPairs = false;
PERFSTUBS_SCOPED_TIMER("BP5Writer::EndStep");
m_Profiler.Start("ES");

m_Profiler.Start("ES_close");
MarshalAttributes();

// true: advances step
auto TSInfo = m_BP5Serializer.CloseTimestep((int)m_WriterStep,
m_Parameters.AsyncWrite || m_Parameters.DirectIO);

/* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the
* AttributeEncodeBuffer and the data encode Vector */

m_ThisTimestepDataSize += TSInfo.DataBuffer->Size();
m_Profiler.Stop("ES_close");

m_Profiler.Start("ES_AWD");
// TSInfo destructor would delete the DataBuffer so we need to save it
// for async IO and let the writer free it up when not needed anymore
m_AsyncWriteLock.lock();
m_flagRush = false;
m_AsyncWriteLock.unlock();
std::vector<format::BP5Base::MetaMetaInfoBlock> UniqueMetaMetaBlocks;
std::vector<uint64_t> DataSizes;
std::vector<core::iovec> AttributeBlocks;
std::vector<size_t> MetaEncodeSize;
m_WriterDataPos.resize(0);
m_WriterDataPos.push_back(m_StartDataPos);
UniqueMetaMetaBlocks = TSInfo.NewMetaMetaBlocks;
if (TSInfo.AttributeEncodeBuffer)
AttributeBlocks.push_back(
{TSInfo.AttributeEncodeBuffer->Data(), TSInfo.AttributeEncodeBuffer->m_FixedSize});
size_t AlignedMetadataSize = (TSInfo.MetaEncodeBuffer->m_FixedSize + 7) & ~0x7;
MetaEncodeSize.push_back(AlignedMetadataSize);

// WriteData will free TSInfo.DataBuffer
WriteData(TSInfo.DataBuffer);
TSInfo.DataBuffer = NULL;
m_Profiler.Start("ES_aggregate_info");
BP5Helper::BP5AggregateInformation(m_Comm, m_Profiler, UniqueMetaMetaBlocks, AttributeBlocks,
MetaEncodeSize, m_WriterDataPos);

m_Profiler.Stop("ES_AWD");
m_Profiler.Stop("ES_aggregate_info");
m_Profiler.Start("ES_gather_write_meta");
if (m_Comm.Rank() == 0)
{
m_Profiler.Start("ES_AGG1");
size_t MetadataTotalSize =
std::accumulate(MetaEncodeSize.begin(), MetaEncodeSize.end(), size_t(0));
assert(m_WriterDataPos.size() == static_cast<size_t>(m_Comm.Size()));
WriteMetaMetadata(UniqueMetaMetaBlocks);
for (auto &mm : UniqueMetaMetaBlocks)
{
free((void *)mm.MetaMetaInfo);
free((void *)mm.MetaMetaID);
}
m_LatestMetaDataPos = m_MetaDataPos;
std::vector<char> ContigMetadata;
ContigMetadata.resize(MetadataTotalSize);
auto AlignedCounts = MetaEncodeSize;
for (auto &C : AlignedCounts)
C /= 8;
m_Profiler.Stop("ES_AGG1");
m_Profiler.Start("ES_GatherMetadataBlocks");
if (m_Comm.Size() > m_Parameters.OneLevelGatherSizeLimit)
{
BP5Helper::GathervArraysTwoLevel(
m_AggregatorMetadata.m_Comm, m_CommMetadataAggregators, m_Profiler,
(uint64_t *)TSInfo.MetaEncodeBuffer->Data(), AlignedMetadataSize / 8,
AlignedCounts.data(), AlignedCounts.size(), (uint64_t *)ContigMetadata.data(), 0);
}
else
{
m_Comm.GathervArrays((uint64_t *)TSInfo.MetaEncodeBuffer->Data(),
AlignedMetadataSize / 8, AlignedCounts.data(),
AlignedCounts.size(), (uint64_t *)ContigMetadata.data(), 0);
}
m_Profiler.Stop("ES_GatherMetadataBlocks");
m_Profiler.Start("ES_write_metadata");
m_LatestMetaDataSize = WriteMetadata(ContigMetadata, MetaEncodeSize, AttributeBlocks);

m_Profiler.Stop("ES_write_metadata");
for (auto &a : AttributeBlocks)
free((void *)a.iov_base);
if (!m_Parameters.AsyncWrite)
{
WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize);
}
}
else
{
if (m_Comm.Size() > m_Parameters.OneLevelGatherSizeLimit)
{
BP5Helper::GathervArraysTwoLevel(
m_AggregatorMetadata.m_Comm, m_CommMetadataAggregators, m_Profiler,
(uint64_t *)TSInfo.MetaEncodeBuffer->Data(), AlignedMetadataSize / 8, NULL,
0, // AlignedCounts.data(), AlignedCounts.size(),
nullptr, 0);
}
else
{
m_Comm.GathervArrays(TSInfo.MetaEncodeBuffer->Data(), AlignedMetadataSize,
MetaEncodeSize.data(), MetaEncodeSize.size(), (char *)nullptr, 0);
}
}
m_Profiler.Stop("ES_gather_write_meta");
}

void BP5Writer::TwoLevelAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo)
{
/*
* Two-step metadata aggregation
*/
Expand All @@ -629,9 +733,6 @@ void BP5Writer::EndStep()
for (auto &n : RecvCounts)
TotalSize += n;
RecvBuffer.resize(TotalSize);
/*std::cout << "MD Lvl-1: rank " << m_Comm.Rank() << " gather "
<< TotalSize << " bytes from aggregator group"
<< std::endl;*/
}
m_AggregatorMetadata.m_Comm.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
Expand Down Expand Up @@ -698,14 +799,61 @@ void BP5Writer::EndStep()
assert(m_WriterDataPos.size() == static_cast<size_t>(m_Comm.Size()));
WriteMetaMetadata(UniqueMetaMetaBlocks);
m_LatestMetaDataPos = m_MetaDataPos;
m_Profiler.Start("ES_write_metadata");
m_LatestMetaDataSize = WriteMetadata(Metadata, AttributeBlocks);
m_Profiler.Stop("ES_write_metadata");
if (!m_Parameters.AsyncWrite)
{
WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize);
}
}
} // level 2
m_Profiler.Stop("ES_meta2");
}

void BP5Writer::EndStep()
{
#ifdef ADIOS2_HAVE_DERIVED_VARIABLE
ComputeDerivedVariables();
#endif
m_BetweenStepPairs = false;
PERFSTUBS_SCOPED_TIMER("BP5Writer::EndStep");
m_Profiler.Start("ES");

m_Profiler.Start("ES_close");
MarshalAttributes();

// true: advances step
auto TSInfo = m_BP5Serializer.CloseTimestep((int)m_WriterStep,
m_Parameters.AsyncWrite || m_Parameters.DirectIO);

/* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the
* AttributeEncodeBuffer and the data encode Vector */

m_ThisTimestepDataSize += TSInfo.DataBuffer->Size();
m_Profiler.Stop("ES_close");

m_Profiler.Start("ES_AWD");
// TSInfo destructor would delete the DataBuffer so we need to save it
// for async IO and let the writer free it up when not needed anymore
m_AsyncWriteLock.lock();
m_flagRush = false;
m_AsyncWriteLock.unlock();

// WriteData will free TSInfo.DataBuffer
WriteData(TSInfo.DataBuffer);
TSInfo.DataBuffer = NULL;

m_Profiler.Stop("ES_AWD");

if (m_Parameters.UseSelectiveMetadataAggregation)
{
SelectiveAggregationMetadata(TSInfo);
}
else
{
TwoLevelAggregationMetadata(TSInfo);
}

if (m_Parameters.AsyncWrite)
{
Expand Down
7 changes: 7 additions & 0 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ class BP5Writer : public BP5Engine, public core::Engine

uint64_t WriteMetadata(const std::vector<core::iovec> &MetaDataBlocks,
const std::vector<core::iovec> &AttributeBlocks);
uint64_t WriteMetadata(const std::vector<char> &ContigMetaData,
const std::vector<size_t> &SizeVector,
const std::vector<core::iovec> &AttributeBlocks);

void SelectiveAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo);
void TwoLevelAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo);
void SimpleAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo);

/** Write Data to disk, in an aggregator chain */
void WriteData(format::BufferV *Data);
Expand Down
Loading