diff --git a/docs/user_guide/source/engines/bp5.rst b/docs/user_guide/source/engines/bp5.rst index 76696d427a..b404044d95 100644 --- a/docs/user_guide/source/engines/bp5.rst +++ b/docs/user_guide/source/engines/bp5.rst @@ -62,7 +62,8 @@ This engine allows the user to fine tune the buffering operations through the fo #. Aggregation - #. **AggregationType**: *TwoLevelShm*, *EveryoneWritesSerial* and *EveryoneWrites* are three aggregation strategies. See :ref:`Aggregation in BP5`. The default is *TwoLevelShm*. + #. **AggregationType**: *TwoLevelShm*, *EveryoneWritesSerial* and + *EveryoneWrites* are three data aggregation strategies. See :ref:`Aggregation in BP5`. The default is *TwoLevelShm*. #. **NumAggregators**: The number of processes that will ever write data directly to storage. The default is set to the number of compute nodes the application is running on (i.e. one process per compute node). TwoLevelShm will select a fixed number of processes *per compute-node* to get close to the intention of the user but does not guarantee the exact number of aggregators. @@ -74,7 +75,23 @@ This engine allows the user to fine tune the buffering operations through the fo #. **MaxShmSize**: Upper limit for how much shared memory an aggregator process in *TwoLevelShm* can allocate. For optimum performance, this should be at least *2xM +1KB* where *M* is the maximum size any process writes in a single step. However, there is no point in allowing for more than 4GB. The default is 4GB. - + #. **UseSelectiveMetadataAggregation**: There are two metadata + aggregation strategies in BP5. If this parameter is true (the default), + SelectiveMetadataAggregation is employed, which uses a multi-phase approach + to limit the amount of data exchanged. If false, a less + complex two-level metadata aggregation is performed. In most + cases the default is more efficient. + + #. **OneLevelGatherRanksLimit**: For the + SelectiveMetadataAggregation method, this parameter specifies an + MPI cohort size above which it resorts to a two-stage + aggregation process rather than gathering all metadata to rank 0 + in one MPI collective operation. Some HPC machines have + unpredictable behaviour with gatherv at both large numbers of + ranks and large amounts of data. The default value (6000) + avoids this behaviour on ORNL's Frontier. Higher or lower values may + be useful on other machines. + #. Buffering #. **BufferVType**: *chunk* or *malloc*, default is chunking. Chunking maintains the buffer as a list of memory blocks, either ADIOS-owned for sync-ed Puts and small Puts, and user-owned pointers of deferred Puts. Malloc maintains a single memory block and extends it (reallocates) whenever more data is buffered. Chunking incurs extra cost in I/O by having to write data in chunks (multiple write system calls), which can be helped by increasing *BufferChunkSize* and *MinDeferredSize*. Malloc incurs extra cost by reallocating memory whenever more data is buffered (by Put()), which can be helped by increasing *InitialBufferSize*. @@ -138,35 +155,37 @@ This engine allows the user to fine tune the buffering operations through the fo tells the reader to ignore any FlattenSteps parameter supplied to the writer.
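For illustration (not part of the diff above): the new metadata options are ordinary BP5 engine parameters, so an application selects them through ``IO::SetParameters`` or a runtime config file like any other entry in the parameter table. A minimal sketch assuming an MPI build of ADIOS2; the IO name, output file name, and the non-default values are placeholders::

    #include <adios2.h>
    #include <mpi.h>

    int main(int argc, char **argv)
    {
        MPI_Init(&argc, &argv);
        {
            adios2::ADIOS adios(MPI_COMM_WORLD);
            adios2::IO io = adios.DeclareIO("Output");
            io.SetEngine("BP5");
            // Keys and values are plain strings; spellings follow the parameter table.
            io.SetParameters({{"AggregationType", "TwoLevelShm"},
                              {"NumAggregators", "8"},
                              {"UseSelectiveMetadataAggregation", "On"},
                              {"OneLevelGatherRanksLimit", "6000"}});
            adios2::Engine writer = io.Open("output.bp", adios2::Mode::Write);
            // ... BeginStep / Put / EndStep loop ...
            writer.Close();
        }
        MPI_Finalize();
        return 0;
    }

The same key/value strings can also be supplied through the runtime configuration file.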
-============================== ===================== =========================================================== - **Key** **Value Format** **Default** and Examples -============================== ===================== =========================================================== - OpenTimeoutSecs float **0** for *ReadRandomAccess* mode, **3600** for *Read* mode, ``10.0``, ``5`` - BeginStepPollingFrequencySecs float **1**, 10.0 - AggregationType string **TwoLevelShm**, EveryoneWritesSerial, EveryoneWrites - NumAggregators integer >= 1 **0 (one file per compute node)** - AggregatorRatio integer >= 1 not used unless set - NumSubFiles integer >= 1 **=NumAggregators**, only used when *AggregationType=TwoLevelShm* - StripeSize integer+units **4KB** - MaxShmSize integer+units **4294762496** - BufferVType string **chunk**, malloc - BufferChunkSize integer+units **128MB**, worth increasing up to min(2GB, datasize/process/step) - MinDeferredSize integer+units **4MB** - InitialBufferSize float+units >= 16Kb **16Kb**, 10Mb, 0.5Gb - GrowthFactor float > 1 **1.05**, 1.01, 1.5, 2 - AppendAfterSteps integer >= 0 **INT_MAX** - SelectSteps string "0 6 3 2", "1:5", "0:n:3 10:n:5" - AsyncOpen string On/Off **On**, Off, true, false - AsyncWrite string On/Off **Off**, On, true, false - DirectIO string On/Off **Off**, On, true, false - DirectIOAlignOffset integer >= 0 **512** - DirectIOAlignBuffer integer >= 0 set to DirectIOAlignOffset if unset - StatsLevel integer, 0 or 1 **1**, 0 - MaxOpenFilesAtOnce integer >= 0 **UINT_MAX**, 1024, 1 - Threads integer >= 0 **0**, 1, 32 - FlattenSteps boolean **off**, on, true, false - IgnoreFlattenSteps boolean **off**, on, true, false -============================== ===================== =========================================================== +=============================== ===================== =========================================================== + **Key** **Value Format** **Default** and Examples +=============================== ===================== =========================================================== + OpenTimeoutSecs float **0** for *ReadRandomAccess* mode, **3600** for *Read* mode, ``10.0``, ``5`` + BeginStepPollingFrequencySecs float **1**, 10.0 + AggregationType string **TwoLevelShm**, EveryoneWritesSerial, EveryoneWrites + NumAggregators integer >= 1 **0 (one file per compute node)** + AggregatorRatio integer >= 1 not used unless set + NumSubFiles integer >= 1 **=NumAggregators**, only used when *AggregationType=TwoLevelShm* + StripeSize integer+units **4KB** + MaxShmSize integer+units **4294762496** + BufferVType string **chunk**, malloc + BufferChunkSize integer+units **128MB**, worth increasing up to min(2GB, datasize/process/step) + MinDeferredSize integer+units **4MB** + InitialBufferSize float+units >= 16Kb **16Kb**, 10Mb, 0.5Gb + GrowthFactor float > 1 **1.05**, 1.01, 1.5, 2 + AppendAfterSteps integer >= 0 **INT_MAX** + SelectSteps string "0 6 3 2", "1:5", "0:n:3 10:n:5" + AsyncOpen string On/Off **On**, Off, true, false + AsyncWrite string On/Off **Off**, On, true, false + DirectIO string On/Off **Off**, On, true, false + DirectIOAlignOffset integer >= 0 **512** + DirectIOAlignBuffer integer >= 0 set to DirectIOAlignOffset if unset + UseSelectiveMetadataAggregation boolean **On**, Off, true, false + OneLevelGatherRanksLimit integer **6000** + StatsLevel integer, 0 or 1 **1**, 0 + MaxOpenFilesAtOnce integer >= 0 **UINT_MAX**, 1024, 1 + Threads integer >= 0 **0**, 1, 32 + FlattenSteps boolean **off**, on, true, false + IgnoreFlattenSteps 
boolean **off**, on, true, false +=============================== ===================== =========================================================== Only file transport types are supported. Optional parameters for ``IO::AddTransport`` or in runtime config file transport field: diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index cbc59f5cae..d8a9d01c33 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -99,6 +99,7 @@ add_library(adios2_core toolkit/format/bp5/BP5Deserializer.cpp toolkit/format/bp5/BP5Deserializer.tcc toolkit/format/bp5/BP5Serializer.cpp + toolkit/format/bp5/BP5Helper.cpp toolkit/profiling/iochrono/Timer.cpp toolkit/profiling/iochrono/IOChrono.cpp diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index f5c1ab88d9..71bf3124f5 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -23,13 +23,6 @@ namespace core namespace engine { -/** - * sub-block size for min/max calculation of large arrays in number of - * elements (not bytes). The default big number per Put() default will - * result in the original single min/max value-pair per block - */ -constexpr size_t DefaultStatsBlockSize = 1125899906842624ULL; - class BP5Engine { public: @@ -148,7 +141,6 @@ class BP5Engine MACRO(BurstBufferPath, String, std::string, "") \ MACRO(NodeLocal, Bool, bool, false) \ MACRO(verbose, Int, int, 0) \ - MACRO(CollectiveMetadata, Bool, bool, true) \ MACRO(NumAggregators, UInt, unsigned int, 0) \ MACRO(AggregatorRatio, UInt, unsigned int, 0) \ MACRO(NumSubFiles, UInt, unsigned int, 0) \ @@ -169,9 +161,10 @@ class BP5Engine MACRO(SelectSteps, String, std::string, "") \ MACRO(ReaderShortCircuitReads, Bool, bool, false) \ MACRO(StatsLevel, UInt, unsigned int, 1) \ - MACRO(StatsBlockSize, SizeBytes, size_t, DefaultStatsBlockSize) \ MACRO(Threads, UInt, unsigned int, 0) \ MACRO(UseOneTimeAttributes, Bool, bool, true) \ + MACRO(UseSelectiveMetadataAggregation, Bool, bool, true) \ + MACRO(OneLevelGatherRanksLimit, Int, int, 6000) \ MACRO(FlattenSteps, Bool, bool, false) \ MACRO(IgnoreFlattenSteps, Bool, bool, false) \ MACRO(RemoteDataPath, String, std::string, "") \ diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index ef26b2b4c0..590c891a63 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -8,6 +8,7 @@ #include "BP5Writer.h" #include "BP5Writer.tcc" +#include "adios2/toolkit/format/bp5/BP5Helper.h" #include "adios2/common/ADIOSMacros.h" #include "adios2/core/IO.h" @@ -183,6 +184,7 @@ uint64_t BP5Writer::WriteMetadata(const std::vector &MetaDataBlocks m_FileMetadataManager.WriteFiles((char *)AttrSizeVector.data(), sizeof(uint64_t) * AttrSizeVector.size()); MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); + m_Profiler.Start("MetadataBlockWrite"); for (auto &b : MetaDataBlocks) { if (!b.iov_base) @@ -190,6 +192,58 @@ uint64_t BP5Writer::WriteMetadata(const std::vector &MetaDataBlocks m_FileMetadataManager.WriteFiles((char *)b.iov_base, b.iov_len); MetaDataSize += b.iov_len; } + m_Profiler.Stop("MetadataBlockWrite"); + + for (auto &b : AttributeBlocks) + { + if (!b.iov_base) + continue; + m_FileMetadataManager.WriteFiles((char *)b.iov_base, b.iov_len); + MetaDataSize += b.iov_len; + } + + m_FileMetadataManager.FlushFiles(); + + m_MetaDataPos += MetaDataSize; + return MetaDataSize; +} + +uint64_t BP5Writer::WriteMetadata(const std::vector &ContigMetaData, + const std::vector 
&SizeVector, + const std::vector &AttributeBlocks) +{ + size_t MDataTotalSize = std::accumulate(SizeVector.begin(), SizeVector.end(), size_t(0)); + uint64_t MetaDataSize = 0; + std::vector AttrSizeVector; + + MDataTotalSize += SizeVector.size() * sizeof(size_t); + for (size_t a = 0; a < SizeVector.size(); a++) + { + if (a < AttributeBlocks.size()) + { + auto &b = AttributeBlocks[a]; + MDataTotalSize += sizeof(uint64_t) + b.iov_len; + AttrSizeVector.push_back(b.iov_len); + } + else + { + AttrSizeVector.push_back(0); + MDataTotalSize += sizeof(uint64_t); + } + } + MetaDataSize = 0; + m_FileMetadataManager.WriteFiles((char *)&MDataTotalSize, sizeof(uint64_t)); + MetaDataSize += sizeof(uint64_t); + m_FileMetadataManager.WriteFiles((char *)SizeVector.data(), + sizeof(uint64_t) * SizeVector.size()); + MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); + m_FileMetadataManager.WriteFiles((char *)AttrSizeVector.data(), + sizeof(uint64_t) * AttrSizeVector.size()); + MetaDataSize += sizeof(uint64_t) * AttrSizeVector.size(); + m_Profiler.Start("MetadataBlockWrite"); + m_FileMetadataManager.WriteFiles(ContigMetaData.data(), ContigMetaData.size()); + m_Profiler.Stop("MetadataBlockWrite"); + MetaDataSize += ContigMetaData.size(); for (auto &b : AttributeBlocks) { @@ -534,8 +588,6 @@ void BP5Writer::ComputeDerivedVariables() if (mvi->BlocksInfo.size() == 0) { computeDerived = false; - std::cout << "Variable " << itVariable->first << " not written in this step"; - std::cout << " .. skip derived variable " << (*it).second->m_Name << std::endl; break; } nameToVarInfo.insert({varName, std::unique_ptr(mvi)}); @@ -568,41 +620,93 @@ void BP5Writer::ComputeDerivedVariables() } #endif -void BP5Writer::EndStep() +void BP5Writer::SelectiveAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo) { -#ifdef ADIOS2_HAVE_DERIVED_VARIABLE - ComputeDerivedVariables(); -#endif - m_BetweenStepPairs = false; - PERFSTUBS_SCOPED_TIMER("BP5Writer::EndStep"); - m_Profiler.Start("ES"); - - m_Profiler.Start("ES_close"); - MarshalAttributes(); - - // true: advances step - auto TSInfo = m_BP5Serializer.CloseTimestep((int)m_WriterStep, - m_Parameters.AsyncWrite || m_Parameters.DirectIO); - - /* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the - * AttributeEncodeBuffer and the data encode Vector */ - - m_ThisTimestepDataSize += TSInfo.DataBuffer->Size(); - m_Profiler.Stop("ES_close"); - - m_Profiler.Start("ES_AWD"); - // TSInfo destructor would delete the DataBuffer so we need to save it - // for async IO and let the writer free it up when not needed anymore - m_AsyncWriteLock.lock(); - m_flagRush = false; - m_AsyncWriteLock.unlock(); + std::vector UniqueMetaMetaBlocks; + std::vector DataSizes; + std::vector AttributeBlocks; + std::vector MetaEncodeSize; + m_WriterDataPos.resize(0); + m_WriterDataPos.push_back(m_StartDataPos); + UniqueMetaMetaBlocks = TSInfo.NewMetaMetaBlocks; + if (TSInfo.AttributeEncodeBuffer) + AttributeBlocks.push_back( + {TSInfo.AttributeEncodeBuffer->Data(), TSInfo.AttributeEncodeBuffer->m_FixedSize}); + size_t AlignedMetadataSize = (TSInfo.MetaEncodeBuffer->m_FixedSize + 7) & ~0x7; + MetaEncodeSize.push_back(AlignedMetadataSize); - // WriteData will free TSInfo.DataBuffer - WriteData(TSInfo.DataBuffer); - TSInfo.DataBuffer = NULL; + m_Profiler.Start("ES_aggregate_info"); + BP5Helper::BP5AggregateInformation(m_Comm, m_Profiler, UniqueMetaMetaBlocks, AttributeBlocks, + MetaEncodeSize, m_WriterDataPos); - m_Profiler.Stop("ES_AWD"); + m_Profiler.Stop("ES_aggregate_info"); + 
m_Profiler.Start("ES_gather_write_meta"); + if (m_Comm.Rank() == 0) + { + m_Profiler.Start("ES_AGG1"); + size_t MetadataTotalSize = + std::accumulate(MetaEncodeSize.begin(), MetaEncodeSize.end(), size_t(0)); + assert(m_WriterDataPos.size() == static_cast(m_Comm.Size())); + WriteMetaMetadata(UniqueMetaMetaBlocks); + for (auto &mm : UniqueMetaMetaBlocks) + { + free((void *)mm.MetaMetaInfo); + free((void *)mm.MetaMetaID); + } + m_LatestMetaDataPos = m_MetaDataPos; + std::vector ContigMetadata; + ContigMetadata.resize(MetadataTotalSize); + auto AlignedCounts = MetaEncodeSize; + for (auto &C : AlignedCounts) + C /= 8; + m_Profiler.Stop("ES_AGG1"); + m_Profiler.Start("ES_GatherMetadataBlocks"); + if (m_Comm.Size() > m_Parameters.OneLevelGatherRanksLimit) + { + BP5Helper::GathervArraysTwoLevel( + m_AggregatorMetadata.m_Comm, m_CommMetadataAggregators, m_Profiler, + (uint64_t *)TSInfo.MetaEncodeBuffer->Data(), AlignedMetadataSize / 8, + AlignedCounts.data(), AlignedCounts.size(), (uint64_t *)ContigMetadata.data(), 0); + } + else + { + m_Comm.GathervArrays((uint64_t *)TSInfo.MetaEncodeBuffer->Data(), + AlignedMetadataSize / 8, AlignedCounts.data(), + AlignedCounts.size(), (uint64_t *)ContigMetadata.data(), 0); + } + m_Profiler.Stop("ES_GatherMetadataBlocks"); + m_Profiler.Start("ES_write_metadata"); + m_LatestMetaDataSize = WriteMetadata(ContigMetadata, MetaEncodeSize, AttributeBlocks); + + m_Profiler.Stop("ES_write_metadata"); + for (auto &a : AttributeBlocks) + free((void *)a.iov_base); + if (!m_Parameters.AsyncWrite) + { + WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize); + } + } + else + { + if (m_Comm.Size() > m_Parameters.OneLevelGatherRanksLimit) + { + BP5Helper::GathervArraysTwoLevel( + m_AggregatorMetadata.m_Comm, m_CommMetadataAggregators, m_Profiler, + (uint64_t *)TSInfo.MetaEncodeBuffer->Data(), AlignedMetadataSize / 8, NULL, + 0, // AlignedCounts.data(), AlignedCounts.size(), + nullptr, 0); + } + else + { + m_Comm.GathervArrays(TSInfo.MetaEncodeBuffer->Data(), AlignedMetadataSize, + MetaEncodeSize.data(), MetaEncodeSize.size(), (char *)nullptr, 0); + } + } + m_Profiler.Stop("ES_gather_write_meta"); +} +void BP5Writer::TwoLevelAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo) +{ /* * Two-step metadata aggregation */ @@ -629,9 +733,6 @@ void BP5Writer::EndStep() for (auto &n : RecvCounts) TotalSize += n; RecvBuffer.resize(TotalSize); - /*std::cout << "MD Lvl-1: rank " << m_Comm.Rank() << " gather " - << TotalSize << " bytes from aggregator group" - << std::endl;*/ } m_AggregatorMetadata.m_Comm.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(), RecvCounts.size(), RecvBuffer.data(), 0); @@ -698,7 +799,9 @@ void BP5Writer::EndStep() assert(m_WriterDataPos.size() == static_cast(m_Comm.Size())); WriteMetaMetadata(UniqueMetaMetaBlocks); m_LatestMetaDataPos = m_MetaDataPos; + m_Profiler.Start("ES_write_metadata"); m_LatestMetaDataSize = WriteMetadata(Metadata, AttributeBlocks); + m_Profiler.Stop("ES_write_metadata"); if (!m_Parameters.AsyncWrite) { WriteMetadataFileIndex(m_LatestMetaDataPos, m_LatestMetaDataSize); @@ -706,6 +809,51 @@ void BP5Writer::EndStep() } } // level 2 m_Profiler.Stop("ES_meta2"); +} + +void BP5Writer::EndStep() +{ +#ifdef ADIOS2_HAVE_DERIVED_VARIABLE + ComputeDerivedVariables(); +#endif + m_BetweenStepPairs = false; + PERFSTUBS_SCOPED_TIMER("BP5Writer::EndStep"); + m_Profiler.Start("ES"); + + m_Profiler.Start("ES_close"); + MarshalAttributes(); + + // true: advances step + auto TSInfo = 
m_BP5Serializer.CloseTimestep((int)m_WriterStep, + m_Parameters.AsyncWrite || m_Parameters.DirectIO); + + /* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the + * AttributeEncodeBuffer and the data encode Vector */ + + m_ThisTimestepDataSize += TSInfo.DataBuffer->Size(); + m_Profiler.Stop("ES_close"); + + m_Profiler.Start("ES_AWD"); + // TSInfo destructor would delete the DataBuffer so we need to save it + // for async IO and let the writer free it up when not needed anymore + m_AsyncWriteLock.lock(); + m_flagRush = false; + m_AsyncWriteLock.unlock(); + + // WriteData will free TSInfo.DataBuffer + WriteData(TSInfo.DataBuffer); + TSInfo.DataBuffer = NULL; + + m_Profiler.Stop("ES_AWD"); + + if (m_Parameters.UseSelectiveMetadataAggregation) + { + SelectiveAggregationMetadata(TSInfo); + } + else + { + TwoLevelAggregationMetadata(TSInfo); + } if (m_Parameters.AsyncWrite) { diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h index 41aa5e3e8b..24358963ab 100644 --- a/source/adios2/engine/bp5/BP5Writer.h +++ b/source/adios2/engine/bp5/BP5Writer.h @@ -170,6 +170,13 @@ class BP5Writer : public BP5Engine, public core::Engine uint64_t WriteMetadata(const std::vector &MetaDataBlocks, const std::vector &AttributeBlocks); + uint64_t WriteMetadata(const std::vector &ContigMetaData, + const std::vector &SizeVector, + const std::vector &AttributeBlocks); + + void SelectiveAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo); + void TwoLevelAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo); + void SimpleAggregationMetadata(format::BP5Serializer::TimestepInfo TSInfo); /** Write Data to disk, in an aggregator chain */ void WriteData(format::BufferV *Data); diff --git a/source/adios2/toolkit/format/bp5/BP5Helper.cpp b/source/adios2/toolkit/format/bp5/BP5Helper.cpp new file mode 100644 index 0000000000..f7f6bb6588 --- /dev/null +++ b/source/adios2/toolkit/format/bp5/BP5Helper.cpp @@ -0,0 +1,546 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ * + * BP5Helper.cpp + */ + +#include "BP5Helper.h" +#include "adios2/helper/adiosFunctions.h" +#include // Include the MD5 header +#include // put_time + +#include "fm.h" + +#ifdef _WIN32 +#pragma warning(disable : 4250) +#endif + +namespace adios2 +{ +namespace format +{ + +BP5Helper::digest BP5Helper::HashOfBlock(const void *block, const size_t block_len) +{ + adios2sysMD5 *md5 = adios2sysMD5_New(); + if (!md5) + { + throw std::runtime_error("Failed to create MD5 instance"); + } + + // Initialize the MD5 instance + adios2sysMD5_Initialize(md5); + + // Update the MD5 instance with the input data + adios2sysMD5_Append(md5, reinterpret_cast(block), (int)block_len); + + // Finalize the MD5 digest and get the hash value + BP5Helper::digest ret; + adios2sysMD5_Finalize(md5, (unsigned char *)&ret.x[0]); + + // Clean up the MD5 instance + adios2sysMD5_Delete(md5); + + return ret; +} + +std::vector +BP5Helper::BuildNodeContrib(const digest attrHash, const size_t attrSize, + const std::vector MMBlocks, + const size_t MetaEncodeSize, + const std::vector WriterDataPositions) +{ + std::vector ret; + size_t MMBlocksSize = MMBlocks.size(); + size_t len = sizeof(digest) + 2 * sizeof(size_t) + + MMBlocksSize * (sizeof(size_t) + sizeof(digest)) + sizeof(size_t) + + sizeof(uint64_t); + ret.resize(len); + size_t position = 0; + helper::CopyToBuffer(ret, position, &attrHash.x[0], sizeof(digest)); + helper::CopyToBuffer(ret, position, &attrSize, 1); + helper::CopyToBuffer(ret, position, &MMBlocksSize, 1); + for (auto &MM : MMBlocks) + { + digest D; + std::memcpy(&D.x[0], MM.MetaMetaID, MM.MetaMetaIDLen); + helper::CopyToBuffer(ret, position, &D.x[0], sizeof(digest)); + size_t AlignedSize = ((MM.MetaMetaInfoLen + 7) & ~0x7); + helper::CopyToBuffer(ret, position, &AlignedSize, 1); + } + helper::CopyToBuffer(ret, position, &MetaEncodeSize, 1); + helper::CopyToBuffer(ret, position, &WriterDataPositions[0], 1); + return ret; +} + +std::vector +BP5Helper::BuildFixedNodeContrib(const digest attrHash, const size_t attrSize, + const std::vector MMBlocks, + const size_t MetaEncodeSize, + const std::vector WriterDataPositions) +{ + /* + * WriterDataPositions comes in as a vector, but in current usage, + * that vector only has a single entry + * + * MMBlocks may have more than FIXED_MMB_SLOT_COUNT entries, and + * if so they will be handled in a special phase. 
+ */ + std::vector ret; + size_t len = sizeof(node_contrib); + ret.resize(len); + auto NC = reinterpret_cast(ret.data()); + NC->AttrHash = attrHash; + NC->AttrSize = attrSize; + NC->MMBCount = MMBlocks.size(); + for (size_t i = 0; i < FIXED_MMB_SLOT_COUNT; i++) + { + if (i < MMBlocks.size()) + { + auto MM = &MMBlocks[i]; + std::memcpy(&NC->MMBArray[i].x[0], MM->MetaMetaID, MM->MetaMetaIDLen); + size_t AlignedSize = ((MM->MetaMetaInfoLen + 7) & ~0x7); + NC->MMBSizeArray[i] = AlignedSize; + } + else + { + NC->MMBSizeArray[i] = 0; + } + } + NC->MetaEncodeSize = MetaEncodeSize; + NC->WriterDataPosition = WriterDataPositions[0]; + return ret; +} + +void BP5Helper::BreakdownFixedIncomingMInfo( + const size_t NodeCount, const std::vector RecvBuffer, + std::vector &SecondRecvCounts, std::vector &BcastInfo, + std::vector &WriterDataPositions, std::vector &MetaEncodeSize, + std::vector &AttrSizes, std::vector &MMBSizes, std::vector &MMBIDs) +{ + std::set AttrSet; + std::set MMBSet; + MetaEncodeSize.resize(NodeCount); + WriterDataPositions.resize(NodeCount); + SecondRecvCounts.resize(NodeCount); + BcastInfo.resize(NodeCount); + AttrSizes.resize(NodeCount); + const node_contrib *NCArray = reinterpret_cast(RecvBuffer.data()); + for (size_t node = 0; node < NodeCount; node++) + { + const node_contrib *NC = &NCArray[node]; + digest thisAttrHash; + bool needAttr = false; + size_t MMBlockCount; + size_t SecondRecvSize = 0; + thisAttrHash = NC->AttrHash; + size_t AttrSize = NC->AttrSize; + AttrSizes[node] = AttrSize; + if (AttrSize && !AttrSet.count(thisAttrHash)) + { + AttrSet.insert(thisAttrHash); + needAttr = true; + size_t AlignedSize = ((AttrSize + 7) & ~0x7); + SecondRecvSize += AlignedSize; + } + + size_t MMsNeeded = 0; + MMBlockCount = NC->MMBCount; + if (MMBlockCount > FIXED_MMB_SLOT_COUNT) + { + BcastInfo[0] = (size_t)-1; + // we can't finish this, fallback + return; + } + for (size_t block = 0; block < MMBlockCount; block++) + { + digest thisMMB = NC->MMBArray[block]; + size_t thisMMBSize = NC->MMBSizeArray[block]; + if (thisMMBSize && (!MMBSet.count(thisMMB))) + { + MMBSet.insert(thisMMB); + MMsNeeded += (((size_t)1) << block); + size_t AlignedSize = ((thisMMBSize + 7) & ~0x7); + MMBSizes.push_back(AlignedSize); + MMBIDs.push_back(thisMMB); + SecondRecvSize += AlignedSize; + } + } + MetaEncodeSize[node] = NC->MetaEncodeSize; + size_t WDP = NC->WriterDataPosition; + WriterDataPositions[node] = WDP; + SecondRecvCounts[node] = SecondRecvSize; + BcastInfo[node] = needAttr ? 
((uint64_t)1 << 63) : 0; + BcastInfo[node] |= MMsNeeded; + } +} + +void BP5Helper::BreakdownIncomingMInfo( + const std::vector RecvCounts, const std::vector RecvBuffer, + std::vector &SecondRecvCounts, std::vector &BcastInfo, + std::vector &WriterDataPositions, std::vector &MetaEncodeSize, + std::vector &AttrSizes, std::vector &MMBSizes, std::vector &MMBIDs) +{ + std::set AttrSet; + std::set MMBSet; + MetaEncodeSize.resize(RecvCounts.size()); + WriterDataPositions.resize(RecvCounts.size()); + SecondRecvCounts.resize(RecvCounts.size()); + BcastInfo.resize(RecvCounts.size()); + AttrSizes.resize(RecvCounts.size()); + size_t pos = 0, sum = 0; + for (size_t node = 0; node < RecvCounts.size(); node++) + { + digest thisAttrHash; + bool needAttr = false; + size_t MMBlockCount; + size_t SecondRecvSize = 0; + helper::ReadArray(RecvBuffer, pos, &thisAttrHash.x[0], sizeof(thisAttrHash.x), false); + size_t AttrSize = helper::ReadValue(RecvBuffer, pos, false); + AttrSizes[node] = AttrSize; + if (AttrSize && !AttrSet.count(thisAttrHash)) + { + AttrSet.insert(thisAttrHash); + needAttr = true; + size_t AlignedSize = ((AttrSize + 7) & ~0x7); + SecondRecvSize += AlignedSize; + } + + size_t MMsNeeded = 0; + MMBlockCount = helper::ReadValue(RecvBuffer, pos, false); + for (size_t block = 0; block < MMBlockCount; block++) + { + digest thisMMB; + helper::ReadArray(RecvBuffer, pos, &thisMMB.x[0], sizeof(thisMMB.x), false); + size_t thisMMBSize = helper::ReadValue(RecvBuffer, pos, false); + if (thisMMBSize && (!MMBSet.count(thisMMB))) + { + MMBSet.insert(thisMMB); + MMsNeeded += (((size_t)1) << block); + size_t AlignedSize = ((thisMMBSize + 7) & ~0x7); + MMBSizes.push_back(AlignedSize); + MMBIDs.push_back(thisMMB); + SecondRecvSize += AlignedSize; + } + } + MetaEncodeSize[node] = helper::ReadValue(RecvBuffer, pos, false); + size_t WDP = helper::ReadValue(RecvBuffer, pos, false); + WriterDataPositions[node] = WDP; + SecondRecvCounts[node] = SecondRecvSize; + BcastInfo[node] = needAttr ? 
((uint64_t)1 << 63) : 0; + BcastInfo[node] |= MMsNeeded; + + // end of loop check + sum += RecvCounts[node]; + if (pos != sum) + throw std::logic_error("Bad deserialization"); + } +} + +void BP5Helper::BreakdownIncomingMData(const std::vector &RecvCounts, + std::vector &BcastInfo, + const std::vector &IncomingMMA, + std::vector &NewMetaMetaBlocks, + std::vector &AttributeEncodeBuffers, + std::vector AttrSize, std::vector MMBSizes, + std::vector MMBIDs) +{ + size_t pos = 0, sum = 0; + AttributeEncodeBuffers.clear(); + NewMetaMetaBlocks.clear(); + for (size_t node = 0; node < RecvCounts.size(); node++) + { + if (BcastInfo[node] & ((uint64_t)1 << 63)) + { + void *buffer = malloc(AttrSize[node]); + helper::ReadArray(IncomingMMA, pos, (char *)buffer, AttrSize[node]); + AttributeEncodeBuffers.push_back({buffer, AttrSize[node]}); + BcastInfo[node] &= ~((uint64_t)1 << 63); + } + size_t b = 0; + while (BcastInfo[node]) + { + if (BcastInfo[node] & ((uint64_t)1 << b)) + { + MetaMetaInfoBlock mmib; + size_t index = NewMetaMetaBlocks.size(); + mmib.MetaMetaInfoLen = MMBSizes[index]; + mmib.MetaMetaInfo = (char *)malloc(MMBSizes[index]); + helper::ReadArray(IncomingMMA, pos, mmib.MetaMetaInfo, mmib.MetaMetaInfoLen); + mmib.MetaMetaIDLen = FMformatID_len((char *)&MMBIDs[index]); + mmib.MetaMetaID = (char *)malloc(mmib.MetaMetaIDLen); + std::memcpy(mmib.MetaMetaID, (char *)&MMBIDs[index], mmib.MetaMetaIDLen); + NewMetaMetaBlocks.push_back(mmib); + BcastInfo[node] &= ~((uint64_t)1 << b); + b++; + } + } + // end of loop check + sum += RecvCounts[node]; + if (pos != sum) + throw std::logic_error("Bad deserialization"); + } +} + +void BP5Helper::GathervArraysTwoLevel(helper::Comm &groupComm, helper::Comm &groupLeaderComm, + adios2::profiling::JSONProfiler &Profiler, + uint64_t *MyContrib, size_t LocalSize, + size_t *OverallRecvCounts, size_t OverallRecvCountsSize, + uint64_t *OverallRecvBuffer, size_t DestRank) +{ + /* + * Two-step aggregation of data that requires no intermediate processing + */ + Profiler.Start("ES_meta1"); + std::vector RecvBuffer; + if (groupComm.Size() > 1) + { // level 1 + Profiler.Start("ES_meta1_gather"); + std::vector RecvCounts = groupComm.GatherValues(LocalSize, 0); + if (groupComm.Rank() == 0) + { + uint64_t TotalSize = 0; + for (auto &n : RecvCounts) + TotalSize += n; + RecvBuffer.resize(TotalSize); + /*std::cout << "MD Lvl-1: rank " << m_Comm.Rank() << " gather " + << TotalSize << " bytes from aggregator group" + << std::endl;*/ + } + groupComm.GathervArrays(MyContrib, LocalSize, RecvCounts.data(), RecvCounts.size(), + RecvBuffer.data(), 0); + Profiler.Stop("ES_meta1_gather"); + } // level 1 + Profiler.Stop("ES_meta1"); + Profiler.Start("ES_meta2"); + // level 2 + if (groupComm.Rank() == 0) + { + std::vector RecvCounts; + size_t LocalSize = RecvBuffer.size(); + if (groupLeaderComm.Size() > 1) + { + Profiler.Start("ES_meta2_gather"); + RecvCounts = groupLeaderComm.GatherValues(LocalSize, 0); + groupLeaderComm.GathervArrays(RecvBuffer.data(), LocalSize, RecvCounts.data(), + RecvCounts.size(), OverallRecvBuffer, 0); + Profiler.Stop("ES_meta2_gather"); + } + else + { + std::cout << "This should never happen" << std::endl; + } + } // level 2 + Profiler.Stop("ES_meta2"); +} + +// clang-format off +/* + * BP5AggregateInformation + * + * Here we want to avoid some of the problems with prior approaches + * to metadata aggregation by being more selective up front, possibly + * at the cost of involving more collective operations but hopefully + * with smaller data sizes. 
In particular, in a first phase we're + * only aggregating MetaMetadata IDs (not bodies), and the hashes of + * attribute blocks. This has more back-and-forth than prior + * methods, but it avoids moving duplicate attributes or metametadata + * blocks. + * + * + * Basic protocol of this aggregation method: + * First 3 steps below occur in the helper function + * 1) Fixed-size (144 bytes) gather + * This gathers AttrHash, AttrSize, MetaDataEncodeSize, + * WriterDataPosition, MetaMetaBlockCount and first 4 + * MetaMetaBlockIDs + * + * 2) Fixed-size (8 byte) bcast + * This broadcasts a bitmap of what attributes and metametablocks + * are unique and required at rank 0 + * + * IF some rank has more than 4 MetaMetaBlocks, we know this after the + * gather above, and an indicator is included in the bcast so that all + * ranks fall back to a dynamic gather that includes all the + * metametablock IDs. This should be VERY rare (only on the first + * timestep after multiple new structured types are defined). + * + * 2a) Fixed-size (8 byte) size gather + * 2b) Variable-sized gatherv + * 2c) Fixed-size (8 byte) bcast + * + * IF there are new Attribute blocks or new MetaMetaBlocks, Step 3 + * gathers them. This may never happen after timestep 0. + * + * 3) Variable-sized gatherv to gather new Attrs or MetaMetaBlocks + * + * The last step (4) occurs in the engine, not the helper function, + * but size information is provided by the helper. + * 4) Variable-sized gather of the actual metadata blocks + * + */ +// clang-format on + +void BP5Helper::BP5AggregateInformation(helper::Comm &mpiComm, + adios2::profiling::JSONProfiler &Profiler, + std::vector &NewMetaMetaBlocks, + std::vector &AttributeEncodeBuffers, + std::vector &MetaEncodeSize, + std::vector &WriterDataPositions) +{ + /* + * Incoming param info: We expect potentially many + * NewMetaMetaBlocks if structures are used heavily, but if not, + * just zero to two. We expect at most one AttributeEncodeBuffer + * in the vector, one MetaEncodeSize entry and one + * WriterDataPosition. (These are vectors for historical + * reasons.)
+ */ + BP5Helper::digest attrHash; + size_t attrLen = 0; + // If there's an AttributeEncodeBuffer, get its hash + if (AttributeEncodeBuffers.size() > 0) + { + size_t AlignedSize = ((AttributeEncodeBuffers[0].iov_len + 7) & ~0x7); + attrLen = AlignedSize; + } + if ((attrLen > 0) && AttributeEncodeBuffers[0].iov_base) + attrHash = HashOfBlock(AttributeEncodeBuffers[0].iov_base, attrLen); + std::vector RecvBuffer; + std::vector BcastInfo; + std::vector SecondRecvCounts; + std::vector AttrSize; + std::vector MMBSizes; + std::vector MMBIDs; + auto myFixedContrib = BuildFixedNodeContrib(attrHash, attrLen, NewMetaMetaBlocks, + MetaEncodeSize[0], WriterDataPositions); + bool NeedDynamic = false; + + if (mpiComm.Rank() == 0) + { + RecvBuffer.resize(mpiComm.Size() * sizeof(node_contrib)); + Profiler.Start("FixedMetaInfoGather"); + mpiComm.GatherArrays(myFixedContrib.data(), myFixedContrib.size(), RecvBuffer.data(), 0); + Profiler.Stop("FixedMetaInfoGather"); + BreakdownFixedIncomingMInfo(mpiComm.Size(), RecvBuffer, SecondRecvCounts, BcastInfo, + WriterDataPositions, MetaEncodeSize, AttrSize, MMBSizes, + MMBIDs); + Profiler.Start("MetaInfoBcast"); + mpiComm.Bcast(BcastInfo.data(), BcastInfo.size(), 0, ""); + Profiler.Stop("MetaInfoBcast"); + } + else + { + Profiler.Start("FixedMetaInfoGather"); + mpiComm.GatherArrays(myFixedContrib.data(), myFixedContrib.size(), RecvBuffer.data(), 0); + Profiler.Stop("FixedMetaInfoGather"); + BcastInfo.resize(mpiComm.Size()); + Profiler.Start("MetaInfoBcast"); + mpiComm.Bcast(BcastInfo.data(), BcastInfo.size(), 0, ""); + Profiler.Stop("MetaInfoBcast"); + } + + NeedDynamic = BcastInfo[0] == (size_t)-1; + if (NeedDynamic) + { + /* + * This portion only happens if some node has more than FIXED_MMB_SLOT_COUNT metameta blocks, + * which should almost never happen in practice.
+ */ + auto myContrib = BuildNodeContrib(attrHash, attrLen, NewMetaMetaBlocks, MetaEncodeSize[0], + WriterDataPositions); + std::vector RecvCounts = mpiComm.GatherValues(myContrib.size(), 0); + + if (mpiComm.Rank() == 0) + { + uint64_t TotalSize = 0; + TotalSize = std::accumulate(RecvCounts.begin(), RecvCounts.end(), size_t(0)); + RecvBuffer.resize(TotalSize); + Profiler.Start("DynamicInfo"); + mpiComm.GathervArrays(myContrib.data(), myContrib.size(), RecvCounts.data(), + RecvCounts.size(), RecvBuffer.data(), 0); + BreakdownIncomingMInfo(RecvCounts, RecvBuffer, SecondRecvCounts, BcastInfo, + WriterDataPositions, MetaEncodeSize, AttrSize, MMBSizes, MMBIDs); + mpiComm.Bcast(BcastInfo.data(), BcastInfo.size(), 0, ""); + Profiler.Stop("DynamicInfo"); + } + else + { + mpiComm.GathervArrays(myContrib.data(), myContrib.size(), RecvCounts.data(), + RecvCounts.size(), RecvBuffer.data(), 0); + BcastInfo.resize(mpiComm.Size()); + mpiComm.Bcast(BcastInfo.data(), BcastInfo.size(), 0, ""); + } + } + uint64_t MMASummary = std::accumulate(BcastInfo.begin(), BcastInfo.end(), size_t(0)); + + if (MMASummary == 0) + { + // Nobody has anything new to contribute WRT attributes or metametadata + // All mpiranks have the same info and will make the same decision + return; + } + // assemble my contribution to mm and attr gather + std::vector myMMAcontrib; + size_t pos = 0; + if (BcastInfo[mpiComm.Rank()] & ((uint64_t)1 << 63)) + { + // Need attr block + size_t AlignedSize = ((AttributeEncodeBuffers[0].iov_len + 7) & ~0x7); + size_t pad = AlignedSize - AttributeEncodeBuffers[0].iov_len; + myMMAcontrib.resize(AlignedSize); + helper::CopyToBuffer(myMMAcontrib, pos, (char *)AttributeEncodeBuffers[0].iov_base, + AttributeEncodeBuffers[0].iov_len); + if (pad) + { + uint64_t zero = 0; + helper::CopyToBuffer(myMMAcontrib, pos, (char *)&zero, pad); + } + } + for (size_t b = 0; b < NewMetaMetaBlocks.size(); b++) + { + if (BcastInfo[mpiComm.Rank()] & (((size_t)1) << b)) + { + size_t AlignedSize = ((NewMetaMetaBlocks[b].MetaMetaInfoLen + 7) & ~0x7); + size_t pad = AlignedSize - NewMetaMetaBlocks[b].MetaMetaInfoLen; + myMMAcontrib.resize(pos + AlignedSize); + helper::CopyToBuffer(myMMAcontrib, pos, NewMetaMetaBlocks[b].MetaMetaInfo, + NewMetaMetaBlocks[b].MetaMetaInfoLen); + if (pad) + { + uint64_t zero = 0; + helper::CopyToBuffer(myMMAcontrib, pos, (char *)&zero, pad); + } + } + } + // per above, is 8-byte aligned + uint64_t AlignedContribCount = myMMAcontrib.size() / 8; + uint64_t *AlignedContrib = reinterpret_cast(myMMAcontrib.data()); + uint64_t TotalSize = + std::accumulate(SecondRecvCounts.begin(), SecondRecvCounts.end(), size_t(0)); + auto AlignedCounts = SecondRecvCounts; + for (auto &C : AlignedCounts) + C /= 8; + if (mpiComm.Rank() == 0) + { + std::vector IncomingMMA(TotalSize); + uint64_t *AlignedIncomingData = reinterpret_cast(IncomingMMA.data()); + Profiler.Start("SelectMetaInfoGather"); + mpiComm.GathervArrays(AlignedContrib, AlignedContribCount, AlignedCounts.data(), + AlignedCounts.size(), AlignedIncomingData, 0); + Profiler.Stop("SelectMetaInfoGather"); + BreakdownIncomingMData(SecondRecvCounts, BcastInfo, IncomingMMA, NewMetaMetaBlocks, + AttributeEncodeBuffers, AttrSize, MMBSizes, MMBIDs); + } + else + { + Profiler.Start("SelectMetaInfoGather"); + mpiComm.GathervArrays(AlignedContrib, AlignedContribCount, AlignedCounts.data(), + AlignedCounts.size(), (uint64_t *)nullptr, 0); + Profiler.Stop("SelectMetaInfoGather"); + } +} + +} // namespace format +} // namespace adios diff --git 
a/source/adios2/toolkit/format/bp5/BP5Helper.h b/source/adios2/toolkit/format/bp5/BP5Helper.h new file mode 100644 index 0000000000..565a3831d1 --- /dev/null +++ b/source/adios2/toolkit/format/bp5/BP5Helper.h @@ -0,0 +1,106 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * BP5Helper.h + */ + +#ifndef ADIOS2_TOOLKIT_FORMAT_BP5_BP5HELPER_H_ +#define ADIOS2_TOOLKIT_FORMAT_BP5_BP5HELPER_H_ + +#include "BP5Base.h" +#include "adios2/core/Attribute.h" +#include "adios2/core/CoreTypes.h" +#include "adios2/toolkit/profiling/iochrono/IOChrono.h" +#ifdef _WIN32 +#pragma warning(disable : 4250) +#endif +#include +#include +#include + +namespace adios2 +{ +namespace format +{ + +class BP5Helper : virtual public BP5Base +{ +public: + static void BP5AggregateInformation(helper::Comm &mpiComm, + adios2::profiling::JSONProfiler &Profiler, + std::vector &NewMetaMetaBlocks, + std::vector &AttributeEncodeBuffers, + std::vector &MetaEncodeSize, + std::vector &WriterDataPositions); + + static void GathervArraysTwoLevel(helper::Comm &groupComm, helper::Comm &groupLeaderComm, + adios2::profiling::JSONProfiler &Profiler, + uint64_t *MyContrib, size_t LocalSize, + size_t *OverallRecvCounts, size_t OverallRecvCountsSize, + uint64_t *OverallRecvBuffer, size_t DestRank); + struct digest + { + uint64_t x[2] = {0, 0}; + // compare for order + bool operator<(const digest &dg) const + { + if (x[0] != dg.x[0]) + return (x[0] < dg.x[0]); + return (x[1] < dg.x[1]); + }; + + bool IsZero() { return ((x[0] == 0) && (x[1] == 0)); }; + friend std::ostream &operator<<(std::ostream &os, const digest &d) + { + std::cout << "0x" << std::setw(8) << std::setfill('0') << std::hex << d.x[0] << d.x[1]; + return os; + }; + }; + + static digest HashOfBlock(const void *block, const size_t block_len); + +private: +#define FIXED_MMB_SLOT_COUNT 4 + struct node_contrib + { + digest AttrHash; + size_t AttrSize; + size_t MMBCount; + digest MMBArray[FIXED_MMB_SLOT_COUNT]; + size_t MMBSizeArray[FIXED_MMB_SLOT_COUNT]; + size_t MetaEncodeSize; + uint64_t WriterDataPosition; + }; + static std::vector + BuildNodeContrib(const digest attrHash, size_t attrSize, + const std::vector NewMetaMetaBlocks, + const size_t MetaEncodeSize, const std::vector WriterDataPositions); + static std::vector + BuildFixedNodeContrib(const digest attrHash, size_t attrSize, + const std::vector NewMetaMetaBlocks, + const size_t MetaEncodeSize, + const std::vector WriterDataPositions); + static void + BreakdownIncomingMInfo(const std::vector RecvCounts, const std::vector RecvBuffer, + std::vector &SecondRecvCounts, std::vector &BcastInfo, + std::vector &WriterDataPositions, + std::vector &MetaEncodeSize, std::vector &AttrSizes, + std::vector &MMBSizes, std::vector &MBBIDs); + static void BreakdownFixedIncomingMInfo( + const size_t NodeCount, const std::vector RecvBuffer, + std::vector &SecondRecvCounts, std::vector &BcastInfo, + std::vector &WriterDataPositions, std::vector &MetaEncodeSize, + std::vector &AttrSizes, std::vector &MMBSizes, std::vector &MBBIDs); + static void BreakdownIncomingMData(const std::vector &RecvCounts, + std::vector &BcastInfo, + const std::vector &IncomingMMA, + std::vector &NewMetaMetaBlocks, + std::vector &AttributeEncodeBuffers, + std::vector AttrSize, std::vector MMBSizes, + std::vector MMBIDs); +}; +} // end namespace format +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_FORMAT_BP5_BP5HELPER_H_ */ diff --git 
a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp index 6cfbdf8956..0d13c829cd 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp @@ -6,6 +6,7 @@ * */ +#include "BP5Helper.h" #include "adios2/core/Attribute.h" #include "adios2/core/Engine.h" #include "adios2/core/IO.h" @@ -1474,6 +1475,8 @@ std::vector BP5Serializer::CopyMetadataToContiguous( const uint64_t ABCount = AttributeEncodeBuffers.size(); const uint64_t DSCount = DataSizes.size(); const uint64_t WDPCount = WriterDataPositions.size(); + std::set AttrSet; + std::vector NeedThisAttr(AttributeEncodeBuffers.size()); // count sizes RetSize += sizeof(NMMBCount); // NMMB count @@ -1490,11 +1493,28 @@ std::vector BP5Serializer::CopyMetadataToContiguous( RetSize += AlignedSize; } RetSize += sizeof(ABCount); // Number of attr blocks + size_t attrnum = 0; + for (auto &a : AttributeEncodeBuffers) + { + if (a.iov_base && (a.iov_len > 0)) + { + auto thisAttrHash = BP5Helper::HashOfBlock(a.iov_base, a.iov_len); + NeedThisAttr[attrnum] = (AttrSet.count(thisAttrHash) == 0); + if (NeedThisAttr[attrnum]) + { + AttrSet.insert(thisAttrHash); + } + } + attrnum++; + } + attrnum = 0; for (auto &a : AttributeEncodeBuffers) { RetSize += sizeof(uint64_t); // AttrEncodeLen size_t AlignedSize = ((a.iov_len + 7) & ~0x7); - RetSize += AlignedSize; + if (NeedThisAttr[attrnum]) + RetSize += AlignedSize; + attrnum++; } RetSize += sizeof(DSCount); RetSize += DataSizes.size() * sizeof(uint64_t); @@ -1528,9 +1548,10 @@ std::vector BP5Serializer::CopyMetadataToContiguous( } helper::CopyToBuffer(Ret, Position, &ABCount); + attrnum = 0; for (auto &a : AttributeEncodeBuffers) { - if (a.iov_base) + if (a.iov_base && NeedThisAttr[attrnum]) { size_t AlignedSize = ((a.iov_len + 7) & ~0x7); helper::CopyToBuffer(Ret, Position, &AlignedSize); @@ -1546,6 +1567,7 @@ std::vector BP5Serializer::CopyMetadataToContiguous( size_t ZeroSize = 0; helper::CopyToBuffer(Ret, Position, &ZeroSize); } + attrnum++; } helper::CopyToBuffer(Ret, Position, &DSCount); diff --git a/source/adios2/toolkit/profiling/iochrono/IOChrono.cpp b/source/adios2/toolkit/profiling/iochrono/IOChrono.cpp index 8e7ed456b1..ba81ba179f 100644 --- a/source/adios2/toolkit/profiling/iochrono/IOChrono.cpp +++ b/source/adios2/toolkit/profiling/iochrono/IOChrono.cpp @@ -44,10 +44,22 @@ JSONProfiler::JSONProfiler(helper::Comm const &comm) : m_Comm(comm) AddTimerWatch("PP"); AddTimerWatch("ES_meta1_gather", false); AddTimerWatch("ES_meta2_gather", false); + AddTimerWatch("ES_write_metadata", false); + AddTimerWatch("MetadataBlockWrite", false); + AddTimerWatch("ES_AGG1", false); + AddTimerWatch("ES_GatherMetadataBlocks", false); + AddTimerWatch("ES_simple_meta", false); + AddTimerWatch("ES_simple_gather", false); AddTimerWatch("ES_meta1"); AddTimerWatch("ES_meta2"); + AddTimerWatch("ES_aggregate_info", false); + AddTimerWatch("ES_gather_write_meta", false); + AddTimerWatch("FixedMetaInfoGather", false); + AddTimerWatch("MetaInfoBcast", false); + AddTimerWatch("SelectMetaInfoGather", false); + AddTimerWatch("ES_close"); AddTimerWatch("ES_AWD"); AddTimerWatch("WaitOnAsync"); diff --git a/testing/adios2/performance/metadata/PerfMetaData.cpp b/testing/adios2/performance/metadata/PerfMetaData.cpp index afd1f4415f..733299fecf 100644 --- a/testing/adios2/performance/metadata/PerfMetaData.cpp +++ b/testing/adios2/performance/metadata/PerfMetaData.cpp @@ -28,6 +28,7 @@ int NSteps = 10; int NumVars = 100; int 
NumArrays = 100; int NumAttrs = 100; +bool AttributesEverywhere = false; int NumBlocks = 1; int ReaderDelay = 0; int WriterSize; @@ -150,6 +151,10 @@ static void ParseArgs(int argc, char **argv) argv++; argc--; } + else if (std::string(argv[1]) == "--attributes_everywhere") + { + AttributesEverywhere = true; + } else if (std::string(argv[1]) == "--num_blocks") { std::istringstream ss(argv[2]); @@ -301,6 +306,9 @@ void DoWriter(adios2::Params writerParams) std::string varname = "Variable" + std::to_string(i); Floats[i] = io.DefineVariable(varname); } + } + if ((mpiRank == 0) || AttributesEverywhere) + { for (int i = 0; i < NumAttrs; i++) { std::string varname = "Attribute" + std::to_string(i); @@ -530,7 +538,11 @@ int main(int argc, char **argv) { DoWriter(engineParams); if (key == 0) + { std::cout << "File Writer Time " << elapsed.count() << " seconds." << std::endl; + std::cout << "Per step writer time " << elapsed.count() / (double)NSteps + << " seconds." << std::endl; + } } else {
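To make the selective metadata aggregation protocol documented in BP5Helper.cpp above more concrete, the following standalone sketch mimics its first two phases: a fixed-size gather of per-rank digests, then a broadcast flag telling each rank whether it must ship its full attribute block in a later gatherv. This is illustrative only, not ADIOS2 code: it uses raw MPI instead of helper::Comm, a std::hash stand-in for the MD5 digest, and invented names throughout::

    #include <mpi.h>
    #include <cstdint>
    #include <cstdio>
    #include <functional>
    #include <set>
    #include <string>
    #include <vector>

    // Stand-in for the fixed-size per-rank record (cf. BP5Helper's node_contrib).
    struct Contrib
    {
        uint64_t attrHash; // digest of this rank's attribute block (0 if none)
        uint64_t attrSize; // size of that block in bytes
    };

    int main(int argc, char **argv)
    {
        MPI_Init(&argc, &argv);
        int rank, size;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);

        // Pretend every rank defines an identical attribute block, so one copy suffices.
        const std::string attrs = "common-attribute-block";
        Contrib mine;
        mine.attrHash = std::hash<std::string>{}(attrs);
        mine.attrSize = attrs.size();

        // Phase 1: fixed-size gather of digests only; no attribute payloads move yet.
        std::vector<Contrib> all(rank == 0 ? size : 0);
        MPI_Gather(&mine, (int)sizeof(Contrib), MPI_BYTE, all.data(), (int)sizeof(Contrib),
                   MPI_BYTE, 0, MPI_COMM_WORLD);

        // Phase 2: rank 0 keeps the first owner of each distinct digest and broadcasts a
        // per-rank flag; only flagged ranks would send their full block afterwards.
        std::vector<uint64_t> need(size, 0);
        if (rank == 0)
        {
            std::set<uint64_t> seen;
            for (int r = 0; r < size; r++)
                if (all[r].attrSize != 0 && seen.insert(all[r].attrHash).second)
                    need[r] = 1;
        }
        MPI_Bcast(need.data(), size, MPI_UINT64_T, 0, MPI_COMM_WORLD);

        if (need[rank])
            std::printf("rank %d would ship %llu bytes of attributes\n", rank,
                        (unsigned long long)mine.attrSize);

        MPI_Finalize();
        return 0;
    }

In BP5 itself the broadcast carries one 64-bit word per rank that packs this decision: bit 63 marks a needed attribute block and the low bits mark needed meta-metadata slots, as seen in BreakdownFixedIncomingMInfo above.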