Skip to content

Commit

Permalink
fsync on BucketIndex writes (#4549)
Browse files Browse the repository at this point in the history
# Description

Resolves #4546

This change calls fsync after writing `BucketIndex` files. This change
was slightly more involved than expected. There's no way to call fsync
on files opened via `std::fstream`, so I had to implement a new cereal
archive type that accepts our own `asio::buffered_write_stream` backed
stream type.

# Checklist
- [x] Reviewed the
[contributing](https://github.com/stellar/stellar-core/blob/master/CONTRIBUTING.md#submitting-changes)
document
- [x] Rebased on top of master (no merge commits)
- [x] Ran `clang-format` v8.0.0 (via `make format` or the Visual Studio
extension)
- [x] Compiles
- [x] Ran all tests
- [ ] If change impacts performance, include supporting evidence per the
[performance
document](https://github.com/stellar/stellar-core/blob/master/performance-eval/performance-eval.md)
  • Loading branch information
marta-lokhova authored Nov 20, 2024
2 parents 5305f78 + 92ea68c commit 1f5ab3e
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 69 deletions.
7 changes: 6 additions & 1 deletion src/bucket/BucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@

#include <cereal/archives/binary.hpp>

// Forward declaration only: this header refers to asio::io_context purely by
// reference (see createIndex below), so the declaration is enough here.
namespace asio
{
class io_context;
}

namespace stellar
{

Expand Down Expand Up @@ -86,7 +91,7 @@ class BucketIndex : public NonMovableOrCopyable
// Otherwise range index is used, with the range defined by pageSize.
static std::unique_ptr<BucketIndex const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash);
Hash const& hash, asio::io_context& ctx);

// Loads index from given file. If file does not exist or if saved
// index does not have same parameters as current config, return null
Expand Down
29 changes: 16 additions & 13 deletions src/bucket/BucketIndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "ledger/LedgerTypeUtils.h"
#include "main/Config.h"
#include "util/BinaryFuseFilter.h"
#include "util/BufferedAsioCerealOutputArchive.h"
#include "util/Fs.h"
#include "util/LogSlowExecution.h"
#include "util/Logging.h"
Expand Down Expand Up @@ -69,7 +70,8 @@ template <class IndexT>
BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
std::filesystem::path const& filename,
std::streamoff pageSize,
Hash const& hash)
Hash const& hash,
asio::io_context& ctx)
: mBloomMissMeter(bm.getBloomMissMeter())
, mBloomLookupMeter(bm.getBloomLookupMeter())
{
Expand All @@ -96,7 +98,7 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
std::streamoff pageUpperBound = 0;
BucketEntry be;
size_t iter = 0;
size_t count = 0;
[[maybe_unused]] size_t count = 0;

std::vector<uint64_t> keyHashes;
auto seed = shortHash::getShortHashInitKey();
Expand Down Expand Up @@ -205,7 +207,7 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,

if (bm.getConfig().isPersistingBucketListDBIndexes())
{
saveToDisk(bm, hash);
saveToDisk(bm, hash, ctx);
}
}

Expand All @@ -214,14 +216,14 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
template <>
void
BucketIndexImpl<BucketIndex::IndividualIndex>::saveToDisk(
BucketManager& bm, Hash const& hash) const
BucketManager& bm, Hash const& hash, asio::io_context& ctx) const
{
}

template <>
void
BucketIndexImpl<BucketIndex::RangeIndex>::saveToDisk(BucketManager& bm,
Hash const& hash) const
BucketIndexImpl<BucketIndex::RangeIndex>::saveToDisk(
BucketManager& bm, Hash const& hash, asio::io_context& ctx) const
{
ZoneScoped;
releaseAssert(bm.getConfig().isPersistingBucketListDBIndexes());
Expand All @@ -235,10 +237,9 @@ BucketIndexImpl<BucketIndex::RangeIndex>::saveToDisk(BucketManager& bm,
tmpFilename);

{
std::ofstream out;
out.exceptions(std::ios::failbit | std::ios::badbit);
out.open(tmpFilename, std::ios_base::binary | std::ios_base::trunc);
cereal::BinaryOutputArchive ar(out);
OutputFileStream out(ctx, !bm.getConfig().DISABLE_XDR_FSYNC);
out.open(tmpFilename);
cereal::BufferedAsioOutputArchive ar(out);
ar(mData);
}

Expand Down Expand Up @@ -331,7 +332,7 @@ upper_bound_pred(LedgerKey const& key, IndexEntryT const& indexEntry)
std::unique_ptr<BucketIndex const>
BucketIndex::createIndex(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash)
Hash const& hash, asio::io_context& ctx)
{
ZoneScoped;
auto const& cfg = bm.getConfig();
Expand All @@ -348,7 +349,8 @@ BucketIndex::createIndex(BucketManager& bm,
"bucket {}",
filename);
return std::unique_ptr<BucketIndexImpl<IndividualIndex> const>(
new BucketIndexImpl<IndividualIndex>(bm, filename, 0, hash));
new BucketIndexImpl<IndividualIndex>(bm, filename, 0, hash,
ctx));
}
else
{
Expand All @@ -358,7 +360,8 @@ BucketIndex::createIndex(BucketManager& bm,
"{} in bucket {}",
pageSize, filename);
return std::unique_ptr<BucketIndexImpl<RangeIndex> const>(
new BucketIndexImpl<RangeIndex>(bm, filename, pageSize, hash));
new BucketIndexImpl<RangeIndex>(bm, filename, pageSize, hash,
ctx));
}
}
// BucketIndexImpl throws if BucketManager shuts down before index finishes,
Expand Down
7 changes: 5 additions & 2 deletions src/bucket/BucketIndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "util/BinaryFuseFilter.h"
#include "xdr/Stellar-types.h"

