Skip to content

Commit

Permalink
Correctly handle the cache in the presence of SPARQL UPDATE (ad-freib…
Browse files Browse the repository at this point in the history
…urg#1646)

An update can invalidate a cached query result in the sense that if one would run the query again after the update, the result may be different. This was ignored so far, and is now considered as follows: Each `LocatedTriplesSnapshot` gets its own "index" (starting from zero and then incremented for each new snapshot). That index becomes part of the cache key. That way, a query will make use of a cached result if and only if there was no update between the time of the query and the time when the cached result was computed.
  • Loading branch information
joka921 authored and realHannes committed Dec 2, 2024
1 parent b242221 commit 792dd39
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 21 deletions.
9 changes: 5 additions & 4 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ ProtoResult Operation::runComputation(const ad_utility::Timer& timer,
// _____________________________________________________________________________
CacheValue Operation::runComputationAndPrepareForCache(
const ad_utility::Timer& timer, ComputationMode computationMode,
const std::string& cacheKey, bool pinned) {
const QueryCacheKey& cacheKey, bool pinned) {
auto& cache = _executionContext->getQueryTreeCache();
auto result = runComputation(timer, computationMode);
if (!result.isFullyMaterialized() &&
Expand Down Expand Up @@ -235,7 +235,8 @@ std::shared_ptr<const Result> Operation::getResult(
signalQueryUpdate();
}
auto& cache = _executionContext->getQueryTreeCache();
const string cacheKey = getCacheKey();
const QueryCacheKey cacheKey = {
getCacheKey(), _executionContext->locatedTriplesSnapshot().index_};
const bool pinFinalResultButNotSubtrees =
_executionContext->_pinResult && isRoot;
const bool pinResult =
Expand Down Expand Up @@ -455,8 +456,8 @@ void Operation::createRuntimeInfoFromEstimates(
}
_runtimeInfo->multiplicityEstimates_ = multiplicityEstimates;

auto cachedResult =
_executionContext->getQueryTreeCache().getIfContained(getCacheKey());
auto cachedResult = _executionContext->getQueryTreeCache().getIfContained(
{getCacheKey(), locatedTriplesSnapshot().index_});
if (cachedResult.has_value()) {
const auto& [resultPointer, cacheStatus] = cachedResult.value();
_runtimeInfo->cacheStatus_ = cacheStatus;
Expand Down
2 changes: 1 addition & 1 deletion src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class Operation {
// into the cache.
CacheValue runComputationAndPrepareForCache(const ad_utility::Timer& timer,
ComputationMode computationMode,
const std::string& cacheKey,
const QueryCacheKey& cacheKey,
bool pinned);

// Create and store the complete runtime information for this operation after
Expand Down
22 changes: 21 additions & 1 deletion src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "util/ConcurrentCache.h"
#include "util/Synchronized.h"

// The value of the `QueryResultCache` below. It consists of a `Result` together
// with its `RuntimeInfo`.
class CacheValue {
private:
std::shared_ptr<Result> result_;
Expand Down Expand Up @@ -61,11 +63,29 @@ class CacheValue {
};
};

// The key for the `QueryResultCache` below. It consists of a `string` (the
// actual cache key of a `QueryExecutionTree`) and the index of the
// `LocatedTriplesSnapshot` that was used to create the corresponding value.
// That way, two identical trees with different snapshot indices will have a
// different cache key. This has the (desired!) effect that UPDATE requests
// correctly invalidate preexisting cache results.
struct QueryCacheKey {
// The string cache key of the query execution tree.
std::string key_;
// The index of the `LocatedTriplesSnapshot` that the cached value belongs to.
size_t locatedTriplesSnapshotIndex_;

// Memberwise comparison: two keys are equal if and only if both the string
// and the snapshot index are equal.
bool operator==(const QueryCacheKey&) const = default;

// Hash function in the form of an `AbslHashValue` overload. Both members are
// combined into the hash state `h`, so the same members that take part in
// `operator==` also take part in the hash.
template <typename H>
friend H AbslHashValue(H h, const QueryCacheKey& key) {
return H::combine(std::move(h), key.key_, key.locatedTriplesSnapshotIndex_);
}
};

// Threadsafe LRU cache for (partial) query results, that
// checks on insertion, if the result is currently being computed
// by another query.
using QueryResultCache = ad_utility::ConcurrentCache<
ad_utility::LRUCache<string, CacheValue, CacheValue::SizeGetter>>;
ad_utility::LRUCache<QueryCacheKey, CacheValue, CacheValue::SizeGetter>>;

// Execution context for queries.
// Holds references to index and engine, implements caching.
Expand Down
3 changes: 2 additions & 1 deletion src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ void QueryExecutionTree::readFromCache() {
return;
}
auto& cache = qec_->getQueryTreeCache();
auto res = cache.getIfContained(getCacheKey());
auto res = cache.getIfContained(
{getCacheKey(), qec_->locatedTriplesSnapshot().index_});
if (res.has_value()) {
cachedResult_ = res->_resultPointer->resultTablePtr();
}
Expand Down
5 changes: 5 additions & 0 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,11 @@ void Server::processUpdateImpl(
<< std::endl;
LOG(DEBUG) << "Runtime Info:\n"
<< qet.getRootOperation()->runtimeInfo().toString() << std::endl;

// Clear the cache, because all cache entries have been invalidated by the
// update anyway (The index of the located triples snapshot is part of the
// cache key).
cache_.clearAll();
}

// ____________________________________________________________________________
Expand Down
8 changes: 5 additions & 3 deletions src/index/DeltaTriples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,14 @@ LocatedTriplesSnapshot::getLocatedTriplesForPermutation(
}

// ____________________________________________________________________________
SharedLocatedTriplesSnapshot DeltaTriples::getSnapshot() const {
SharedLocatedTriplesSnapshot DeltaTriples::getSnapshot() {
// NOTE: Both members of the `LocatedTriplesSnapshot` are copied, but the
// `localVocab_` has no copy constructor (in order to avoid accidental
// copies), hence the explicit `clone`.
auto snapshotIndex = nextSnapshotIndex_;
++nextSnapshotIndex_;
return SharedLocatedTriplesSnapshot{std::make_shared<LocatedTriplesSnapshot>(
locatedTriples(), localVocab_.clone())};
locatedTriples(), localVocab_.clone(), snapshotIndex)};
}

// ____________________________________________________________________________
Expand All @@ -193,7 +195,7 @@ DeltaTriples::DeltaTriples(const Index& index)
// ____________________________________________________________________________
DeltaTriplesManager::DeltaTriplesManager(const IndexImpl& index)
: deltaTriples_{index},
currentLocatedTriplesSnapshot_{deltaTriples_.rlock()->getSnapshot()} {}
currentLocatedTriplesSnapshot_{deltaTriples_.wlock()->getSnapshot()} {}

// _____________________________________________________________________________
void DeltaTriplesManager::modify(
Expand Down
5 changes: 4 additions & 1 deletion src/index/DeltaTriples.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ using LocatedTriplesPerBlockAllPermutations =
struct LocatedTriplesSnapshot {
LocatedTriplesPerBlockAllPermutations locatedTriplesPerBlock_;
LocalVocab localVocab_;
// A unique index for this snapshot that is used in the query cache.
size_t index_;
// Get `TripleWithPosition` objects for given permutation.
const LocatedTriplesPerBlock& getLocatedTriplesForPermutation(
Permutation::Enum permutation) const;
Expand Down Expand Up @@ -63,6 +65,7 @@ class DeltaTriples {
private:
// The index to which these triples are added.
const IndexImpl& index_;
size_t nextSnapshotIndex_ = 0;

// The located triples for all the 6 permutations.
LocatedTriplesPerBlockAllPermutations locatedTriples_;
Expand Down Expand Up @@ -140,7 +143,7 @@ class DeltaTriples {
// Return a deep copy of the `LocatedTriples` and the corresponding
// `LocalVocab` which form a snapshot of the current status of this
// `DeltaTriples` object.
SharedLocatedTriplesSnapshot getSnapshot() const;
SharedLocatedTriplesSnapshot getSnapshot();

// Register the original `metadata` for the given `permutation`. This has to
// be called before any updates are processed.
Expand Down
2 changes: 1 addition & 1 deletion src/index/Permutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ const Permutation& Permutation::getActualPermutation(Id id) const {
// ______________________________________________________________________
const LocatedTriplesPerBlock& Permutation::getLocatedTriplesForPermutation(
const LocatedTriplesSnapshot& locatedTriplesSnapshot) const {
static const LocatedTriplesSnapshot emptySnapshot;
static const LocatedTriplesSnapshot emptySnapshot{{}, {}, 0};
const auto& actualSnapshot =
isInternalPermutation_ ? emptySnapshot : locatedTriplesSnapshot;
return actualSnapshot.getLocatedTriplesForPermutation(permutation_);
Expand Down
33 changes: 24 additions & 9 deletions test/OperationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,12 @@ TEST(Operation, verifyLimitIsProperlyAppliedAndUpdatesRuntimeInfoCorrectly) {
expectRtiHasDimensions(childRti, 2, 3);
}

namespace {
// Test helper: build a `QueryCacheKey` from the string `s` and a fixed
// snapshot index. All keys created by this helper share the same index, so
// keys built from equal strings compare equal.
QueryCacheKey makeQueryCacheKey(std::string s) {
  // NOTE(review): the concrete value looks arbitrary (just some fixed number
  // for the tests) -- confirm that nothing depends on it.
  constexpr size_t snapshotIndexForTests = 102394857;
  return QueryCacheKey{std::move(s), snapshotIndexForTests};
}
} // namespace

// _____________________________________________________________________________
TEST(Operation, ensureLazyOperationIsCachedIfSmallEnough) {
auto qec = getQec();
Expand All @@ -536,14 +542,17 @@ TEST(Operation, ensureLazyOperationIsCachedIfSmallEnough) {
ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started};

auto cacheValue = valuesForTesting.runComputationAndPrepareForCache(
timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
timer, ComputationMode::LAZY_IF_SUPPORTED, makeQueryCacheKey("test"),
false);
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));

for ([[maybe_unused]] Result::IdTableVocabPair& _ :
cacheValue.resultTable().idTables()) {
}

auto aggregatedValue = qec->getQueryTreeCache().getIfContained("test");
auto aggregatedValue =
qec->getQueryTreeCache().getIfContained(makeQueryCacheKey("test"));
ASSERT_TRUE(aggregatedValue.has_value());

ASSERT_TRUE(aggregatedValue.value()._resultPointer);
Expand Down Expand Up @@ -588,15 +597,18 @@ TEST(Operation, checkLazyOperationIsNotCachedIfTooLarge) {
qec->getQueryTreeCache().setMaxSizeSingleEntry(1_B);

auto cacheValue = valuesForTesting.runComputationAndPrepareForCache(
timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
timer, ComputationMode::LAZY_IF_SUPPORTED, makeQueryCacheKey("test"),
false);
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));
qec->getQueryTreeCache().setMaxSizeSingleEntry(originalSize);

for ([[maybe_unused]] Result::IdTableVocabPair& _ :
cacheValue.resultTable().idTables()) {
}

EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));
}

// _____________________________________________________________________________
Expand All @@ -612,12 +624,15 @@ TEST(Operation, checkLazyOperationIsNotCachedIfUnlikelyToFitInCache) {
ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started};

auto cacheValue = valuesForTesting.runComputationAndPrepareForCache(
timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
timer, ComputationMode::LAZY_IF_SUPPORTED, makeQueryCacheKey("test"),
false);
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));

for ([[maybe_unused]] Result::IdTableVocabPair& _ :
cacheValue.resultTable().idTables()) {
}

EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
EXPECT_FALSE(
qec->getQueryTreeCache().cacheContains(makeQueryCacheKey("test")));
}

0 comments on commit 792dd39

Please sign in to comment.