Implement block prefiltering for Joins of a lazy result + IndexScan #1647

Merged
merged 20 commits into from
Dec 3, 2024
206 changes: 202 additions & 4 deletions src/engine/IndexScan.cpp
Expand Up @@ -4,6 +4,7 @@

#include "engine/IndexScan.h"

#include <absl/container/inlined_vector.h>
#include <absl/strings/str_join.h>

#include <boost/optional.hpp>
Expand All @@ -14,6 +15,7 @@
#include "parser/ParsedQuery.h"

using std::string;
using LazyScanMetadata = CompressedRelationReader::LazyScanMetadata;

// _____________________________________________________________________________
// Return the number of `Variables` given the `TripleComponent` values for
Expand Down Expand Up @@ -442,15 +444,211 @@
AD_CORRECTNESS_CHECK(numVariables_ <= 3 && numVariables_ > 0);
AD_CONTRACT_CHECK(joinColumn.empty() || !joinColumn[0].isUndefined());

auto metaBlocks1 = getMetadataForScan();
auto metaBlocks = getMetadataForScan();

if (!metaBlocks1.has_value()) {
if (!metaBlocks.has_value()) {
return {};
}
auto blocks = CompressedRelationReader::getBlocksForJoin(joinColumn,
metaBlocks1.value());
metaBlocks.value());

auto result = getLazyScan(blocks);
result.details().numBlocksAll_ = metaBlocks1.value().blockMetadata_.size();
result.details().numBlocksAll_ = metaBlocks.value().blockMetadata_.size();
return result;
}

// _____________________________________________________________________________
void IndexScan::updateRuntimeInfoForLazyScan(const LazyScanMetadata& metadata) {
updateRuntimeInformationWhenOptimizedOut(
RuntimeInformation::Status::lazilyMaterialized);
auto& rti = runtimeInfo();
rti.numRows_ = metadata.numElementsYielded_;
rti.totalTime_ = metadata.blockingTime_;
rti.addDetail("num-blocks-read", metadata.numBlocksRead_);
rti.addDetail("num-blocks-all", metadata.numBlocksAll_);
rti.addDetail("num-elements-read", metadata.numElementsRead_);

// Add more details, but only if the respective value is non-zero.
auto updateIfPositive = [&rti](const auto& value, const std::string& key) {
if (value > 0) {
rti.addDetail(key, value);
}

};
updateIfPositive(metadata.numBlocksSkippedBecauseOfGraph_,
"num-blocks-skipped-graph");
updateIfPositive(metadata.numBlocksPostprocessed_,
"num-blocks-postprocessed");
updateIfPositive(metadata.numBlocksWithUpdate_, "num-blocks-with-update");
Comment on lines +465 to +481
Member

Can you also make this part a member function of `LazyScanMetadata`?
That makes it easier to keep up to date when we add further members to it.

Collaborator Author

There is an abstraction that can be moved into `LazyScanMetadata`, but the part that calls `updateRuntimeInformationWhenOptimizedOut` has to be part of a class related to `Operation`. The compressed relation reader class currently does not include `RuntimeInformation`, so I would have to add a new header there that isn't really related to that class.

Member

If you don't feel that you have the time right now, limit yourself to the comments that are important for the main functionality of this PR.

The real solution here is that neither the LazyScanMetadata nor the CompressedBlockMetadata should be part of the large CompressedRelation.h file.

}
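
One way to read the review thread above is to let the metadata struct enumerate its own details while the `Operation` side decides where they go. This is only a sketch, assuming a hypothetical reduced struct `ScanCounters` and a hypothetical member `forEachDetail`; neither exists in the PR. The point is that the struct needs no knowledge of `RuntimeInformation`, so no additional header has to be included.

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>

// Hypothetical, reduced version of `LazyScanMetadata` (assumption, not the
// actual struct from `CompressedRelation.h`).
struct ScanCounters {
  std::size_t numBlocksRead_ = 0;
  std::size_t numBlocksAll_ = 0;
  std::size_t numElementsRead_ = 0;
  std::size_t numBlocksSkippedBecauseOfGraph_ = 0;
  std::size_t numBlocksPostprocessed_ = 0;
  std::size_t numBlocksWithUpdate_ = 0;

  // Call `sink(key, value)` for every detail that should be reported. The
  // optional counters are only reported if they are non-zero.
  template <typename Sink>
  void forEachDetail(Sink&& sink) const {
    sink(std::string_view{"num-blocks-read"}, numBlocksRead_);
    sink(std::string_view{"num-blocks-all"}, numBlocksAll_);
    sink(std::string_view{"num-elements-read"}, numElementsRead_);
    auto ifPositive = [&sink](std::string_view key, std::size_t value) {
      if (value > 0) {
        sink(key, value);
      }
    };
    ifPositive("num-blocks-skipped-graph", numBlocksSkippedBecauseOfGraph_);
    ifPositive("num-blocks-postprocessed", numBlocksPostprocessed_);
    ifPositive("num-blocks-with-update", numBlocksWithUpdate_);
  }
};
```

The caller in `IndexScan` could then pass a lambda that forwards each pair to `rti.addDetail(...)`, and a new counter would only have to be added in one place.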

// Store a Generator and its corresponding iterator as well as unconsumed values
// resulting from the generator.
struct IndexScan::SharedGeneratorState {
// The generator that yields the tables to be joined with the index scan.
Result::Generator generator_;
// The column index of the join column in the tables yielded by the generator.
ColumnIndex joinColumn_;
// Metadata and blocks of this index scan.
Permutation::MetadataAndBlocks metaBlocks_;
// The iterator of the generator that is currently being consumed.
std::optional<Result::Generator::iterator> iterator_ = std::nullopt;
// Values returned by the generator that have not been re-yielded yet.
// Typically we expect only 3 or fewer values to be prefetched (this is an
// implementation detail of `BlockZipperJoinImpl`).
using PrefetchStorage = absl::InlinedVector<Result::IdTableVocabPair, 3>;
PrefetchStorage prefetchedValues_{};
// Metadata of blocks that still need to be read.
std::vector<CompressedBlockMetadata> pendingBlocks_{};
// The index of the last matching block that was found using the join column.
std::optional<size_t> lastBlockIndex_ = std::nullopt;
// Indicates if the generator has yielded any undefined values.
bool hasUndef_ = false;
// Indicates if the generator has been fully consumed.
bool doneFetching_ = false;

// Advance `iterator_` to the next non-empty table. On the first call, set
// `hasUndef_` to true if the first join column entry of that table is
// undefined. Also set `doneFetching_` if the generator has been fully consumed.
void advanceInputToNextNonEmptyTable() {
bool firstStep = !iterator_.has_value();
if (iterator_.has_value()) {
++iterator_.value();
} else {
iterator_ = generator_.begin();
}
auto& iterator = iterator_.value();
while (iterator != generator_.end()) {
if (!iterator->idTable_.empty()) {
break;
}
++iterator;
}
doneFetching_ = iterator_ == generator_.end();
// Set the undef flag if the first table is undefined.
if (firstStep) {
hasUndef_ =
!doneFetching_ && iterator->idTable_.at(0, joinColumn_).isUndefined();
}
}

// Consume the next non-empty table from the generator and calculate the next
// matching blocks from the index scan.
Member

This function also sets `doneFetching_` if necessary; maybe add this to the comment.

void fetch() {
Member

Can you put a mutex around the `fetch()` function?
That way we don't run into trouble should we ever separate the two generators.

Collaborator Author

The whole system is inherently thread-unsafe. The data structures `prefetchedValues_` and `pendingBlocks_` are modified in both generators, so I would have to add mutexes there too. In a multithreaded scenario the unbounded `prefetchedValues_` deque could grow far too large, so even if the struct were thread-safe it wouldn't be a good choice as-is. If we want thread safety in the future, we'd have to use something like the blocking queue instead.
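
To make the "blocking queue" alternative from this reply concrete, here is a minimal sketch of a bounded, thread-safe queue. It is an assumption for illustration only: `BoundedQueue` is a hypothetical class and not the queue that the codebase actually provides. The capacity bound is what prevents the prefetched values from growing without limit when producer and consumer run on different threads.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded blocking queue (illustration only).
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(std::size_t capacity) : capacity_{capacity} {
    assert(capacity > 0);
  }

  // Block while the queue is full, then append `value`.
  void push(T value) {
    std::unique_lock lock{mutex_};
    notFull_.wait(lock, [this] { return queue_.size() < capacity_; });
    queue_.push_back(std::move(value));
    lock.unlock();
    notEmpty_.notify_one();
  }

  // Block while the queue is empty, then remove and return the front element.
  T pop() {
    std::unique_lock lock{mutex_};
    notEmpty_.wait(lock, [this] { return !queue_.empty(); });
    T value = std::move(queue_.front());
    queue_.pop_front();
    lock.unlock();
    notFull_.notify_one();
    return value;
  }

  // Number of elements currently stored.
  std::size_t size() const {
    std::lock_guard lock{mutex_};
    return queue_.size();
  }

 private:
  const std::size_t capacity_;
  mutable std::mutex mutex_;
  std::condition_variable notFull_;
  std::condition_variable notEmpty_;
  std::deque<T> queue_;
};
```

A producer that calls `push` on a full queue simply waits until the consumer has caught up, which is the back-pressure that a plain mutex around `fetch()` would not provide.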

advanceInputToNextNonEmptyTable();
if (doneFetching_) {
Member

Suggested change
if (doneFetching_) {
if (doneFetching_ || hasUndef_) {

Collaborator Author

I could do that, but I wanted to keep the correctness checks. I can omit them for the undef case if you prefer it that way.

return;
}
auto& idTable = iterator_.value()->idTable_;
auto joinColumn = idTable.getColumn(joinColumn_);
AD_EXPENSIVE_CHECK(std::ranges::is_sorted(joinColumn));
AD_CORRECTNESS_CHECK(!joinColumn.empty());
// Skip processing for undef case, it will be handled differently
if (hasUndef_) {
return;
}
Comment on lines +545 to +548
Member

Suggested change
// Skip processing for undef case, it will be handled differently
if (hasUndef_) {
return;
}
// The UNDEF case is handled separately (see the early return above).

AD_CORRECTNESS_CHECK(!joinColumn[0].isUndefined());
auto newBlocks =
CompressedRelationReader::getBlocksForJoin(joinColumn, metaBlocks_);
if (newBlocks.empty()) {
// The current input table matches no blocks, so we don't have to yield
// it.
return;
}
prefetchedValues_.push_back(std::move(*iterator_.value()));
// Find first value that differs from the last one that was used to find
// matching blocks.
auto startIterator =
lastBlockIndex_.has_value()
? std::ranges::upper_bound(newBlocks, lastBlockIndex_.value(), {},
&CompressedBlockMetadata::blockIndex_)
: newBlocks.begin();
lastBlockIndex_ = newBlocks.back().blockIndex_;
std::ranges::move(startIterator, newBlocks.end(),
std::back_inserter(pendingBlocks_));
}

// Check if there are any undefined values yielded by the original generator.
// If the generator has not been started yet, this will start consuming it.
bool hasUndef() {
if (!iterator_.has_value()) {
fetch();
}
return hasUndef_;
}
};

// _____________________________________________________________________________
Result::Generator IndexScan::createPrefilteredJoinSide(
std::shared_ptr<SharedGeneratorState> innerState) {
if (innerState->hasUndef()) {
AD_CORRECTNESS_CHECK(innerState->prefetchedValues_.empty());
for (auto& value : std::ranges::subrange{innerState->iterator_.value(),
innerState->generator_.end()}) {
co_yield value;
}
co_return;
}
auto& prefetchedValues = innerState->prefetchedValues_;
while (true) {
if (prefetchedValues.empty()) {
if (innerState->doneFetching_) {
co_return;
}
innerState->fetch();
AD_CORRECTNESS_CHECK(!prefetchedValues.empty() ||
innerState->doneFetching_);
}
// Make a defensive copy of the values to avoid modification during
// iteration when yielding.
auto copy = std::move(prefetchedValues);
// Moving out does not necessarily clear the values, so we do it explicitly.
prefetchedValues.clear();
for (auto& value : copy) {
co_yield value;
}
}
}

// _____________________________________________________________________________
Result::Generator IndexScan::createPrefilteredIndexScanSide(
std::shared_ptr<SharedGeneratorState> innerState) {
if (innerState->hasUndef()) {
for (auto& pair : chunkedIndexScan()) {
co_yield pair;
}
co_return;
}
LazyScanMetadata metadata;
auto& pendingBlocks = innerState->pendingBlocks_;
while (true) {
if (pendingBlocks.empty()) {
if (innerState->doneFetching_) {
metadata.numBlocksAll_ = innerState->metaBlocks_.blockMetadata_.size();
updateRuntimeInfoForLazyScan(metadata);
co_return;
}
innerState->fetch();
}
auto scan = getLazyScan(std::move(pendingBlocks));
AD_CORRECTNESS_CHECK(pendingBlocks.empty());
for (IdTable& idTable : scan) {
co_yield {std::move(idTable), LocalVocab{}};
}
metadata.aggregate(scan.details());
}
}

// _____________________________________________________________________________
std::pair<Result::Generator, Result::Generator> IndexScan::prefilterTables(
Result::Generator input, ColumnIndex joinColumn) {
AD_CORRECTNESS_CHECK(numVariables_ <= 3 && numVariables_ > 0);
auto metaBlocks = getMetadataForScan();

if (!metaBlocks.has_value()) {
return {Result::Generator{}, Result::Generator{}};
}
auto state = std::make_shared<SharedGeneratorState>(
std::move(input), joinColumn, std::move(metaBlocks.value()));
return {createPrefilteredJoinSide(state),
createPrefilteredIndexScanSide(state)};
}
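
The cooperation between the two generators returned by `prefilterTables` can be modelled without coroutines. This is a deliberately reduced sketch and not the PR's implementation: `SharedState`, `Side` and `makeSides` are hypothetical, integers stand in for tables, and `value * 10` is a placeholder for "the blocks that match this table". It shows the core idea that whichever side runs out of work first advances the single shared input, and that this one step produces work for both sides.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical shared state: one input sequence feeds two consumers.
struct SharedState {
  std::vector<int> input_;       // stands in for the lazy join input
  std::size_t position_ = 0;     // next element of `input_` to consume
  std::deque<int> forJoinSide_;  // values that still have to be re-yielded
  std::deque<int> forScanSide_;  // derived work for the index scan side

  bool done() const { return position_ == input_.size(); }

  // Consume one input element and queue work for both sides.
  void fetch() {
    if (done()) {
      return;
    }
    int value = input_[position_++];
    forJoinSide_.push_back(value);
    forScanSide_.push_back(value * 10);  // placeholder for matching blocks
  }
};

// One of the two consumers. Both hold the same `SharedState`.
struct Side {
  std::shared_ptr<SharedState> state_;
  bool isJoinSide_;

  // Return the next element for this side, or `nullopt` when exhausted.
  std::optional<int> next() {
    auto& queue = isJoinSide_ ? state_->forJoinSide_ : state_->forScanSide_;
    while (queue.empty()) {
      if (state_->done()) {
        return std::nullopt;
      }
      state_->fetch();
    }
    int value = queue.front();
    queue.pop_front();
    return value;
  }
};

// Create both sides on top of a single shared state.
std::pair<Side, Side> makeSides(std::vector<int> input) {
  auto state = std::make_shared<SharedState>();
  state->input_ = std::move(input);
  return {Side{state, true}, Side{state, false}};
}
```

Because the state is owned by a `std::shared_ptr`, it stays alive as long as either side exists, independent of the order in which the two sides are destroyed. As discussed in the review, such a design is only safe when both sides are driven from the same thread.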
30 changes: 30 additions & 0 deletions src/engine/IndexScan.h
Expand Up @@ -99,7 +99,32 @@ class IndexScan final : public Operation {
Permutation::IdTableGenerator lazyScanForJoinOfColumnWithScan(
std::span<const Id> joinColumn) const;

// Return two generators, the first of which yields exactly the elements of
// `input` and the second of which yields the matching blocks, skipping the
// blocks consisting only of rows that don't match the tables yielded by
// `input` to speed up join algorithms when no undef values are present. When
// there are undef values, the second generator represents the full index
// scan.
std::pair<Result::Generator, Result::Generator> prefilterTables(
Result::Generator input, ColumnIndex joinColumn);

private:
// Implementation detail that allows consuming a generator from two other
// cooperating generators. Needs to be forward declared as it is used by
// several member functions below.
struct SharedGeneratorState;

// Helper function that creates a generator that re-yields the generator
// wrapped by `innerState`.
static Result::Generator createPrefilteredJoinSide(
std::shared_ptr<SharedGeneratorState> innerState);

// Helper function that creates a generator yielding prefiltered rows of this
// index scan according to the block metadata, that match the tables yielded
// by the generator wrapped by `innerState`.
Result::Generator createPrefilteredIndexScanSide(
std::shared_ptr<SharedGeneratorState> innerState);

// TODO<joka921> Make the `getSizeEstimateBeforeLimit()` function `const` for
// ALL the `Operations`.
uint64_t getSizeEstimateBeforeLimit() override { return sizeEstimate_; }
Expand Down Expand Up @@ -145,6 +170,11 @@ class IndexScan final : public Operation {
ScanSpecification getScanSpecification() const;
ScanSpecificationAsTripleComponent getScanSpecificationTc() const;

// Set the runtime info of the `scanTree` when it was lazily executed during a
// join.
void updateRuntimeInfoForLazyScan(
const CompressedRelationReader::LazyScanMetadata& metadata);

private:
ProtoResult computeResult(bool requestLaziness) override;
