diff --git a/src/bucket/BucketIndex.h b/src/bucket/BucketIndex.h index 7dd34fc999..39e15fd0fb 100644 --- a/src/bucket/BucketIndex.h +++ b/src/bucket/BucketIndex.h @@ -15,6 +15,11 @@ #include +namespace asio +{ +class io_context; +} + namespace stellar { @@ -86,7 +91,7 @@ class BucketIndex : public NonMovableOrCopyable // Otherwise range index is used, with the range defined by pageSize. static std::unique_ptr createIndex(BucketManager& bm, std::filesystem::path const& filename, - Hash const& hash); + Hash const& hash, asio::io_context& ctx); // Loads index from given file. If file does not exist or if saved // index does not have same parameters as current config, return null diff --git a/src/bucket/BucketIndexImpl.cpp b/src/bucket/BucketIndexImpl.cpp index f4108d0fb3..f3aa69f301 100644 --- a/src/bucket/BucketIndexImpl.cpp +++ b/src/bucket/BucketIndexImpl.cpp @@ -10,6 +10,7 @@ #include "ledger/LedgerTypeUtils.h" #include "main/Config.h" #include "util/BinaryFuseFilter.h" +#include "util/BufferedAsioCerealOutputArchive.h" #include "util/Fs.h" #include "util/LogSlowExecution.h" #include "util/Logging.h" @@ -69,7 +70,8 @@ template BucketIndexImpl::BucketIndexImpl(BucketManager& bm, std::filesystem::path const& filename, std::streamoff pageSize, - Hash const& hash) + Hash const& hash, + asio::io_context& ctx) : mBloomMissMeter(bm.getBloomMissMeter()) , mBloomLookupMeter(bm.getBloomLookupMeter()) { @@ -96,7 +98,7 @@ BucketIndexImpl::BucketIndexImpl(BucketManager& bm, std::streamoff pageUpperBound = 0; BucketEntry be; size_t iter = 0; - size_t count = 0; + [[maybe_unused]] size_t count = 0; std::vector keyHashes; auto seed = shortHash::getShortHashInitKey(); @@ -205,7 +207,7 @@ BucketIndexImpl::BucketIndexImpl(BucketManager& bm, if (bm.getConfig().isPersistingBucketListDBIndexes()) { - saveToDisk(bm, hash); + saveToDisk(bm, hash, ctx); } } @@ -214,14 +216,14 @@ BucketIndexImpl::BucketIndexImpl(BucketManager& bm, template <> void BucketIndexImpl::saveToDisk( - 
BucketManager& bm, Hash const& hash) const + BucketManager& bm, Hash const& hash, asio::io_context& ctx) const { } template <> void -BucketIndexImpl::saveToDisk(BucketManager& bm, - Hash const& hash) const +BucketIndexImpl::saveToDisk( + BucketManager& bm, Hash const& hash, asio::io_context& ctx) const { ZoneScoped; releaseAssert(bm.getConfig().isPersistingBucketListDBIndexes()); @@ -235,10 +237,9 @@ BucketIndexImpl::saveToDisk(BucketManager& bm, tmpFilename); { - std::ofstream out; - out.exceptions(std::ios::failbit | std::ios::badbit); - out.open(tmpFilename, std::ios_base::binary | std::ios_base::trunc); - cereal::BinaryOutputArchive ar(out); + OutputFileStream out(ctx, !bm.getConfig().DISABLE_XDR_FSYNC); + out.open(tmpFilename); + cereal::BufferedAsioOutputArchive ar(out); ar(mData); } @@ -331,7 +332,7 @@ upper_bound_pred(LedgerKey const& key, IndexEntryT const& indexEntry) std::unique_ptr BucketIndex::createIndex(BucketManager& bm, std::filesystem::path const& filename, - Hash const& hash) + Hash const& hash, asio::io_context& ctx) { ZoneScoped; auto const& cfg = bm.getConfig(); @@ -348,7 +349,8 @@ BucketIndex::createIndex(BucketManager& bm, "bucket {}", filename); return std::unique_ptr const>( - new BucketIndexImpl(bm, filename, 0, hash)); + new BucketIndexImpl(bm, filename, 0, hash, + ctx)); } else { @@ -358,7 +360,8 @@ BucketIndex::createIndex(BucketManager& bm, "{} in bucket {}", pageSize, filename); return std::unique_ptr const>( - new BucketIndexImpl(bm, filename, pageSize, hash)); + new BucketIndexImpl(bm, filename, pageSize, hash, + ctx)); } } // BucketIndexImpl throws if BucketManager shuts down before index finishes, diff --git a/src/bucket/BucketIndexImpl.h b/src/bucket/BucketIndexImpl.h index d34155b055..74890ec3a9 100644 --- a/src/bucket/BucketIndexImpl.h +++ b/src/bucket/BucketIndexImpl.h @@ -10,6 +10,7 @@ #include "util/BinaryFuseFilter.h" #include "xdr/Stellar-types.h" +#include "util/BufferedAsioCerealOutputArchive.h" #include #include 
#include @@ -60,14 +61,16 @@ template class BucketIndexImpl : public BucketIndex medida::Meter& mBloomLookupMeter; BucketIndexImpl(BucketManager& bm, std::filesystem::path const& filename, - std::streamoff pageSize, Hash const& hash); + std::streamoff pageSize, Hash const& hash, + asio::io_context& ctx); template BucketIndexImpl(BucketManager const& bm, Archive& ar, std::streamoff pageSize); // Saves index to disk, overwriting any preexisting file for this index - void saveToDisk(BucketManager& bm, Hash const& hash) const; + void saveToDisk(BucketManager& bm, Hash const& hash, + asio::io_context& ctx) const; // Returns [lowFileOffset, highFileOffset) that contain the key ranges // [lowerBound, upperBound]. If no file offsets exist, returns [0, 0] diff --git a/src/bucket/BucketOutputIterator.cpp b/src/bucket/BucketOutputIterator.cpp index 412cfad724..aeb5dac49b 100644 --- a/src/bucket/BucketOutputIterator.cpp +++ b/src/bucket/BucketOutputIterator.cpp @@ -24,6 +24,7 @@ BucketOutputIterator::BucketOutputIterator(std::string const& tmpDir, asio::io_context& ctx, bool doFsync) : mFilename(Bucket::randomBucketName(tmpDir)) , mOut(ctx, doFsync) + , mCtx(ctx) , mBuf(nullptr) , mKeepDeadEntries(keepDeadEntries) , mMeta(meta) @@ -131,7 +132,9 @@ BucketOutputIterator::getBucket(BucketManager& bucketManager, if (auto b = bucketManager.getBucketIfExists(hash); !b || !b->isIndexed()) { - index = BucketIndex::createIndex(bucketManager, mFilename, hash); + index = + BucketIndex::createIndex(bucketManager, mFilename, hash, mCtx); + releaseAssertOrThrow(index); } } diff --git a/src/bucket/BucketOutputIterator.h b/src/bucket/BucketOutputIterator.h index 2b035f5f11..20aed133c5 100644 --- a/src/bucket/BucketOutputIterator.h +++ b/src/bucket/BucketOutputIterator.h @@ -26,6 +26,7 @@ class BucketOutputIterator std::filesystem::path mFilename; XDROutputFileStream mOut; BucketEntryIdCmp mCmp; + asio::io_context& mCtx; std::unique_ptr mBuf; SHA256 mHasher; size_t mBytesPut{0}; diff --git 
a/src/catchup/IndexBucketsWork.cpp b/src/catchup/IndexBucketsWork.cpp index 5019b48757..40ff5d21f9 100644 --- a/src/catchup/IndexBucketsWork.cpp +++ b/src/catchup/IndexBucketsWork.cpp @@ -23,13 +23,12 @@ IndexBucketsWork::IndexWork::IndexWork(Application& app, BasicWork::State IndexBucketsWork::IndexWork::onRun() { - if (mDone) + if (mState == State::WORK_WAITING) { - return State::WORK_SUCCESS; + postWork(); } - postWork(); - return State::WORK_WAITING; + return mState; } bool @@ -38,15 +37,22 @@ IndexBucketsWork::IndexWork::onAbort() return true; }; +void +IndexBucketsWork::IndexWork::onReset() +{ + mState = BasicWork::State::WORK_WAITING; +} + void IndexBucketsWork::IndexWork::postWork() { Application& app = this->mApp; + asio::io_context& ctx = app.getWorkerIOContext(); std::weak_ptr weak( std::static_pointer_cast(shared_from_this())); app.postOnBackgroundThread( - [&app, weak]() { + [&app, &ctx, weak]() { auto self = weak.lock(); if (!self || self->isAborting()) { @@ -80,8 +86,9 @@ IndexBucketsWork::IndexWork::postWork() if (!self->mIndex) { - self->mIndex = BucketIndex::createIndex( - bm, self->mBucket->getFilename(), self->mBucket->getHash()); + self->mIndex = + BucketIndex::createIndex(bm, self->mBucket->getFilename(), + self->mBucket->getHash(), ctx); } app.postOnMainThread( @@ -89,11 +96,18 @@ IndexBucketsWork::IndexWork::postWork() auto self = weak.lock(); if (self) { - self->mDone = true; - if (!self->isAborting()) + if (self->mIndex) + { + self->mState = BasicWork::State::WORK_SUCCESS; + if (!self->isAborting()) + { + self->mApp.getBucketManager().maybeSetIndex( + self->mBucket, std::move(self->mIndex)); + } + } + else { - self->mApp.getBucketManager().maybeSetIndex( - self->mBucket, std::move(self->mIndex)); + self->mState = BasicWork::State::WORK_FAILURE; } self->wakeUp(); } diff --git a/src/catchup/IndexBucketsWork.h b/src/catchup/IndexBucketsWork.h index 65a0f0e18a..5473c1c7d7 100644 --- a/src/catchup/IndexBucketsWork.h +++ 
b/src/catchup/IndexBucketsWork.h @@ -4,6 +4,7 @@ #pragma once +#include "work/BasicWork.h" #include "work/Work.h" #include @@ -20,7 +21,7 @@ class IndexBucketsWork : public Work { std::shared_ptr mBucket; std::unique_ptr mIndex; - bool mDone{false}; + BasicWork::State mState{BasicWork::State::WORK_WAITING}; void postWork(); @@ -30,6 +31,7 @@ class IndexBucketsWork : public Work protected: State onRun() override; bool onAbort() override; + void onReset() override; }; std::vector> const& mBuckets; diff --git a/src/util/BufferedAsioCerealOutputArchive.h b/src/util/BufferedAsioCerealOutputArchive.h new file mode 100644 index 0000000000..f79913ddfc --- /dev/null +++ b/src/util/BufferedAsioCerealOutputArchive.h @@ -0,0 +1,81 @@ +#pragma once + +#include "util/XDRStream.h" +#include +#include + +namespace cereal +{ + +// Mirrors CEREAL_ARCHIVE_RESTRICT from cereal/details/traits.hpp for single +// types +#define CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(TYPE) \ + typename std::enable_if< \ + cereal::traits::is_same_archive::value, void>::type + +// This is a basic reimplementation of BinaryOutputArchive +// (cereal/archives/binary.hpp) that uses our own OutputFileStream instead of +// std::ofstream for writes in order to support fsync. For input we can just use +// cereal's BinaryInputArchive because we don't care about fsync for reads. +class BufferedAsioOutputArchive + : public OutputArchive +{ + public: + // Construct, outputting to the provided stream + // @param stream The stellar::OutputFileStream to write to; it is held + // by reference and must be open before anything is serialized.
+ BufferedAsioOutputArchive(stellar::OutputFileStream& stream) + : OutputArchive(this) + , itsStream(stream) + { + } + + ~BufferedAsioOutputArchive() CEREAL_NOEXCEPT = default; + + // Writes size bytes of data to the output stream + void + saveBinary(const void* data, std::streamsize size) + { + itsStream.writeBytes(static_cast(data), size); + } + + private: + stellar::OutputFileStream& itsStream; +}; + +// Saving for POD types to binary +template +inline typename std::enable_if::value, void>::type +CEREAL_SAVE_FUNCTION_NAME(BufferedAsioOutputArchive& ar, T const& t) +{ + ar.saveBinary(std::addressof(t), sizeof(t)); +} + +// Serializing NVP types to binary +template + +inline CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(BufferedAsioOutputArchive) + CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, NameValuePair& t) +{ + ar(t.value); +} + +// Serializing SizeTags to binary +template +inline CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(BufferedAsioOutputArchive) + CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, SizeTag& t) +{ + ar(t.size); +} + +// Saving binary data +template +inline void +CEREAL_SAVE_FUNCTION_NAME(BufferedAsioOutputArchive& ar, + BinaryData const& bd) +{ + ar.saveBinary(bd.data, static_cast(bd.size)); +} +} + +CEREAL_REGISTER_ARCHIVE(cereal::BufferedAsioOutputArchive) diff --git a/src/util/XDRStream.h b/src/util/XDRStream.h index 8187d6d191..69ab747057 100644 --- a/src/util/XDRStream.h +++ b/src/util/XDRStream.h @@ -224,10 +224,11 @@ class XDRInputFileStream } }; -// XDROutputFileStream needs access to a file descriptor to do fsync, so we use +// OutputFileStream needs access to a file descriptor to do fsync, so we use // asio's synchronous stream types here rather than fstreams. 
-class XDROutputFileStream +class OutputFileStream { + protected: std::vector mBuf; const bool mFsyncOnClose; @@ -241,7 +242,7 @@ class XDROutputFileStream #endif public: - XDROutputFileStream(asio::io_context& ctx, bool fsyncOnClose) + OutputFileStream(asio::io_context& ctx, bool fsyncOnClose) : mFsyncOnClose(fsyncOnClose) #ifndef WIN32 , mBufferedWriteStream(ctx, stellar::fs::bufsz()) @@ -249,7 +250,7 @@ class XDROutputFileStream { } - ~XDROutputFileStream() + ~OutputFileStream() { if (isOpen()) { @@ -377,50 +378,21 @@ class XDROutputFileStream return isOpen(); } - template - void - durableWriteOne(T const& t, SHA256* hasher = nullptr, - size_t* bytesPut = nullptr) - { - writeOne(t, hasher, bytesPut); - flush(); - fs::flushFileChanges(getHandle()); - } - - template void - writeOne(T const& t, SHA256* hasher = nullptr, size_t* bytesPut = nullptr) + writeBytes(char const* buf, size_t const sizeBytes) { ZoneScoped; if (!isOpen()) { FileSystemException::failWith( - "XDROutputFileStream::writeOne() on non-open stream"); - } - - uint32_t sz = (uint32_t)xdr::xdr_size(t); - releaseAssertOrThrow(sz < 0x80000000); - - if (mBuf.size() < sz + 4) - { - mBuf.resize(sz + 4); + "OutputFileStream::writeBytes() on non-open stream"); } - // Write 4 bytes of size, big-endian, with XDR 'continuation' bit set on - // high bit of high byte. 
- mBuf[0] = static_cast((sz >> 24) & 0xFF) | '\x80'; - mBuf[1] = static_cast((sz >> 16) & 0xFF); - mBuf[2] = static_cast((sz >> 8) & 0xFF); - mBuf[3] = static_cast(sz & 0xFF); - xdr::xdr_put p(mBuf.data() + 4, mBuf.data() + 4 + sz); - xdr_argpack_archive(p, t); - - size_t const to_write = sz + 4; size_t written = 0; - while (written < to_write) + while (written < sizeBytes) { #ifdef WIN32 - auto w = fwrite(mBuf.data() + written, 1, to_write - written, mOut); + auto w = fwrite(buf + written, 1, sizeBytes - written, mOut); if (w == 0) { FileSystemException::failWith( @@ -429,8 +401,8 @@ class XDROutputFileStream written += w; #else asio::error_code ec; - auto buf = asio::buffer(mBuf.data() + written, to_write - written); - written += asio::write(mBufferedWriteStream, buf, ec); + auto asioBuf = asio::buffer(buf + written, sizeBytes - written); + written += asio::write(mBufferedWriteStream, asioBuf, ec); if (ec) { if (ec == asio::error::interrupted) @@ -447,9 +419,56 @@ class XDROutputFileStream } #endif } + } +}; + +class XDROutputFileStream : public OutputFileStream +{ + public: + XDROutputFileStream(asio::io_context& ctx, bool fsyncOnClose) + : OutputFileStream(ctx, fsyncOnClose) + { + } + + template + void + durableWriteOne(T const& t, SHA256* hasher = nullptr, + size_t* bytesPut = nullptr) + { + writeOne(t, hasher, bytesPut); + flush(); + fs::flushFileChanges(getHandle()); + } + + template + void + writeOne(T const& t, SHA256* hasher = nullptr, size_t* bytesPut = nullptr) + { + ZoneScoped; + uint32_t sz = (uint32_t)xdr::xdr_size(t); + releaseAssertOrThrow(sz < 0x80000000); + + if (mBuf.size() < sz + 4) + { + mBuf.resize(sz + 4); + } + + // Write 4 bytes of size, big-endian, with XDR 'continuation' bit set on + // high bit of high byte. 
+ mBuf[0] = static_cast((sz >> 24) & 0xFF) | '\x80'; + mBuf[1] = static_cast((sz >> 16) & 0xFF); + mBuf[2] = static_cast((sz >> 8) & 0xFF); + mBuf[3] = static_cast(sz & 0xFF); + xdr::xdr_put p(mBuf.data() + 4, mBuf.data() + 4 + sz); + xdr_argpack_archive(p, t); + + // Buffer is 4 bytes of encoded size, followed by encoded object + size_t const toWrite = sz + 4; + writeBytes(mBuf.data(), toWrite); + if (hasher) { - hasher->add(ByteSlice(mBuf.data(), sz + 4)); + hasher->add(ByteSlice(mBuf.data(), toWrite)); } if (bytesPut) {