Skip to content

Commit

Permalink
Add New SelectiveMetadataAggregation method
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Nov 4, 2024
1 parent c3cc3a2 commit ac6f84b
Show file tree
Hide file tree
Showing 9 changed files with 894 additions and 38 deletions.
1 change: 1 addition & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ add_library(adios2_core
toolkit/format/bp5/BP5Deserializer.cpp
toolkit/format/bp5/BP5Deserializer.tcc
toolkit/format/bp5/BP5Serializer.cpp
toolkit/format/bp5/BP5Helper.cpp

toolkit/profiling/iochrono/Timer.cpp
toolkit/profiling/iochrono/IOChrono.cpp
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class BP5Engine
MACRO(StatsBlockSize, SizeBytes, size_t, DefaultStatsBlockSize) \
MACRO(Threads, UInt, unsigned int, 0) \
MACRO(UseOneTimeAttributes, Bool, bool, true) \
MACRO(UseSelectiveMetadataAggregation, Bool, bool, true) \
MACRO(OneLevelGatherSizeLimit, Int, int, 6000) \
MACRO(FlattenSteps, Bool, bool, false) \
MACRO(IgnoreFlattenSteps, Bool, bool, false) \
MACRO(RemoteDataPath, String, std::string, "") \
Expand Down
220 changes: 184 additions & 36 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "BP5Writer.h"
#include "BP5Writer.tcc"
#include "adios2/toolkit/format/bp5/BP5Helper.h"

#include "adios2/common/ADIOSMacros.h"
#include "adios2/core/IO.h"
Expand Down Expand Up @@ -183,13 +184,66 @@ uint64_t BP5Writer::WriteMetadata(const std::vector<core::iovec> &MetaDataBlocks
m_FileMetadataManager.WriteFiles((char *)AttrSizeVector.data(),
sizeof(uint64_t) * AttrSizeVector.size());
MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size();
m_Profiler.Start("MetadataBlockWrite");
for (auto &b : MetaDataBlocks)
{
if (!b.iov_base)
continue;
m_FileMetadataManager.WriteFiles((char *)b.iov_base, b.iov_len);
MetaDataSize += b.iov_len;
}
m_Profiler.Stop("MetadataBlockWrite");

for (auto &b : AttributeBlocks)
{
if (!b.iov_base)
continue;
m_FileMetadataManager.WriteFiles((char *)b.iov_base, b.iov_len);
MetaDataSize += b.iov_len;
}

m_FileMetadataManager.FlushFiles();

m_MetaDataPos += MetaDataSize;
return MetaDataSize;
}

uint64_t BP5Writer::WriteMetadata(const std::vector<char> &ContigMetaData,
const std::vector<size_t> &SizeVector,
const std::vector<core::iovec> &AttributeBlocks)
{
size_t MDataTotalSize = std::accumulate(SizeVector.begin(), SizeVector.end(), size_t(0));
uint64_t MetaDataSize = 0;
std::vector<uint64_t> AttrSizeVector;

MDataTotalSize += SizeVector.size() * sizeof(size_t);
for (size_t a = 0; a < SizeVector.size(); a++)
{
if (a < AttributeBlocks.size())
{
auto &b = AttributeBlocks[a];
MDataTotalSize += sizeof(uint64_t) + b.iov_len;
AttrSizeVector.push_back(b.iov_len);
}
else
{
AttrSizeVector.push_back(0);
MDataTotalSize += sizeof(uint64_t);
}
}
MetaDataSize = 0;
m_FileMetadataManager.WriteFiles((char *)&MDataTotalSize, sizeof(uint64_t));
MetaDataSize += sizeof(uint64_t);
m_FileMetadataManager.WriteFiles((char *)SizeVector.data(),
sizeof(uint64_t) * SizeVector.size());
MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size();
m_FileMetadataManager.WriteFiles((char *)AttrSizeVector.data(),
sizeof(uint64_t) * AttrSizeVector.size());
MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size();
m_Profiler.Start("MetadataBlockWrite");
m_FileMetadataManager.WriteFiles(ContigMetaData.data(), ContigMetaData.size());
m_Profiler.Stop("MetadataBlockWrite");
MetaDataSize += ContigMetaData.size();

for (auto &b : AttributeBlocks)
{
Expand Down Expand Up @@ -534,8 +588,6 @@ void BP5Writer::ComputeDerivedVariables()
if (mvi->BlocksInfo.size() == 0)
{
computeDerived = false;
std::cout << "Variable " << itVariable->first << " not written in this step";
std::cout << " .. skip derived variable " << (*it).second->m_Name << std::endl;
break;
}
nameToVarInfo.insert({varName, std::unique_ptr<MinVarInfo>(mvi)});
Expand Down Expand Up @@ -568,41 +620,93 @@ void BP5Writer::ComputeDerivedVariables()
}
#endif

void BP5Writer::EndStep()
void BP5Writer::SelectiveAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo)
{
#ifdef ADIOS2_HAVE_DERIVED_VARIABLE
ComputeDerivedVariables();
#endif
m_BetweenStepPairs = false;
PERFSTUBS_SCOPED_TIMER("BP5Writer::EndStep");
m_Profiler.Start("ES");

m_Profiler.Start("ES_close");
MarshalAttributes();

// true: advances step
auto TSInfo = m_BP5Serializer.CloseTimestep((int)m_WriterStep,
m_Parameters.AsyncWrite || m_Parameters.DirectIO);

/* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the
* AttributeEncodeBuffer and the data encode Vector */

m_ThisTimestepDataSize += TSInfo.DataBuffer->Size();
m_Profiler.Stop("ES_close");

m_Profiler.Start("ES_AWD");
// TSInfo destructor would delete the DataBuffer so we need to save it
// for async IO and let the writer free it up when not needed anymore
m_AsyncWriteLock.lock();
m_flagRush = false;
m_AsyncWriteLock.unlock();
std::vector<format::BP5Base::MetaMetaInfoBlock> UniqueMetaMetaBlocks;
std::vector<uint64_t> DataSizes;
std::vector<core::iovec> AttributeBlocks;
std::vector<size_t> MetaEncodeSize;
m_WriterDataPos.resize(0);
m_WriterDataPos.push_back(m_StartDataPos);
UniqueMetaMetaBlocks = TSInfo.NewMetaMetaBlocks;
if (TSInfo.AttributeEncodeBuffer)
AttributeBlocks.push_back(
{TSInfo.AttributeEncodeBuffer->Data(), TSInfo.AttributeEncodeBuffer->m_FixedSize});
size_t AlignedMetadataSize = (TSInfo.MetaEncodeBuffer->m_FixedSize + 7) & ~0x7;
MetaEncodeSize.push_back(AlignedMetadataSize);

// WriteData will free TSInfo.DataBuffer
WriteData(TSInfo.DataBuffer);
TSInfo.DataBuffer = NULL;
m_Profiler.Start("ES_aggregate_info");
BP5Helper::BP5AggregateInformation(m_Comm, m_Profiler, UniqueMetaMetaBlocks, AttributeBlocks,
MetaEncodeSize, m_WriterDataPos);

m_Profiler.Stop("ES_AWD");
m_Profiler.Stop("ES_aggregate_info");
m_Profiler.Start("ES_gather_write_meta");
if (m_Comm.Rank() == 0)
{
m_Profiler.Start("ES_AGG1");
size_t MetadataTotalSize =
std::accumulate(MetaEncodeSize.begin(), MetaEncodeSize.end(), size_t(0));
assert(m_WriterDataPos.size() == static_cast<size_t>(m_Comm.Size()));
WriteMetaMetadata(UniqueMetaMetaBlocks);
for (auto &mm : UniqueMetaMetaBlocks)
{
free((void *)mm.MetaMetaInfo);
free((void *)mm.MetaMetaID);
}
m_LatestMetaDataPos = m_MetaDataPos;
std::vector<char> ContigMetadata;
ContigMetadata.resize(MetadataTotalSize);
auto AlignedCounts = MetaEncodeSize;
for (auto &C : AlignedCounts)
C /= 8;
m_Profiler.Stop("ES_AGG1");
m_Profiler.Start("ES_GatherMetadataBlocks");
if (m_Comm.Size() > m_Parameters.OneLevelGatherSizeLimit)
{
BP5Helper::GathervArraysTwoLevel(
m_AggregatorMetadata.m_Comm, m_CommMetadataAggregators, m_Profiler,
(uint64_t *)TSInfo.MetaEncodeBuffer->Data(), AlignedMetadataSize / 8,
AlignedCounts.data(), AlignedCounts.size(), (uint64_t *)ContigMetadata.data(), 0);
}
else
{
m_Comm.GathervArrays((uint64_t *)TSInfo.MetaEncodeBuffer->Data(),
AlignedMetadataSize / 8, AlignedCounts.data(),
AlignedCounts.size(), (uint64_t *)ContigMetadata.data(), 0);
}
m_Profiler.Stop("ES_GatherMetadataBlocks");
m_Profiler.Start("ES_write_metadata");
m_LatestMetaDataSize = WriteMetadata(ContigMetadata, MetaEncodeSize, AttributeBlocks);

m_Profiler.Stop("ES_write_metadata");
for (auto &a : AttributeBlocks)
free((void *)a.iov_base);
if (!m_Parameters.AsyncWrite)
{
WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize);
}
}
else
{
if (m_Comm.Size() > m_Parameters.OneLevelGatherSizeLimit)
{
BP5Helper::GathervArraysTwoLevel(
m_AggregatorMetadata.m_Comm, m_CommMetadataAggregators, m_Profiler,
(uint64_t *)TSInfo.MetaEncodeBuffer->Data(), AlignedMetadataSize / 8, NULL,
0, // AlignedCounts.data(), AlignedCounts.size(),
nullptr, 0);
}
else
{
m_Comm.GathervArrays(TSInfo.MetaEncodeBuffer->Data(), AlignedMetadataSize,
MetaEncodeSize.data(), MetaEncodeSize.size(), (char *)nullptr, 0);
}
}
m_Profiler.Stop("ES_gather_write_meta");
}