#include "util/BufferedAsioCerealOutputArchive.h"
#include <cereal/types/map.hpp>
#include <map>
#include <memory>
Expand Down Expand Up @@ -60,14 +61,16 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
medida::Meter& mBloomLookupMeter;

BucketIndexImpl(BucketManager& bm, std::filesystem::path const& filename,
std::streamoff pageSize, Hash const& hash);
std::streamoff pageSize, Hash const& hash,
asio::io_context& ctx);

template <class Archive>
BucketIndexImpl(BucketManager const& bm, Archive& ar,
std::streamoff pageSize);

// Saves index to disk, overwriting any preexisting file for this index
void saveToDisk(BucketManager& bm, Hash const& hash) const;
void saveToDisk(BucketManager& bm, Hash const& hash,
asio::io_context& ctx) const;

// Returns [lowFileOffset, highFileOffset) that contain the key ranges
// [lowerBound, upperBound]. If no file offsets exist, returns [0, 0]
Expand Down
5 changes: 4 additions & 1 deletion src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ BucketOutputIterator::BucketOutputIterator(std::string const& tmpDir,
asio::io_context& ctx, bool doFsync)
: mFilename(Bucket::randomBucketName(tmpDir))
, mOut(ctx, doFsync)
, mCtx(ctx)
, mBuf(nullptr)
, mKeepDeadEntries(keepDeadEntries)
, mMeta(meta)
Expand Down Expand Up @@ -131,7 +132,9 @@ BucketOutputIterator::getBucket(BucketManager& bucketManager,
if (auto b = bucketManager.getBucketIfExists(hash);
!b || !b->isIndexed())
{
index = BucketIndex::createIndex(bucketManager, mFilename, hash);
index =
BucketIndex::createIndex(bucketManager, mFilename, hash, mCtx);
releaseAssertOrThrow(index);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/bucket/BucketOutputIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class BucketOutputIterator
std::filesystem::path mFilename;
XDROutputFileStream mOut;
BucketEntryIdCmp mCmp;
asio::io_context& mCtx;
std::unique_ptr<BucketEntry> mBuf;
SHA256 mHasher;
size_t mBytesPut{0};
Expand Down
36 changes: 25 additions & 11 deletions src/catchup/IndexBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ IndexBucketsWork::IndexWork::IndexWork(Application& app,
BasicWork::State
IndexBucketsWork::IndexWork::onRun()
{
if (mDone)
if (mState == State::WORK_WAITING)
{
return State::WORK_SUCCESS;
postWork();
}

postWork();
return State::WORK_WAITING;
return mState;
}

bool
Expand All @@ -38,15 +37,22 @@ IndexBucketsWork::IndexWork::onAbort()
return true;
};

void
IndexBucketsWork::IndexWork::onReset()
{
mState = BasicWork::State::WORK_WAITING;
}

void
IndexBucketsWork::IndexWork::postWork()
{
Application& app = this->mApp;
asio::io_context& ctx = app.getWorkerIOContext();

std::weak_ptr<IndexWork> weak(
std::static_pointer_cast<IndexWork>(shared_from_this()));
app.postOnBackgroundThread(
[&app, weak]() {
[&app, &ctx, weak]() {
auto self = weak.lock();
if (!self || self->isAborting())
{
Expand Down Expand Up @@ -80,20 +86,28 @@ IndexBucketsWork::IndexWork::postWork()

if (!self->mIndex)
{
self->mIndex = BucketIndex::createIndex(
bm, self->mBucket->getFilename(), self->mBucket->getHash());
self->mIndex =
BucketIndex::createIndex(bm, self->mBucket->getFilename(),
self->mBucket->getHash(), ctx);
}

app.postOnMainThread(
[weak]() {
auto self = weak.lock();
if (self)
{
self->mDone = true;
if (!self->isAborting())
if (self->mIndex)
{
self->mState = BasicWork::State::WORK_SUCCESS;
if (!self->isAborting())
{
self->mApp.getBucketManager().maybeSetIndex(
self->mBucket, std::move(self->mIndex));
}
}
else
{
self->mApp.getBucketManager().maybeSetIndex(
self->mBucket, std::move(self->mIndex));
self->mState = BasicWork::State::WORK_FAILURE;
}
self->wakeUp();
}
Expand Down
4 changes: 3 additions & 1 deletion src/catchup/IndexBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once

#include "work/BasicWork.h"
#include "work/Work.h"
#include <memory>

Expand All @@ -20,7 +21,7 @@ class IndexBucketsWork : public Work
{
std::shared_ptr<Bucket> mBucket;
std::unique_ptr<BucketIndex const> mIndex;
bool mDone{false};
BasicWork::State mState{BasicWork::State::WORK_WAITING};

void postWork();

Expand All @@ -30,6 +31,7 @@ class IndexBucketsWork : public Work
protected:
State onRun() override;
bool onAbort() override;
void onReset() override;
};

std::vector<std::shared_ptr<Bucket>> const& mBuckets;
Expand Down
81 changes: 81 additions & 0 deletions src/util/BufferedAsioCerealOutputArchive.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#pragma once

#include "util/XDRStream.h"
#include <cereal/archives/binary.hpp>
#include <cereal/cereal.hpp>

namespace cereal
{

// Single-archive counterpart of CEREAL_ARCHIVE_RESTRICT
// (cereal/details/traits.hpp). Expands to a SFINAE return type that is `void`
// when the deduced template parameter `Archive` is TYPE, and removes the
// overload from consideration otherwise.
#define CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(TYPE) \
    std::enable_if_t<cereal::traits::is_same_archive<Archive, TYPE>::value>

// This is a basic reimplementation of BinaryOutputArchive
// (cereal/archives/binary.hpp) that uses our own OutputFileStream instead of
// std::ofstream for writes in order to support fsync. For input we can just use
// cereal's BinaryInputArchive because we don't care about fsync for reads.
class BufferedAsioOutputArchive
    : public OutputArchive<BufferedAsioOutputArchive, AllowEmptyClassElision>
{
  public:
    // Construct, outputting to the provided stream.
    // @param stream The stellar::OutputFileStream to write to. Only a
    // reference is kept, so the stream must outlive this archive.
    //
    // Marked explicit so an OutputFileStream is never silently converted into
    // an archive.
    explicit BufferedAsioOutputArchive(stellar::OutputFileStream& stream)
        : OutputArchive<BufferedAsioOutputArchive, AllowEmptyClassElision>(this)
        , itsStream(stream)
    {
    }

    ~BufferedAsioOutputArchive() CEREAL_NOEXCEPT = default;

    // Writes size bytes starting at data to the output stream.
    // @param data Pointer to the first byte to write.
    // @param size Number of bytes to write.
    void
    saveBinary(const void* data, std::streamsize size)
    {
        itsStream.writeBytes(static_cast<char const*>(data), size);
    }

  private:
    // Destination stream; not owned by the archive.
    stellar::OutputFileStream& itsStream;
};

// Saves an arithmetic value (integral or floating point) by handing the
// archive the value's in-memory bytes as-is.
template <class T>
inline std::enable_if_t<std::is_arithmetic<T>::value>
CEREAL_SAVE_FUNCTION_NAME(BufferedAsioOutputArchive& ar, T const& t)
{
    void const* const rawBytes = std::addressof(t);
    ar.saveBinary(rawBytes, sizeof(T));
}

// Name-value pairs: only the wrapped value is passed on to the archive; the
// pair's name is not used here.
template <class Archive, class T>
inline CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(BufferedAsioOutputArchive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, NameValuePair<T>& nvp)
{
    ar(nvp.value);
}

// Size tags: the tag is unwrapped and its size member is passed on to the
// archive.
template <class Archive, class T>
inline CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(BufferedAsioOutputArchive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, SizeTag<T>& tag)
{
    ar(tag.size);
}

// Raw binary blobs: bd.size bytes starting at bd.data are written to the
// archive's stream in a single saveBinary call.
template <class T>
inline void
CEREAL_SAVE_FUNCTION_NAME(BufferedAsioOutputArchive& ar,
                          BinaryData<T> const& bd)
{
    auto const byteCount = static_cast<std::streamsize>(bd.size);
    ar.saveBinary(bd.data, byteCount);
}
}

// Registers the new archive type with cereal.
// NOTE(review): per cereal's documentation this registration is what exposes
// an archive to polymorphic type bindings — confirm whether it is required
// for the types serialized through this archive.
CEREAL_REGISTER_ARCHIVE(cereal::BufferedAsioOutputArchive)
Loading

0 comments on commit 1f5ab3e

Please sign in to comment.