Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly handle the cache in the presence of SPARQL UPDATE #1646

Merged
merged 6 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ ProtoResult Operation::runComputation(const ad_utility::Timer& timer,
// _____________________________________________________________________________
CacheValue Operation::runComputationAndPrepareForCache(
const ad_utility::Timer& timer, ComputationMode computationMode,
const std::string& cacheKey, bool pinned) {
const QueryCacheKey& cacheKey, bool pinned) {
auto& cache = _executionContext->getQueryTreeCache();
auto result = runComputation(timer, computationMode);
if (!result.isFullyMaterialized() &&
Expand Down Expand Up @@ -233,7 +233,8 @@ std::shared_ptr<const Result> Operation::getResult(
signalQueryUpdate();
}
auto& cache = _executionContext->getQueryTreeCache();
const string cacheKey = getCacheKey();
const QueryCacheKey cacheKey = {
getCacheKey(), _executionContext->locatedTriplesSnapshot().index_};
const bool pinFinalResultButNotSubtrees =
_executionContext->_pinResult && isRoot;
const bool pinResult =
Expand Down Expand Up @@ -453,8 +454,8 @@ void Operation::createRuntimeInfoFromEstimates(
}
_runtimeInfo->multiplicityEstimates_ = multiplicityEstimates;

auto cachedResult =
_executionContext->getQueryTreeCache().getIfContained(getCacheKey());
auto cachedResult = _executionContext->getQueryTreeCache().getIfContained(
{getCacheKey(), locatedTriplesSnapshot().index_});
if (cachedResult.has_value()) {
const auto& [resultPointer, cacheStatus] = cachedResult.value();
_runtimeInfo->cacheStatus_ = cacheStatus;
Expand Down
2 changes: 1 addition & 1 deletion src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class Operation {
// into the cache.
CacheValue runComputationAndPrepareForCache(const ad_utility::Timer& timer,
ComputationMode computationMode,
const std::string& cacheKey,
const QueryCacheKey& cacheKey,
bool pinned);

// Create and store the complete runtime information for this operation after
Expand Down
22 changes: 21 additions & 1 deletion src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "util/ConcurrentCache.h"
#include "util/Synchronized.h"

// The value of the `QueryResultCache` below. It consists of a `Result` together
// with its `RuntimeInfo`.
class CacheValue {
private:
std::shared_ptr<Result> result_;
Expand Down Expand Up @@ -61,11 +63,29 @@ class CacheValue {
};
};

// The key for the `QueryResultCache` below. It consists of a `string` (the
// actual cache key of a `QueryExecutionTree` and the index of the
// `LocatedTriplesSnapshot` that was used to create the corresponding value.
// That way, two identical trees with different snapshot indices will have a
// different cache key. This has the (desired!) effect that UPDATE requests
// correctly invalidate preexisting cache results.
struct QueryCacheKey {
std::string key_;
size_t locatedTriplesSnapshotIndex_;

bool operator==(const QueryCacheKey&) const = default;

template <typename H>
friend H AbslHashValue(H h, const QueryCacheKey& key) {
return H::combine(std::move(h), key.key_, key.locatedTriplesSnapshotIndex_);
}
};

// Threadsafe LRU cache for (partial) query results, that
// checks on insertion, if the result is currently being computed
// by another query.
using QueryResultCache = ad_utility::ConcurrentCache<
ad_utility::LRUCache<string, CacheValue, CacheValue::SizeGetter>>;
ad_utility::LRUCache<QueryCacheKey, CacheValue, CacheValue::SizeGetter>>;

// Execution context for queries.
// Holds references to index and engine, implements caching.
Expand Down
3 changes: 2 additions & 1 deletion src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ void QueryExecutionTree::readFromCache() {
return;
}
auto& cache = qec_->getQueryTreeCache();
auto res = cache.getIfContained(getCacheKey());
auto res = cache.getIfContained(
{getCacheKey(), qec_->locatedTriplesSnapshot().index_});
if (res.has_value()) {
cachedResult_ = res->_resultPointer->resultTablePtr();
}
Expand Down
5 changes: 5 additions & 0 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,11 @@
<< std::endl;
LOG(DEBUG) << "Runtime Info:\n"
<< qet.getRootOperation()->runtimeInfo().toString() << std::endl;

// Clear the cache, because all cache entries have been invalidated by the
// update anyway (The index of the located triples snapshot is part of the
// cache key).
cache_.clearAll();

Check warning on line 922 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L922

Added line #L922 was not covered by tests
}

// ____________________________________________________________________________
Expand Down
8 changes: 5 additions & 3 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,14 @@ LocatedTriplesSnapshot::getLocatedTriplesForPermutation(
}

// ____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriples::getSnapshot() const {
SharedLocatedTriplesSnapshot DeltaTriples::getSnapshot() {
// NOTE: Both members of the `LocatedTriplesSnapshot` are copied, but the
// `localVocab_` has no copy constructor (in order to avoid accidental
// copies), hence the explicit `clone`.
auto snapshotIndex = nextSnapshotIndex_;
++nextSnapshotIndex_;
return SharedLocatedTriplesSnapshot{std::make_shared<LocatedTriplesSnapshot>(
locatedTriples(), localVocab_.clone())};
locatedTriples(), localVocab_.clone(), snapshotIndex)};
}

// ____________________________________________________________________________
Expand All @@ -193,7 +195,7 @@ DeltaTriples::DeltaTriples(const Index& index)
// ____________________________________________________________________________
DeltaTriplesManager::DeltaTriplesManager(const IndexImpl& index)
: deltaTriples_{index},
currentLocatedTriplesSnapshot_{deltaTriples_.rlock()->getSnapshot()} {}
currentLocatedTriplesSnapshot_{deltaTriples_.wlock()->getSnapshot()} {}

// _____________________________________________________________________________
void DeltaTriplesManager::modify(
Expand Down
5 changes: 4 additions & 1 deletion src/index/DeltaTriples.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ using LocatedTriplesPerBlockAllPermutations =
struct LocatedTriplesSnapshot {
LocatedTriplesPerBlockAllPermutations locatedTriplesPerBlock_;
LocalVocab localVocab_;
// A unique index for this snapshot that is used in the query cache.
size_t index_;
// Get `TripleWithPosition` objects for given permutation.
const LocatedTriplesPerBlock& getLocatedTriplesForPermutation(
Permutation::Enum permutation) const;
Expand Down Expand Up @@ -63,6 +65,7 @@ class DeltaTriples {
private:
// The index to which these triples are added.
const IndexImpl& index_;
size_t nextSnapshotIndex_ = 0;

// The located triples for all the 6 permutations.
LocatedTriplesPerBlockAllPermutations locatedTriples_;
Expand Down Expand Up @@ -140,7 +143,7 @@ class DeltaTriples {
// Return a deep copy of the `LocatedTriples` and the corresponding
// `LocalVocab` which form a snapshot of the current status of this
// `DeltaTriples` object.
SharedLocatedTriplesSnapshot getSnapshot() const;
SharedLocatedTriplesSnapshot getSnapshot();

// Register the original `metadata` for the given `permutation`. This has to
// be called before any updates are processed.
Expand Down
2 changes: 1 addition & 1 deletion src/index/Permutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ const Permutation& Permutation::getActualPermutation(Id id) const {
// ______________________________________________________________________
const LocatedTriplesPerBlock& Permutation::getLocatedTriplesForPermutation(
const LocatedTriplesSnapshot& locatedTriplesSnapshot) const {
static const LocatedTriplesSnapshot emptySnapshot;
static const LocatedTriplesSnapshot emptySnapshot{{}, {}, 0};
const auto& actualSnapshot =
isInternalPermutation_ ? emptySnapshot : locatedTriplesSnapshot;
return actualSnapshot.getLocatedTriplesForPermutation(permutation_);
Expand Down
33 changes: 24 additions & 9 deletions test/OperationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,12 @@ TEST(Operation, verifyLimitIsProperlyAppliedAndUpdatesRuntimeInfoCorrectly) {
expectRtiHasDimensions(childRti, 2, 3);
}

namespace {
QueryCacheKey makeQueryCacheKey(std::string s) {
return {std::move(s), 102394857};
}
} // namespace

// _____________________________________________________________________________
TEST(Operation, ensureLazyOperationIsCachedIfSmallEnough) {
auto qec = getQec();
Expand All @@ -536,14 +542,17 @@ TEST(Operation, ensureLazyOperationIsCachedIfSmallEnough) {
ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started};

auto cacheValue = valuesForTesting.runComputationAndPrepareForCache(
timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
timer, ComputationMode::LAZY_IF_SUPPORTED, makeQueryCacheKey("test"),
false);
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));

for ([[maybe_unused]] Result::IdTableVocabPair& _ :
cacheValue.resultTable().idTables()) {
}

auto aggregatedValue = qec->getQueryTreeCache().getIfContained("test");
auto aggregatedValue =
qec->getQueryTreeCache().getIfContained(makeQueryCacheKey("test"));
ASSERT_TRUE(aggregatedValue.has_value());

ASSERT_TRUE(aggregatedValue.value()._resultPointer);
Expand Down Expand Up @@ -588,15 +597,18 @@ TEST(Operation, checkLazyOperationIsNotCachedIfTooLarge) {
qec->getQueryTreeCache().setMaxSizeSingleEntry(1_B);

auto cacheValue = valuesForTesting.runComputationAndPrepareForCache(
timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
timer, ComputationMode::LAZY_IF_SUPPORTED, makeQueryCacheKey("test"),
false);
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));
qec->getQueryTreeCache().setMaxSizeSingleEntry(originalSize);

for ([[maybe_unused]] Result::IdTableVocabPair& _ :
cacheValue.resultTable().idTables()) {
}

EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));
}

// _____________________________________________________________________________
Expand All @@ -612,12 +624,15 @@ TEST(Operation, checkLazyOperationIsNotCachedIfUnlikelyToFitInCache) {
ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started};

auto cacheValue = valuesForTesting.runComputationAndPrepareForCache(
timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
timer, ComputationMode::LAZY_IF_SUPPORTED, makeQueryCacheKey("test"),
false);
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));

for ([[maybe_unused]] Result::IdTableVocabPair& _ :
cacheValue.resultTable().idTables()) {
}

EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));
}
Loading