void BP5Writer::TwoLevelAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo)
{
/*
* Two-step metadata aggregation
*/
Expand All @@ -629,9 +733,6 @@ void BP5Writer::EndStep()
for (auto &n : RecvCounts)
TotalSize += n;
RecvBuffer.resize(TotalSize);
/*std::cout << "MD Lvl-1: rank " << m_Comm.Rank() << " gather "
<< TotalSize << " bytes from aggregator group"
<< std::endl;*/
}
m_AggregatorMetadata.m_Comm.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
Expand Down Expand Up @@ -698,14 +799,61 @@ void BP5Writer::EndStep()
assert(m_WriterDataPos.size() == static_cast<size_t>(m_Comm.Size()));
WriteMetaMetadata(UniqueMetaMetaBlocks);
m_LatestMetaDataPos = m_MetaDataPos;
m_Profiler.Start("ES_write_metadata");
m_LatestMetaDataSize = WriteMetadata(Metadata, AttributeBlocks);
m_Profiler.Stop("ES_write_metadata");
if (!m_Parameters.AsyncWrite)
{
WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize);
}
}
} // level 2
m_Profiler.Stop("ES_meta2");
}

void BP5Writer::EndStep()
{
#ifdef ADIOS2_HAVE_DERIVED_VARIABLE
ComputeDerivedVariables();
#endif
m_BetweenStepPairs = false;
PERFSTUBS_SCOPED_TIMER("BP5Writer::EndStep");
m_Profiler.Start("ES");

m_Profiler.Start("ES_close");
MarshalAttributes();

// true: advances step
auto TSInfo = m_BP5Serializer.CloseTimestep((int)m_WriterStep,
m_Parameters.AsyncWrite || m_Parameters.DirectIO);

/* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the
* AttributeEncodeBuffer and the data encode Vector */

m_ThisTimestepDataSize += TSInfo.DataBuffer->Size();
m_Profiler.Stop("ES_close");

m_Profiler.Start("ES_AWD");
// TSInfo destructor would delete the DataBuffer so we need to save it
// for async IO and let the writer free it up when not needed anymore
m_AsyncWriteLock.lock();
m_flagRush = false;
m_AsyncWriteLock.unlock();

// WriteData will free TSInfo.DataBuffer
WriteData(TSInfo.DataBuffer);
TSInfo.DataBuffer = NULL;

m_Profiler.Stop("ES_AWD");

if (m_Parameters.UseSelectiveMetadataAggregation)
{
SelectiveAggregationMetadata(TSInfo);
}
else
{
TwoLevelAggregationMetadata(TSInfo);
}

if (m_Parameters.AsyncWrite)
{
Expand Down
7 changes: 7 additions & 0 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ class BP5Writer : public BP5Engine, public core::Engine

uint64_t WriteMetadata(const std::vector<core::iovec> &MetaDataBlocks,
const std::vector<core::iovec> &AttributeBlocks);
uint64_t WriteMetadata(const std::vector<char> &ContigMetaData,
const std::vector<size_t> &SizeVector,
const std::vector<core::iovec> &AttributeBlocks);

void SelectiveAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo);
void TwoLevelAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo);
void SimpleAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo);

/** Write Data to disk, in an aggregator chain */
void WriteData(format::BufferV *Data);
Expand Down
Loading

0 comments on commit ac6f84b

Please sign in to comment.