Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly handle the cache in the presence of SPARQL UPDATE #1646

Merged
merged 6 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ ProtoResult Operation::runComputation(const ad_utility::Timer& timer,
// _____________________________________________________________________________
CacheValue Operation::runComputationAndPrepareForCache(
const ad_utility::Timer& timer, ComputationMode computationMode,
const std::string& cacheKey, bool pinned) {
const QueryCacheKey& cacheKey, bool pinned) {
auto& cache = _executionContext->getQueryTreeCache();
auto result = runComputation(timer, computationMode);
if (!result.isFullyMaterialized() &&
Expand Down Expand Up @@ -233,7 +233,8 @@ std::shared_ptr<const Result> Operation::getResult(
signalQueryUpdate();
}
auto& cache = _executionContext->getQueryTreeCache();
const string cacheKey = getCacheKey();
const QueryCacheKey cacheKey = {getCacheKey(),
&_executionContext->locatedTriplesSnapshot()};
const bool pinFinalResultButNotSubtrees =
_executionContext->_pinResult && isRoot;
const bool pinResult =
Expand Down Expand Up @@ -453,8 +454,8 @@ void Operation::createRuntimeInfoFromEstimates(
}
_runtimeInfo->multiplicityEstimates_ = multiplicityEstimates;

auto cachedResult =
_executionContext->getQueryTreeCache().getIfContained(getCacheKey());
auto cachedResult = _executionContext->getQueryTreeCache().getIfContained(
{getCacheKey(), &locatedTriplesSnapshot()});
if (cachedResult.has_value()) {
const auto& [resultPointer, cacheStatus] = cachedResult.value();
_runtimeInfo->cacheStatus_ = cacheStatus;
Expand Down
2 changes: 1 addition & 1 deletion src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class Operation {
// into the cache.
CacheValue runComputationAndPrepareForCache(const ad_utility::Timer& timer,
ComputationMode computationMode,
const std::string& cacheKey,
const QueryCacheKey& cacheKey,
bool pinned);

// Create and store the complete runtime information for this operation after
Expand Down
14 changes: 13 additions & 1 deletion src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,23 @@ class CacheValue {
};
};

// Key type for the query result cache. Two keys are equal only if both the
// textual cache key of the operation AND the located-triples snapshot agree.
// The snapshot is identified purely by the address of the snapshot object,
// so results cached against an older snapshot (before a SPARQL UPDATE) are
// never returned for queries that run against a newer one.
struct QueryCacheKey {
  std::string key_;
  const LocatedTriplesSnapshot* locatedTriplesSnapshotKey_;

  // Member-wise equality (string compare + pointer identity).
  bool operator==(const QueryCacheKey&) const = default;

  // Hash support for Abseil-based hash containers: combine the string key
  // with the snapshot pointer value.
  template <typename H>
  friend H AbslHashValue(H hashState, const QueryCacheKey& cacheKey) {
    return H::combine(std::move(hashState), cacheKey.key_,
                      cacheKey.locatedTriplesSnapshotKey_);
  }
};

// Threadsafe LRU cache for (partial) query results, that
// checks on insertion, if the result is currently being computed
// by another query.
using QueryResultCache = ad_utility::ConcurrentCache<
ad_utility::LRUCache<string, CacheValue, CacheValue::SizeGetter>>;
ad_utility::LRUCache<QueryCacheKey, CacheValue, CacheValue::SizeGetter>>;

// Execution context for queries.
// Holds references to index and engine, implements caching.
Expand Down
3 changes: 2 additions & 1 deletion src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ void QueryExecutionTree::readFromCache() {
return;
}
auto& cache = qec_->getQueryTreeCache();
auto res = cache.getIfContained(getCacheKey());
auto res =
cache.getIfContained({getCacheKey(), &(qec_->locatedTriplesSnapshot())});
if (res.has_value()) {
cachedResult_ = res->_resultPointer->resultTablePtr();
}
Expand Down
6 changes: 6 additions & 0 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,12 @@
<< std::endl;
LOG(DEBUG) << "Runtime Info:\n"
<< qet.getRootOperation()->runtimeInfo().toString() << std::endl;

// Clear the cache, because all new queries won't benefit from the old cached
// values, which have been invalidated by the UPDATE operation.
// TODO<joka921> Should we do this before or after the above logging?
// Or even only after we have reported the update as successful?
cache_.clearAll();

Check warning on line 923 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L923

Added line #L923 was not covered by tests
}

// ____________________________________________________________________________
Expand Down
26 changes: 17 additions & 9 deletions test/OperationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,13 @@ TEST(Operation, verifyLimitIsProperlyAppliedAndUpdatesRuntimeInfoCorrectly) {
expectRtiHasDimensions(childRti, 2, 3);
}

namespace {
// Build a `QueryCacheKey` from a plain string for use in tests. The cache key
// only uses the snapshot pointer as an opaque identity and never dereferences
// it, so an arbitrary fixed fake address suffices here.
QueryCacheKey toCacheKey(std::string cacheKeyString) {
  constexpr std::intptr_t fakeSnapshotAddress{102394857};
  return {std::move(cacheKeyString),
          reinterpret_cast<const LocatedTriplesSnapshot*>(fakeSnapshotAddress)};
}
}  // namespace

// _____________________________________________________________________________
TEST(Operation, ensureLazyOperationIsCachedIfSmallEnough) {
auto qec = getQec();
Expand All @@ -536,14 +543,15 @@ TEST(Operation, ensureLazyOperationIsCachedIfSmallEnough) {
ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started};

auto cacheValue = valuesForTesting.runComputationAndPrepareForCache(
timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
timer, ComputationMode::LAZY_IF_SUPPORTED, toCacheKey("test"), false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains(toCacheKey("test")));

for ([[maybe_unused]] Result::IdTableVocabPair& _ :
cacheValue.resultTable().idTables()) {
}

auto aggregatedValue = qec->getQueryTreeCache().getIfContained("test");
auto aggregatedValue =
qec->getQueryTreeCache().getIfContained(toCacheKey("test"));
ASSERT_TRUE(aggregatedValue.has_value());

ASSERT_TRUE(aggregatedValue.value()._resultPointer);
Expand Down Expand Up @@ -588,15 +596,15 @@ TEST(Operation, checkLazyOperationIsNotCachedIfTooLarge) {
qec->getQueryTreeCache().setMaxSizeSingleEntry(1_B);

auto cacheValue = valuesForTesting.runComputationAndPrepareForCache(
timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
timer, ComputationMode::LAZY_IF_SUPPORTED, toCacheKey("test"), false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains(toCacheKey("test")));
qec->getQueryTreeCache().setMaxSizeSingleEntry(originalSize);

for ([[maybe_unused]] Result::IdTableVocabPair& _ :
cacheValue.resultTable().idTables()) {
}

EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains(toCacheKey("test")));
}

// _____________________________________________________________________________
Expand All @@ -612,12 +620,12 @@ TEST(Operation, checkLazyOperationIsNotCachedIfUnlikelyToFitInCache) {
ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started};

auto cacheValue = valuesForTesting.runComputationAndPrepareForCache(
timer, ComputationMode::LAZY_IF_SUPPORTED, "test", false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
timer, ComputationMode::LAZY_IF_SUPPORTED, toCacheKey("test"), false);
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains(toCacheKey("test")));

for ([[maybe_unused]] Result::IdTableVocabPair& _ :
cacheValue.resultTable().idTables()) {
}

EXPECT_FALSE(qec->getQueryTreeCache().cacheContains("test"));
EXPECT_FALSE(qec->getQueryTreeCache().cacheContains(toCacheKey("test")));
}
Loading