BucketList cache #4565

Draft · wants to merge 1 commit into base: master
12 changes: 11 additions & 1 deletion src/bucket/BucketIndex.h
@@ -80,7 +80,9 @@ class BucketIndex : public NonMovableOrCopyable
IndividualIndex::const_iterator>;

inline static const std::string DB_BACKEND_STATE = "bl";
inline static const uint32_t BUCKET_INDEX_VERSION = 4;
inline static const uint32_t BUCKET_INDEX_VERSION = 5;
inline static const uint32_t CACHE_SIZE = 1'000'000;
inline static const uint64_t INDIVIDUAL_CACHE_CUTOFF_SIZE = 100'000'000'000;

// Returns true if LedgerEntryType not supported by BucketListDB
static bool typeNotSupported(LedgerEntryType t);
@@ -130,13 +132,21 @@ class BucketIndex : public NonMovableOrCopyable
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const = 0;

// Returns {entry, hit}. When hit is true the answer is authoritative:
// entry may be nullptr if the bucket is fully cached and the key does
// not exist in this bucket.
virtual std::pair<std::shared_ptr<BucketEntry>, bool>
getFromCache(LedgerKey const& k) const = 0;

virtual void addToCache(std::shared_ptr<BucketEntry> be) const = 0;

// Returns page size for index. IndividualIndex returns 0 for page size
virtual std::streamoff getPageSize() const = 0;

virtual Iterator begin() const = 0;

virtual Iterator end() const = 0;

virtual bool isFullyCached() const = 0;

virtual void markBloomMiss() const = 0;
virtual void markBloomLookup() const = 0;
virtual BucketEntryCounters const& getBucketEntryCounters() const = 0;
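
A note on the new interface above: `getFromCache` returns an entry/flag pair, `addToCache` lets snapshot code populate the cache after a disk read, and `isFullyCached` signals that the whole bucket is resident in memory. The sketch below is a hypothetical caller (`readEntryFromDisk` is a stand-in, not part of this PR) showing how the pair is meant to be consumed:

```cpp
// Hypothetical caller-side sketch; `readEntryFromDisk` stands in for the real
// index scan + page read done in BucketSnapshot.
std::shared_ptr<BucketEntry>
loadEntry(BucketIndex const& index, LedgerKey const& k)
{
    auto [entry, hit] = index.getFromCache(k);
    if (hit)
    {
        // Authoritative answer: the cached entry, or nullptr when the bucket
        // is fully cached and the key is known not to exist.
        return entry;
    }

    // Cache miss: read from disk, then warm the cache for later lookups.
    auto loaded = readEntryFromDisk(index, k);
    if (loaded)
    {
        index.addToCache(loaded);
    }
    return loaded;
}
```
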
78 changes: 75 additions & 3 deletions src/bucket/BucketIndexImpl.cpp
@@ -18,6 +18,7 @@
#include "util/LogSlowExecution.h"
#include "util/Logging.h"
#include "util/XDRStream.h"
#include "xdr/Stellar-ledger-entries.h"

#include <Tracy.hpp>
#include <cereal/archives/binary.hpp>
@@ -28,6 +29,8 @@
#include <fmt/format.h>

#include <memory>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <type_traits>
#include <xdrpp/marshal.h>
@@ -91,12 +94,15 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
auto timer = LogSlowExecution("Indexing bucket");
mData.pageSize = pageSize;

auto fileSize = std::filesystem::file_size(filename);
bool inMemoryMap = fileSize < INDIVIDUAL_CACHE_CUTOFF_SIZE &&
std::is_same_v<BucketEntryT, BucketEntry>;

// We don't have a good way of estimating IndividualIndex size since
// keys are variable size, so only reserve range indexes since we know
// the page size ahead of time
if constexpr (std::is_same<IndexT, RangeIndex>::value)
if (std::is_same_v<IndexT, RangeIndex>)
{
auto fileSize = std::filesystem::file_size(filename);
auto estimatedIndexEntries = fileSize / mData.pageSize;
mData.keysToOffset.reserve(estimatedIndexEntries);
}
@@ -187,6 +193,15 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
}
}

if constexpr (std::is_same_v<BucketEntryT, LiveBucket::EntryT>)
{
if (inMemoryMap)
{
mData.inMemoryMap[key] =
std::make_shared<BucketEntry>(be);
}
}

if constexpr (std::is_same_v<IndexT, RangeIndex>)
{
auto keyBuf = xdr::xdr_to_opaque(key);
@@ -299,7 +314,8 @@ template <class IndexT>
template <class Archive>
BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager const& bm, Archive& ar,
std::streamoff pageSize)
: mBloomMissMeter(bm.getBloomMissMeter())
: mData()
, mBloomMissMeter(bm.getBloomMissMeter())
, mBloomLookupMeter(bm.getBloomLookupMeter())
{
mData.pageSize = pageSize;
@@ -572,6 +588,48 @@ BucketIndexImpl<IndexT>::getOfferRange() const
return getOffsetBounds(lowerBound, upperBound);
}

template <class IndexT>
std::pair<std::shared_ptr<BucketEntry>, bool>
BucketIndexImpl<IndexT>::getFromCache(LedgerKey const& k) const
{
if (mData.inMemoryMap.empty())
{
std::lock_guard<std::shared_mutex> lock(mData.cacheLock);
auto* ptr = mData.inMemoryCache.maybeGet(k);
if (ptr)
{
return {*ptr, true};
}
else
{
return {nullptr, false};
}
}
else
{
auto iter = mData.inMemoryMap.find(k);
if (iter == mData.inMemoryMap.end())
{
return {nullptr, true};
}
else
{
return {iter->second, true};
}
}
}

template <class IndexT>
void
BucketIndexImpl<IndexT>::addToCache(std::shared_ptr<BucketEntry> be) const
{
if (mData.inMemoryMap.empty())
{
std::unique_lock<std::shared_mutex> lock(mData.cacheLock);
mData.inMemoryCache.put(getBucketLedgerKey(*be), be);
}
}

#ifdef BUILD_TESTS
template <class IndexT>
bool
@@ -620,6 +678,20 @@ BucketIndexImpl<IndexT>::operator==(BucketIndex const& inRaw) const
return false;
}

if (mData.inMemoryMap.size() != in.mData.inMemoryMap.size())
{
return false;
}

for (auto const& [key, entry] : mData.inMemoryMap)
{
auto iter = in.mData.inMemoryMap.find(key);
if (iter == in.mData.inMemoryMap.end() || !(*entry == *iter->second))
{
return false;
}
}

return true;
}
#endif
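
The implementation above is two-tiered: while indexing, buckets of live entries whose file size is under `INDIVIDUAL_CACHE_CUTOFF_SIZE` (100 GB) get every entry copied into `inMemoryMap`, so lookups, including misses, never touch disk; larger buckets fall back to a `RandomEvictionCache` bounded at `CACHE_SIZE` (1,000,000) entries and guarded by `cacheLock`. A minimal stand-alone model of that decision, with a plain map in place of `RandomEvictionCache` and simplified names, looks roughly like this:

```cpp
// Illustrative model only; types and names are stand-ins for the members of
// SerializableBucketIndex.
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <utility>

template <typename Key, typename Entry> struct TwoTierCache
{
    // Filled at index time when the whole bucket fits under the cutoff.
    std::unordered_map<Key, std::shared_ptr<Entry>> fullMap;
    // Bounded fallback for large buckets, populated lazily after disk reads.
    mutable std::unordered_map<Key, std::shared_ptr<Entry>> evictionCache;
    mutable std::shared_mutex lock;

    // Returns {entry, authoritative}; with the full map present, even a miss
    // is authoritative, so callers can skip the disk entirely.
    std::pair<std::shared_ptr<Entry>, bool>
    get(Key const& k) const
    {
        if (!fullMap.empty())
        {
            auto it = fullMap.find(k);
            return {it == fullMap.end() ? nullptr : it->second, true};
        }
        // Exclusive lock, mirroring the PR: a RandomEvictionCache lookup may
        // touch eviction bookkeeping even on a read.
        std::lock_guard<std::shared_mutex> guard(lock);
        auto it = evictionCache.find(k);
        bool found = it != evictionCache.end();
        return {found ? it->second : nullptr, found};
    }
};
```
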
29 changes: 26 additions & 3 deletions src/bucket/BucketIndexImpl.h
@@ -6,14 +6,18 @@

#include "bucket/BucketIndex.h"
#include "bucket/LiveBucket.h"
#include "ledger/LedgerHashUtils.h"
#include "medida/meter.h"
#include "util/BinaryFuseFilter.h"
#include "util/RandomEvictionCache.h"
#include "xdr/Stellar-ledger-entries.h"
#include "xdr/Stellar-types.h"

#include "util/BufferedAsioCerealOutputArchive.h"
#include <cereal/types/map.hpp>
#include <map>
#include <memory>
#include <shared_mutex>

namespace stellar
{
@@ -34,15 +38,19 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
std::streamoff pageSize{};
std::unique_ptr<BinaryFuseFilter16> filter{};
std::map<Asset, std::vector<PoolID>> assetToPoolID{};
std::map<LedgerKey, std::shared_ptr<BucketEntry>> inMemoryMap;
mutable RandomEvictionCache<LedgerKey, std::shared_ptr<BucketEntry>>
inMemoryCache;
mutable std::shared_mutex cacheLock;
BucketEntryCounters counters{};

template <class Archive>
void
save(Archive& ar) const
{
auto version = BUCKET_INDEX_VERSION;
ar(version, pageSize, assetToPoolID, keysToOffset, filter,
counters);
ar(version, pageSize, assetToPoolID, keysToOffset, filter, counters,
inMemoryMap);
}

// Note: version and pageSize must be loaded before this function is
Expand All @@ -53,7 +61,11 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
void
load(Archive& ar)
{
ar(assetToPoolID, keysToOffset, filter, counters);
ar(assetToPoolID, keysToOffset, filter, counters, inMemoryMap);
}

SerializableBucketIndex() : inMemoryCache(CACHE_SIZE)
{
}
} mData;

@@ -100,6 +112,11 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const override;

virtual std::pair<std::shared_ptr<BucketEntry>, bool>
getFromCache(LedgerKey const& k) const override;

virtual void addToCache(std::shared_ptr<BucketEntry> be) const override;

virtual std::streamoff
getPageSize() const override
{
@@ -118,6 +135,12 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
return mData.keysToOffset.end();
}

virtual bool
isFullyCached() const override
{
return !mData.inMemoryMap.empty();
}

virtual void markBloomMiss() const override;
virtual void markBloomLookup() const override;
virtual BucketEntryCounters const& getBucketEntryCounters() const override;
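
Two details of this header are worth noting: the serialized payload now includes `inMemoryMap` (hence the `BUCKET_INDEX_VERSION` bump to 5), while the eviction cache is never serialized and is rebuilt empty with capacity `CACHE_SIZE` by the new default constructor; and both the cache and `cacheLock` are `mutable` so that logically-const lookups can still insert under the mutex. A generic illustration of that const-plus-mutable-cache pattern, unrelated to stellar-core types:

```cpp
// Minimal illustration of a logically-const, thread-safe memoizing lookup;
// the cache and its lock are mutable so a const method may fill the cache.
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>

class Catalog
{
    mutable std::unordered_map<int, std::string> mCache;
    mutable std::shared_mutex mCacheLock;

  public:
    std::string
    describe(int id) const
    {
        {
            std::shared_lock<std::shared_mutex> read(mCacheLock);
            auto it = mCache.find(id);
            if (it != mCache.end())
            {
                return it->second;
            }
        }
        std::string value = "entry-" + std::to_string(id); // expensive load
        std::unique_lock<std::shared_mutex> write(mCacheLock);
        return mCache.emplace(id, std::move(value)).first->second;
    }
};
```
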
100 changes: 64 additions & 36 deletions src/bucket/BucketSnapshot.cpp
@@ -63,7 +63,12 @@ BucketSnapshotBase<BucketT>::getEntryAtOffset(LedgerKey const& k,
}
else if (stream.readPage(be, k, pageSize))
{
return {std::make_shared<typename BucketT::EntryT>(be), false};
auto ret = std::make_shared<typename BucketT::EntryT>(be);
if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
mBucket->getIndex().addToCache(ret);
}
return {ret, false};
}

// Mark entry miss for metrics
@@ -81,6 +86,15 @@ BucketSnapshotBase<BucketT>::getBucketEntry(LedgerKey const& k) const
return {nullptr, false};
}

if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
auto [entryOp, hit] = mBucket->getIndex().getFromCache(k);
if (hit)
{
return {entryOp, false};
}
}

auto pos = mBucket->getIndex().lookup(k);
if (pos.has_value())
{
@@ -111,53 +125,67 @@ BucketSnapshotBase<BucketT>::loadKeys(
auto currKeyIt = keys.begin();
auto const& index = mBucket->getIndex();
auto indexIter = index.begin();
while (currKeyIt != keys.end() && indexIter != index.end())

while (currKeyIt != keys.end() &&
(indexIter != index.end() || index.isFullyCached()))
{
auto [offOp, newIndexIter] = index.scan(indexIter, *currKeyIt);
indexIter = newIndexIter;
if (offOp)
std::shared_ptr<typename BucketT::EntryT> entryOp{};
bool cacheHit = false;
if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
std::tie(entryOp, cacheHit) = index.getFromCache(*currKeyIt);
}

if (!cacheHit)
{
auto [entryOp, bloomMiss] = getEntryAtOffset(
std::optional<std::streamoff> offOp{};
auto bloomMiss = false;
std::tie(offOp, indexIter) = index.scan(indexIter, *currKeyIt);
if (!offOp)
{
++currKeyIt;
continue;
}

std::tie(entryOp, bloomMiss) = getEntryAtOffset(
*currKeyIt, *offOp, mBucket->getIndex().getPageSize());
}

if (entryOp)
if (entryOp)
{
// Don't return tombstone entries, as these do not exist wrt
// ledger state
if (!BucketT::isTombstoneEntry(*entryOp))
{
// Don't return tombstone entries, as these do not exist wrt
// ledger state
if (!BucketT::isTombstoneEntry(*entryOp))
// Only live bucket loads can be metered
if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
// Only live bucket loads can be metered
if constexpr (std::is_same_v<BucketT, LiveBucket>)
bool addEntry = true;
if (lkMeter)
{
bool addEntry = true;
if (lkMeter)
{
// Here, we are metering after the entry has been
// loaded. This is because we need to know the size
// of the entry to meter it. Future work will add
// metering at the xdr level.
auto entrySize =
xdr::xdr_size(entryOp->liveEntry());
addEntry = lkMeter->canLoad(*currKeyIt, entrySize);
lkMeter->updateReadQuotasForKey(*currKeyIt,
entrySize);
}
if (addEntry)
{
result.push_back(entryOp->liveEntry());
}
// Here, we are metering after the entry has been
// loaded. This is because we need to know the size
// of the entry to meter it. Future work will add
// metering at the xdr level.
auto entrySize = xdr::xdr_size(entryOp->liveEntry());
addEntry = lkMeter->canLoad(*currKeyIt, entrySize);
lkMeter->updateReadQuotasForKey(*currKeyIt, entrySize);
}
else
if (addEntry)
{
static_assert(std::is_same_v<BucketT, HotArchiveBucket>,
"unexpected bucket type");
result.push_back(*entryOp);
result.push_back(entryOp->liveEntry());
}
}

currKeyIt = keys.erase(currKeyIt);
continue;
else
{
static_assert(std::is_same_v<BucketT, HotArchiveBucket>,
"unexpected bucket type");
result.push_back(*entryOp);
}
}

currKeyIt = keys.erase(currKeyIt);
continue;
}

++currKeyIt;
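
The restructured `loadKeys` loop consults the cache before scanning the index: the loop condition lets iteration continue past `index.end()` whenever the bucket is fully cached, a cache hit skips the disk path entirely, and resolved keys are erased so lower BucketList levels are not searched for them. Stripped of load metering and the hot-archive branch, the per-key flow for live buckets is roughly (`loadFromDisk` is a stand-in for the real `index.scan` + `getEntryAtOffset` path, which also warms the cache):

```cpp
// Simplified flow only; error handling, metering, and the HotArchiveBucket
// branch are elided.
for (auto it = keys.begin(); it != keys.end();)
{
    auto [entry, cacheHit] = index.getFromCache(*it);
    if (!cacheHit)
    {
        entry = loadFromDisk(index, *it);
    }

    if (entry && !BucketT::isTombstoneEntry(*entry))
    {
        result.push_back(entry->liveEntry());
        it = keys.erase(it); // resolved: stop searching older buckets
    }
    else if (entry)
    {
        it = keys.erase(it); // tombstone: key is deleted at this level
    }
    else
    {
        ++it; // not in this bucket: keep searching lower levels
    }
}
```
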
2 changes: 1 addition & 1 deletion src/main/Config.cpp
@@ -162,7 +162,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
DEPRECATED_SQL_LEDGER_STATE = false;
BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 14; // 2^14 == 16 kb
BUCKETLIST_DB_INDEX_CUTOFF = 20; // 20 mb
BUCKETLIST_DB_PERSIST_INDEX = true;
BUCKETLIST_DB_PERSIST_INDEX = false;
BACKGROUND_EVICTION_SCAN = true;
PUBLISH_TO_ARCHIVE_DELAY = std::chrono::seconds{0};
// automatic maintenance settings:
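
With the default for `BUCKETLIST_DB_PERSIST_INDEX` flipped to `false`, nodes rebuild the BucketList index at startup unless an operator opts back in explicitly. An illustrative override in the node's configuration file (assuming the usual key = value layout of `stellar-core.cfg`):

```
# stellar-core.cfg (illustrative): opt back in to writing the index to disk.
BUCKETLIST_DB_PERSIST_INDEX=true
```
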