Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Dec 3, 2024
1 parent 317127c commit 9934c62
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 45 deletions.
68 changes: 31 additions & 37 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "engine/IndexScan.h"

#include <absl/container/inlined_vector.h>
#include <absl/strings/str_join.h>

#include <boost/optional.hpp>
Expand All @@ -14,23 +15,7 @@
#include "parser/ParsedQuery.h"

using std::string;

namespace {
using LazyScanMetadata = CompressedRelationReader::LazyScanMetadata;
// Accumulate every counter (and the blocking time) of `newValue` into
// `aggregate`. `newValue` itself is left untouched.
void aggregateMetadata(LazyScanMetadata& aggregate,
                       const LazyScanMetadata& newValue) {
  // Apply `+=` field by field, in the order the members are listed below.
  const auto addFields = [&aggregate, &newValue](auto... members) {
    ((aggregate.*members += newValue.*members), ...);
  };
  addFields(&LazyScanMetadata::numElementsYielded_,
            &LazyScanMetadata::blockingTime_,
            &LazyScanMetadata::numBlocksRead_,
            &LazyScanMetadata::numBlocksAll_,
            &LazyScanMetadata::numElementsRead_,
            &LazyScanMetadata::numBlocksSkippedBecauseOfGraph_,
            &LazyScanMetadata::numBlocksPostprocessed_,
            &LazyScanMetadata::numBlocksWithUpdate_);
}
} // namespace

// _____________________________________________________________________________
// Return the number of `Variables` given the `TripleComponent` values for
Expand Down Expand Up @@ -507,49 +492,53 @@ struct IndexScan::SharedGeneratorState {
Permutation::MetadataAndBlocks metaBlocks_;
// The iterator of the generator that is currently being consumed.
std::optional<Result::Generator::iterator> iterator_ = std::nullopt;
// The index of the last matching block that was found using the join column.
std::optional<size_t> lastBlockIndex_ = std::nullopt;
// Values returned by the generator that have not been re-yielded yet.
std::deque<Result::IdTableVocabPair> prefetchedValues_{};
// Typically we expect only 3 or fewer values to be prefetched (this is an
// implementation detail of `BlockZipperJoinImpl`).
using PrefetchStorage = absl::InlinedVector<Result::IdTableVocabPair, 3>;
PrefetchStorage prefetchedValues_{};
// Metadata of blocks that still need to be read.
std::vector<CompressedBlockMetadata> pendingBlocks_{};
// The index of the last matching block that was found using the join column.
std::optional<size_t> lastBlockIndex_ = std::nullopt;
// Indicates if the generator has yielded any undefined values.
bool hasUndef_ = false;
// Indicates if the generator has been fully consumed.
bool doneFetching_ = false;

// Advance the iterator to the next non-empty table. Set `hasUndef_` to true
// Advance the `iterator` to the next non-empty table. On the very first call,
// set `hasUndef_` to true if the first non-empty table starts with an
// undefined value in the join column. Also set `doneFetching_` if the
// generator has been fully consumed.
void advanceInputToFirstNonEmptyTable() {
void advanceInputToNextNonEmptyTable() {
bool firstStep = !iterator_.has_value();
if (iterator_.has_value()) {
++iterator_.value();
} else {
iterator_ = generator_.begin();
}
while (iterator_ != generator_.end()) {
if (!iterator_.value()->idTable_.empty()) {
auto& iterator = iterator_.value();
while (iterator != generator_.end()) {
if (!iterator->idTable_.empty()) {
break;
}
++iterator_.value();
++iterator;
}
doneFetching_ = iterator_ == generator_.end();
// Set the undef flag if the first table is undefined.
if (firstStep) {
hasUndef_ = !doneFetching_ &&
iterator_.value()->idTable_.at(0, joinColumn_).isUndefined();
hasUndef_ =
!doneFetching_ && iterator->idTable_.at(0, joinColumn_).isUndefined();
}
}

// Consume the next non-empty table from the generator and calculate the next
// matching blocks from the index scan.
void fetch() {
advanceInputToFirstNonEmptyTable();
advanceInputToNextNonEmptyTable();
if (doneFetching_) {
return;
}
auto& [idTable, localVocab] = *iterator_.value();
auto& idTable = iterator_.value()->idTable_;
auto joinColumn = idTable.getColumn(joinColumn_);
AD_EXPENSIVE_CHECK(std::ranges::is_sorted(joinColumn));
AD_CORRECTNESS_CHECK(!joinColumn.empty());
Expand All @@ -561,6 +550,8 @@ struct IndexScan::SharedGeneratorState {
auto newBlocks =
CompressedRelationReader::getBlocksForJoin(joinColumn, metaBlocks_);
if (newBlocks.empty()) {
// The current input table matches no blocks, so we don't have to yield
// it.
return;
}
prefetchedValues_.push_back(std::move(*iterator_.value()));
Expand All @@ -571,9 +562,9 @@ struct IndexScan::SharedGeneratorState {
? std::ranges::upper_bound(newBlocks, lastBlockIndex_.value(), {},
&CompressedBlockMetadata::blockIndex_)
: newBlocks.begin();
lastBlockIndex_ = newBlocks.back().blockIndex_;
std::ranges::move(startIterator, newBlocks.end(),
std::back_inserter(pendingBlocks_));
lastBlockIndex_ = newBlocks.back().blockIndex_;
}

// Check if there are any undefined values yielded by the original generator.
Expand All @@ -591,10 +582,9 @@ Result::Generator IndexScan::createPrefilteredJoinSide(
std::shared_ptr<SharedGeneratorState> innerState) {
if (innerState->hasUndef()) {
AD_CORRECTNESS_CHECK(innerState->prefetchedValues_.empty());
auto& iterator = innerState->iterator_.value();
while (iterator != innerState->generator_.end()) {
co_yield *iterator;
++iterator;
for (auto& value : std::ranges::subrange{innerState->iterator_.value(),
innerState->generator_.end()}) {
co_yield value;
}
co_return;
}
Expand All @@ -608,9 +598,13 @@ Result::Generator IndexScan::createPrefilteredJoinSide(
AD_CORRECTNESS_CHECK(!prefetchedValues.empty() ||
innerState->doneFetching_);
}
while (!prefetchedValues.empty()) {
co_yield prefetchedValues.front();
prefetchedValues.pop_front();
// Make a defensive copy of the values to avoid modification during
// iteration when yielding.
auto copy = std::move(prefetchedValues);
// Moving out does not necessarily clear the values, so we do it explicitly.
prefetchedValues.clear();
for (auto& value : copy) {
co_yield value;
}
}
}
Expand Down Expand Up @@ -640,7 +634,7 @@ Result::Generator IndexScan::createPrefilteredIndexScanSide(
for (IdTable& idTable : scan) {
co_yield {std::move(idTable), LocalVocab{}};
}
aggregateMetadata(metadata, scan.details());
metadata.aggregate(scan.details());
}
}

Expand Down
12 changes: 4 additions & 8 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,7 @@ ProtoResult Join::lazyJoin(std::shared_ptr<const Result> a,
[this, a = std::move(a), b = std::move(b),
joinColMap = std::move(joinColMap)](
std::function<void(IdTable&, LocalVocab&)> yieldTable) {
ad_utility::AddCombinedRowToIdTable rowAdder =
makeRowAdder(std::move(yieldTable));
auto rowAdder = makeRowAdder(std::move(yieldTable));
auto leftRange = resultToView(*a, joinColMap.permutationLeft());
auto rightRange = resultToView(*b, joinColMap.permutationRight());
std::visit(
Expand Down Expand Up @@ -662,8 +661,7 @@ ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const {
// don't have to permute the inputs and results for the
// `AddCombinedRowToIdTable` class to work correctly.
AD_CORRECTNESS_CHECK(_leftJoinCol == 0 && _rightJoinCol == 0);
ad_utility::AddCombinedRowToIdTable rowAdder =
makeRowAdder(std::move(yieldTable));
auto rowAdder = makeRowAdder(std::move(yieldTable));

ad_utility::Timer timer{
ad_utility::timer::Timer::InitialStatus::Started};
Expand Down Expand Up @@ -706,8 +704,7 @@ ProtoResult Join::computeResultForIndexScanAndIdTable(
joinColMap = std::move(joinColMap)](
std::function<void(IdTable&, LocalVocab&)> yieldTable) {
const IdTable& idTable = resultWithIdTable->idTable();
ad_utility::AddCombinedRowToIdTable rowAdder =
makeRowAdder(std::move(yieldTable));
auto rowAdder = makeRowAdder(std::move(yieldTable));

auto permutationIdTable = ad_utility::IdTableAndFirstCol{
idTable.asColumnSubsetView(idTableIsRightInput
Expand Down Expand Up @@ -785,8 +782,7 @@ ProtoResult Join::computeResultForIndexScanAndLazyOperation(
resultWithIdTable = std::move(resultWithIdTable),
joinColMap = std::move(joinColMap)](
std::function<void(IdTable&, LocalVocab&)> yieldTable) {
ad_utility::AddCombinedRowToIdTable rowAdder =
makeRowAdder(std::move(yieldTable));
auto rowAdder = makeRowAdder(std::move(yieldTable));

auto [joinSide, indexScanSide] = scan->prefilterTables(
std::move(resultWithIdTable->idTables()), _leftJoinCol);
Expand Down
13 changes: 13 additions & 0 deletions src/index/CompressedRelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1594,3 +1594,16 @@ void CompressedRelationReader::LazyScanMetadata::update(
++numBlocksSkippedBecauseOfGraph_;
}
}

// _____________________________________________________________________________
void CompressedRelationReader::LazyScanMetadata::aggregate(
const LazyScanMetadata& newValue) {
numElementsYielded_ += newValue.numElementsYielded_;
blockingTime_ += newValue.blockingTime_;
numBlocksRead_ += newValue.numBlocksRead_;
numBlocksAll_ += newValue.numBlocksAll_;
numElementsRead_ += newValue.numElementsRead_;
numBlocksSkippedBecauseOfGraph_ += newValue.numBlocksSkippedBecauseOfGraph_;
numBlocksPostprocessed_ += newValue.numBlocksPostprocessed_;
numBlocksWithUpdate_ += newValue.numBlocksWithUpdate_;
}
3 changes: 3 additions & 0 deletions src/index/CompressedRelation.h
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,9 @@ class CompressedRelationReader {
// call the overload directly above.
void update(
const std::optional<DecompressedBlockAndMetadata>& blockAndMetadata);

// Aggregate the metadata from `newValue` into this metadata.
void aggregate(const LazyScanMetadata& newValue);
};

using IdTableGenerator = cppcoro::generator<IdTable, LazyScanMetadata>;
Expand Down

0 comments on commit 9934c62

Please sign in to comment.