diff --git a/src/engine/IndexScan.cpp b/src/engine/IndexScan.cpp index 09d6b15de7..1e6b71981d 100644 --- a/src/engine/IndexScan.cpp +++ b/src/engine/IndexScan.cpp @@ -4,6 +4,7 @@ #include "engine/IndexScan.h" +#include #include #include @@ -14,6 +15,7 @@ #include "parser/ParsedQuery.h" using std::string; +using LazyScanMetadata = CompressedRelationReader::LazyScanMetadata; // _____________________________________________________________________________ // Return the number of `Variables` given the `TripleComponent` values for @@ -442,15 +444,211 @@ Permutation::IdTableGenerator IndexScan::lazyScanForJoinOfColumnWithScan( AD_CORRECTNESS_CHECK(numVariables_ <= 3 && numVariables_ > 0); AD_CONTRACT_CHECK(joinColumn.empty() || !joinColumn[0].isUndefined()); - auto metaBlocks1 = getMetadataForScan(); + auto metaBlocks = getMetadataForScan(); - if (!metaBlocks1.has_value()) { + if (!metaBlocks.has_value()) { return {}; } auto blocks = CompressedRelationReader::getBlocksForJoin(joinColumn, - metaBlocks1.value()); + metaBlocks.value()); auto result = getLazyScan(blocks); - result.details().numBlocksAll_ = metaBlocks1.value().blockMetadata_.size(); + result.details().numBlocksAll_ = metaBlocks.value().blockMetadata_.size(); return result; } + +// _____________________________________________________________________________ +void IndexScan::updateRuntimeInfoForLazyScan(const LazyScanMetadata& metadata) { + updateRuntimeInformationWhenOptimizedOut( + RuntimeInformation::Status::lazilyMaterialized); + auto& rti = runtimeInfo(); + rti.numRows_ = metadata.numElementsYielded_; + rti.totalTime_ = metadata.blockingTime_; + rti.addDetail("num-blocks-read", metadata.numBlocksRead_); + rti.addDetail("num-blocks-all", metadata.numBlocksAll_); + rti.addDetail("num-elements-read", metadata.numElementsRead_); + + // Add more details, but only if the respective value is non-zero. + auto updateIfPositive = [&rti](const auto& value, const std::string& key) { + if (value > 0) { + rti.addDetail(key, value); + } + }; + updateIfPositive(metadata.numBlocksSkippedBecauseOfGraph_, + "num-blocks-skipped-graph"); + updateIfPositive(metadata.numBlocksPostprocessed_, + "num-blocks-postprocessed"); + updateIfPositive(metadata.numBlocksWithUpdate_, "num-blocks-with-update"); +} + +// Store a Generator and its corresponding iterator as well as unconsumed values +// resulting from the generator. +struct IndexScan::SharedGeneratorState { + // The generator that yields the tables to be joined with the index scan. + Result::Generator generator_; + // The column index of the join column in the tables yielded by the generator. + ColumnIndex joinColumn_; + // Metadata and blocks of this index scan. + Permutation::MetadataAndBlocks metaBlocks_; + // The iterator of the generator that is currently being consumed. + std::optional iterator_ = std::nullopt; + // Values returned by the generator that have not been re-yielded yet. + // Typically we expect only 3 or less values to be prefetched (this is an + // implementation detail of `BlockZipperJoinImpl`). + using PrefetchStorage = absl::InlinedVector; + PrefetchStorage prefetchedValues_{}; + // Metadata of blocks that still need to be read. + std::vector pendingBlocks_{}; + // The index of the last matching block that was found using the join column. + std::optional lastBlockIndex_ = std::nullopt; + // Indicates if the generator has yielded any undefined values. + bool hasUndef_ = false; + // Indicates if the generator has been fully consumed. + bool doneFetching_ = false; + + // Advance the `iterator` to the next non-empty table. Set `hasUndef_` to true + // if the first table is undefined. Also set `doneFetching_` if the generator + // has been fully consumed. + void advanceInputToNextNonEmptyTable() { + bool firstStep = !iterator_.has_value(); + if (iterator_.has_value()) { + ++iterator_.value(); + } else { + iterator_ = generator_.begin(); + } + auto& iterator = iterator_.value(); + while (iterator != generator_.end()) { + if (!iterator->idTable_.empty()) { + break; + } + ++iterator; + } + doneFetching_ = iterator_ == generator_.end(); + // Set the undef flag if the first table is undefined. + if (firstStep) { + hasUndef_ = + !doneFetching_ && iterator->idTable_.at(0, joinColumn_).isUndefined(); + } + } + + // Consume the next non-empty table from the generator and calculate the next + // matching blocks from the index scan. + void fetch() { + advanceInputToNextNonEmptyTable(); + if (doneFetching_) { + return; + } + auto& idTable = iterator_.value()->idTable_; + auto joinColumn = idTable.getColumn(joinColumn_); + AD_EXPENSIVE_CHECK(std::ranges::is_sorted(joinColumn)); + AD_CORRECTNESS_CHECK(!joinColumn.empty()); + // Skip processing for undef case, it will be handled differently + if (hasUndef_) { + return; + } + AD_CORRECTNESS_CHECK(!joinColumn[0].isUndefined()); + auto newBlocks = + CompressedRelationReader::getBlocksForJoin(joinColumn, metaBlocks_); + if (newBlocks.empty()) { + // The current input table matches no blocks, so we don't have to yield + // it. + return; + } + prefetchedValues_.push_back(std::move(*iterator_.value())); + // Find first value that differs from the last one that was used to find + // matching blocks. + auto startIterator = + lastBlockIndex_.has_value() + ? std::ranges::upper_bound(newBlocks, lastBlockIndex_.value(), {}, + &CompressedBlockMetadata::blockIndex_) + : newBlocks.begin(); + lastBlockIndex_ = newBlocks.back().blockIndex_; + std::ranges::move(startIterator, newBlocks.end(), + std::back_inserter(pendingBlocks_)); + } + + // Check if there are any undefined values yielded by the original generator. + // If the generator hasn't been started to get consumed, this will start it. + bool hasUndef() { + if (!iterator_.has_value()) { + fetch(); + } + return hasUndef_; + } +}; + +// _____________________________________________________________________________ +Result::Generator IndexScan::createPrefilteredJoinSide( + std::shared_ptr innerState) { + if (innerState->hasUndef()) { + AD_CORRECTNESS_CHECK(innerState->prefetchedValues_.empty()); + for (auto& value : std::ranges::subrange{innerState->iterator_.value(), + innerState->generator_.end()}) { + co_yield value; + } + co_return; + } + auto& prefetchedValues = innerState->prefetchedValues_; + while (true) { + if (prefetchedValues.empty()) { + if (innerState->doneFetching_) { + co_return; + } + innerState->fetch(); + AD_CORRECTNESS_CHECK(!prefetchedValues.empty() || + innerState->doneFetching_); + } + // Make a defensive copy of the values to avoid modification during + // iteration when yielding. + auto copy = std::move(prefetchedValues); + // Moving out does not necessarily clear the values, so we do it explicitly. + prefetchedValues.clear(); + for (auto& value : copy) { + co_yield value; + } + } +} + +// _____________________________________________________________________________ +Result::Generator IndexScan::createPrefilteredIndexScanSide( + std::shared_ptr innerState) { + if (innerState->hasUndef()) { + for (auto& pair : chunkedIndexScan()) { + co_yield pair; + } + co_return; + } + LazyScanMetadata metadata; + auto& pendingBlocks = innerState->pendingBlocks_; + while (true) { + if (pendingBlocks.empty()) { + if (innerState->doneFetching_) { + metadata.numBlocksAll_ = innerState->metaBlocks_.blockMetadata_.size(); + updateRuntimeInfoForLazyScan(metadata); + co_return; + } + innerState->fetch(); + } + auto scan = getLazyScan(std::move(pendingBlocks)); + AD_CORRECTNESS_CHECK(pendingBlocks.empty()); + for (IdTable& idTable : scan) { + co_yield {std::move(idTable), LocalVocab{}}; + } + metadata.aggregate(scan.details()); + } +} + +// _____________________________________________________________________________ +std::pair IndexScan::prefilterTables( + Result::Generator input, ColumnIndex joinColumn) { + AD_CORRECTNESS_CHECK(numVariables_ <= 3 && numVariables_ > 0); + auto metaBlocks = getMetadataForScan(); + + if (!metaBlocks.has_value()) { + return {Result::Generator{}, Result::Generator{}}; + } + auto state = std::make_shared( + std::move(input), joinColumn, std::move(metaBlocks.value())); + return {createPrefilteredJoinSide(state), + createPrefilteredIndexScanSide(state)}; +} diff --git a/src/engine/IndexScan.h b/src/engine/IndexScan.h index c5757dfe11..c10680f59e 100644 --- a/src/engine/IndexScan.h +++ b/src/engine/IndexScan.h @@ -99,7 +99,32 @@ class IndexScan final : public Operation { Permutation::IdTableGenerator lazyScanForJoinOfColumnWithScan( std::span joinColumn) const; + // Return two generators, the first of which yields exactly the elements of + // `input` and the second of which yields the matching blocks, skipping the + // blocks consisting only of rows that don't match the tables yielded by + // `input` to speed up join algorithms when no undef values are presend. When + // there are undef values, the second generator represents the full index + // scan. + std::pair prefilterTables( + Result::Generator input, ColumnIndex joinColumn); + private: + // Implementation detail that allows to consume a generator from two other + // cooperating generators. Needs to be forward declared as it is used by + // several member functions below. + struct SharedGeneratorState; + + // Helper function that creates a generator that re-yields the generator + // wrapped by `innerState`. + static Result::Generator createPrefilteredJoinSide( + std::shared_ptr innerState); + + // Helper function that creates a generator yielding prefiltered rows of this + // index scan according to the block metadata, that match the tables yielded + // by the generator wrapped by `innerState`. + Result::Generator createPrefilteredIndexScanSide( + std::shared_ptr innerState); + // TODO Make the `getSizeEstimateBeforeLimit()` function `const` for // ALL the `Operations`. uint64_t getSizeEstimateBeforeLimit() override { return sizeEstimate_; } @@ -145,6 +170,11 @@ class IndexScan final : public Operation { ScanSpecification getScanSpecification() const; ScanSpecificationAsTripleComponent getScanSpecificationTc() const; + // Set the runtime info of the `scanTree` when it was lazily executed during a + // join. + void updateRuntimeInfoForLazyScan( + const CompressedRelationReader::LazyScanMetadata& metadata); + private: ProtoResult computeResult(bool requestLaziness) override; diff --git a/src/engine/Join.cpp b/src/engine/Join.cpp index 24ee843c1a..d3c5370e16 100644 --- a/src/engine/Join.cpp +++ b/src/engine/Join.cpp @@ -74,12 +74,27 @@ std::variant resultToView( } return convertGenerator(std::move(result.idTables()), permutation); } + +using GeneratorWithDetails = + cppcoro::generator, + CompressedRelationReader::LazyScanMetadata>; +// Convert a `generator` for more +// efficient access in the join columns below. +GeneratorWithDetails convertGenerator(Permutation::IdTableGenerator gen) { + co_await cppcoro::getDetails = gen.details(); + gen.setDetailsPointer(&co_await cppcoro::getDetails); + for (auto& table : gen) { + // IndexScans don't have a local vocabulary, so we can just use an empty one + ad_utility::IdTableAndFirstCol t{std::move(table), LocalVocab{}}; + co_yield t; + } +} } // namespace // _____________________________________________________________________________ Join::Join(QueryExecutionContext* qec, std::shared_ptr t1, std::shared_ptr t2, ColumnIndex t1JoinCol, - ColumnIndex t2JoinCol) + ColumnIndex t2JoinCol, bool allowSwappingChildrenOnlyForTesting) : Operation(qec) { AD_CONTRACT_CHECK(t1 && t2); // Currently all join algorithms require both inputs to be sorted, so we @@ -91,8 +106,11 @@ Join::Join(QueryExecutionContext* qec, std::shared_ptr t1, // are identical except for the order of the join operands, are easier to // identify. auto swapChildren = [&]() { - std::swap(t1, t2); - std::swap(t1JoinCol, t2JoinCol); + // This can be disabled by tests to fix the order of the subtrees. + if (allowSwappingChildrenOnlyForTesting) { + std::swap(t1, t2); + std::swap(t1JoinCol, t2JoinCol); + } }; if (t1->getCacheKey() > t2->getCacheKey()) { swapChildren(); @@ -119,15 +137,6 @@ Join::Join(QueryExecutionContext* qec, std::shared_ptr t1, AD_CONTRACT_CHECK(_joinVar == findJoinVar(*_right, _rightJoinCol)); } -// _____________________________________________________________________________ -Join::Join(InvalidOnlyForTestingJoinTag, QueryExecutionContext* qec) - : Operation(qec) { - // Needed, so that the timeout checker in Join::join doesn't create a seg - // fault if it tries to create a message about the timeout. - _left = std::make_shared(qec); - _right = _left; -} - // _____________________________________________________________________________ string Join::getCacheKeyImpl() const { std::ostringstream os; @@ -179,8 +188,7 @@ ProtoResult Join::computeResult(bool requestLaziness) { if (rightResIfCached && !leftResIfCached) { AD_CORRECTNESS_CHECK(rightResIfCached->isFullyMaterialized()); return computeResultForIndexScanAndIdTable( - requestLaziness, std::move(rightResIfCached), _rightJoinCol, - leftIndexScan, _leftJoinCol); + requestLaziness, std::move(rightResIfCached), leftIndexScan); } else if (!leftResIfCached) { return computeResultForTwoIndexScans(requestLaziness); @@ -204,10 +212,13 @@ ProtoResult Join::computeResult(bool requestLaziness) { // constructor that it is the right child. auto rightIndexScan = std::dynamic_pointer_cast(_right->getRootOperation()); - if (rightIndexScan && !rightResIfCached && leftRes->isFullyMaterialized()) { - return computeResultForIndexScanAndIdTable( - requestLaziness, std::move(leftRes), _leftJoinCol, rightIndexScan, - _rightJoinCol); + if (rightIndexScan && !rightResIfCached) { + if (leftRes->isFullyMaterialized()) { + return computeResultForIndexScanAndIdTable( + requestLaziness, std::move(leftRes), rightIndexScan); + } + return computeResultForIndexScanAndLazyOperation( + requestLaziness, std::move(leftRes), rightIndexScan); } std::shared_ptr rightRes = @@ -217,8 +228,7 @@ ProtoResult Join::computeResult(bool requestLaziness) { return computeResultForTwoMaterializedInputs(std::move(leftRes), std::move(rightRes)); } - return lazyJoin(std::move(leftRes), _leftJoinCol, std::move(rightRes), - _rightJoinCol, requestLaziness); + return lazyJoin(std::move(leftRes), std::move(rightRes), requestLaziness); } // _____________________________________________________________________________ @@ -331,8 +341,7 @@ void Join::computeSizeEstimateAndMultiplicities() { // ______________________________________________________________________________ -void Join::join(const IdTable& a, ColumnIndex jc1, const IdTable& b, - ColumnIndex jc2, IdTable* result) const { +void Join::join(const IdTable& a, const IdTable& b, IdTable* result) const { LOG(DEBUG) << "Performing join between two tables.\n"; LOG(DEBUG) << "A: width = " << a.numColumns() << ", size = " << a.size() << "\n"; @@ -344,10 +353,9 @@ void Join::join(const IdTable& a, ColumnIndex jc1, const IdTable& b, return; } checkCancellation(); - ad_utility::JoinColumnMapping joinColumnData{ - {{jc1, jc2}}, a.numColumns(), b.numColumns()}; - auto joinColumnL = a.getColumn(jc1); - auto joinColumnR = b.getColumn(jc2); + ad_utility::JoinColumnMapping joinColumnData = getJoinColumnMapping(); + auto joinColumnL = a.getColumn(_leftJoinCol); + auto joinColumnR = b.getColumn(_rightJoinCol); auto aPermuted = a.asColumnSubsetView(joinColumnData.permutationLeft()); auto bPermuted = b.asColumnSubsetView(joinColumnData.permutationRight()); @@ -479,25 +487,20 @@ ProtoResult Join::createResult( } // ______________________________________________________________________________ -ProtoResult Join::lazyJoin(std::shared_ptr a, ColumnIndex jc1, - std::shared_ptr b, ColumnIndex jc2, +ProtoResult Join::lazyJoin(std::shared_ptr a, + std::shared_ptr b, bool requestLaziness) const { // If both inputs are fully materialized, we can join them more // efficiently. AD_CONTRACT_CHECK(!a->isFullyMaterialized() || !b->isFullyMaterialized()); - ad_utility::JoinColumnMapping joinColMap{ - {{jc1, jc2}}, - _left->getRootOperation()->getResultWidth(), - _right->getRootOperation()->getResultWidth()}; + ad_utility::JoinColumnMapping joinColMap = getJoinColumnMapping(); auto resultPermutation = joinColMap.permutationResult(); return createResult( requestLaziness, [this, a = std::move(a), b = std::move(b), joinColMap = std::move(joinColMap)]( std::function yieldTable) { - ad_utility::AddCombinedRowToIdTable rowAdder{ - 1, IdTable{getResultWidth(), allocator()}, cancellationHandle_, - CHUNK_SIZE, std::move(yieldTable)}; + auto rowAdder = makeRowAdder(std::move(yieldTable)); auto leftRange = resultToView(*a, joinColMap.permutationLeft()); auto rightRange = resultToView(*b, joinColMap.permutationRight()); std::visit( @@ -644,50 +647,6 @@ void Join::addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB, } } -namespace { -using GeneratorWithDetails = - cppcoro::generator, - CompressedRelationReader::LazyScanMetadata>; -// Convert a `generator` for more -// efficient access in the join columns below. -GeneratorWithDetails convertGenerator(Permutation::IdTableGenerator gen) { - co_await cppcoro::getDetails = gen.details(); - gen.setDetailsPointer(&co_await cppcoro::getDetails); - for (auto& table : gen) { - // IndexScans don't have a local vocabulary, so we can just use an empty one - ad_utility::IdTableAndFirstCol t{std::move(table), LocalVocab{}}; - co_yield t; - } -} - -// Set the runtime info of the `scanTree` when it was lazily executed during a -// join. -void updateRuntimeInfoForLazyScan( - IndexScan& scanTree, - const CompressedRelationReader::LazyScanMetadata& metadata) { - scanTree.updateRuntimeInformationWhenOptimizedOut( - RuntimeInformation::Status::lazilyMaterialized); - auto& rti = scanTree.runtimeInfo(); - rti.numRows_ = metadata.numElementsYielded_; - rti.totalTime_ = metadata.blockingTime_; - rti.addDetail("num-blocks-read", metadata.numBlocksRead_); - rti.addDetail("num-blocks-all", metadata.numBlocksAll_); - rti.addDetail("num-elements-read", metadata.numElementsRead_); - - // Add more details, but only if the respective value is non-zero. - auto updateIfPositive = [&rti](const auto& value, const std::string& key) { - if (value > 0) { - rti.addDetail(key, value); - } - }; - updateIfPositive(metadata.numBlocksSkippedBecauseOfGraph_, - "num-blocks-skipped-graph"); - updateIfPositive(metadata.numBlocksPostprocessed_, - "num-blocks-postprocessed"); - updateIfPositive(metadata.numBlocksWithUpdate_, "num-blocks-with-update"); -} -} // namespace - // ______________________________________________________________________________________________________ ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const { return createResult( @@ -702,9 +661,7 @@ ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const { // don't have to permute the inputs and results for the // `AddCombinedRowToIdTable` class to work correctly. AD_CORRECTNESS_CHECK(_leftJoinCol == 0 && _rightJoinCol == 0); - ad_utility::AddCombinedRowToIdTable rowAdder{ - 1, IdTable{getResultWidth(), allocator()}, cancellationHandle_, - CHUNK_SIZE, std::move(yieldTable)}; + auto rowAdder = makeRowAdder(std::move(yieldTable)); ad_utility::Timer timer{ ad_utility::timer::Timer::InitialStatus::Started}; @@ -718,8 +675,8 @@ ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const { ad_utility::zipperJoinForBlocksWithoutUndef(leftBlocks, rightBlocks, std::less{}, rowAdder); - updateRuntimeInfoForLazyScan(*leftScan, leftBlocks.details()); - updateRuntimeInfoForLazyScan(*rightScan, rightBlocks.details()); + leftScan->updateRuntimeInfoForLazyScan(leftBlocks.details()); + rightScan->updateRuntimeInfoForLazyScan(rightBlocks.details()); AD_CORRECTNESS_CHECK(leftBlocks.details().numBlocksRead_ <= rightBlocks.details().numElementsRead_); @@ -735,33 +692,20 @@ ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const { template ProtoResult Join::computeResultForIndexScanAndIdTable( bool requestLaziness, std::shared_ptr resultWithIdTable, - ColumnIndex joinColTable, std::shared_ptr scan, - ColumnIndex joinColScan) const { - const IdTable& idTable = resultWithIdTable->idTable(); - // We first have to permute the columns. - auto [jcLeft, jcRight, numColsLeft, numColsRight] = [&]() { - return idTableIsRightInput - ? std::tuple{joinColScan, joinColTable, scan->getResultWidth(), - idTable.numColumns()} - : std::tuple{joinColTable, joinColScan, idTable.numColumns(), - scan->getResultWidth()}; - }(); - - ad_utility::JoinColumnMapping joinColMap{ - {{jcLeft, jcRight}}, numColsLeft, numColsRight}; + std::shared_ptr scan) const { + AD_CORRECTNESS_CHECK((idTableIsRightInput ? _leftJoinCol : _rightJoinCol) == + 0); + ad_utility::JoinColumnMapping joinColMap = getJoinColumnMapping(); auto resultPermutation = joinColMap.permutationResult(); return createResult( requestLaziness, - [this, joinColTable, scan = std::move(scan), joinColScan, + [this, scan = std::move(scan), resultWithIdTable = std::move(resultWithIdTable), joinColMap = std::move(joinColMap)]( std::function yieldTable) { const IdTable& idTable = resultWithIdTable->idTable(); - ad_utility::AddCombinedRowToIdTable rowAdder{ - 1, IdTable{getResultWidth(), allocator()}, cancellationHandle_, - CHUNK_SIZE, std::move(yieldTable)}; + auto rowAdder = makeRowAdder(std::move(yieldTable)); - AD_CORRECTNESS_CHECK(joinColScan == 0); auto permutationIdTable = ad_utility::IdTableAndFirstCol{ idTable.asColumnSubsetView(idTableIsRightInput ? joinColMap.permutationRight() @@ -771,7 +715,9 @@ ProtoResult Join::computeResultForIndexScanAndIdTable( ad_utility::Timer timer{ ad_utility::timer::Timer::InitialStatus::Started}; bool idTableHasUndef = - !idTable.empty() && idTable.at(0, joinColTable).isUndefined(); + !idTable.empty() && + idTable.at(0, idTableIsRightInput ? _rightJoinCol : _leftJoinCol) + .isUndefined(); std::optional> indexScanResult = std::nullopt; auto rightBlocks = [&scan, idTableHasUndef, &permutationIdTable, @@ -811,8 +757,8 @@ ProtoResult Join::computeResultForIndexScanAndIdTable( rightBlocks); if (std::holds_alternative(rightBlocks)) { - updateRuntimeInfoForLazyScan( - *scan, std::get(rightBlocks).details()); + scan->updateRuntimeInfoForLazyScan( + std::get(rightBlocks).details()); } auto localVocab = std::move(rowAdder.localVocab()); @@ -821,13 +767,47 @@ ProtoResult Join::computeResultForIndexScanAndIdTable( }, std::move(resultPermutation)); } + +// ______________________________________________________________________________________________________ +ProtoResult Join::computeResultForIndexScanAndLazyOperation( + bool requestLaziness, std::shared_ptr resultWithIdTable, + std::shared_ptr scan) const { + AD_CORRECTNESS_CHECK(_rightJoinCol == 0); + + ad_utility::JoinColumnMapping joinColMap = getJoinColumnMapping(); + auto resultPermutation = joinColMap.permutationResult(); + return createResult( + requestLaziness, + [this, scan = std::move(scan), + resultWithIdTable = std::move(resultWithIdTable), + joinColMap = std::move(joinColMap)]( + std::function yieldTable) { + auto rowAdder = makeRowAdder(std::move(yieldTable)); + + auto [joinSide, indexScanSide] = scan->prefilterTables( + std::move(resultWithIdTable->idTables()), _leftJoinCol); + + // Note: The `zipperJoinForBlocksWithPotentialUndef` automatically + // switches to a more efficient implementation if there are no UNDEF + // values in any of the inputs. + zipperJoinForBlocksWithPotentialUndef( + convertGenerator(std::move(joinSide), joinColMap.permutationLeft()), + convertGenerator(std::move(indexScanSide), + joinColMap.permutationRight()), + std::less{}, rowAdder); + + auto localVocab = std::move(rowAdder.localVocab()); + return Result::IdTableVocabPair{std::move(rowAdder).resultTable(), + std::move(localVocab)}; + }, + std::move(resultPermutation)); +} // _____________________________________________________________________________ ProtoResult Join::computeResultForTwoMaterializedInputs( std::shared_ptr leftRes, std::shared_ptr rightRes) const { IdTable idTable{getResultWidth(), allocator()}; - join(leftRes->idTable(), _leftJoinCol, rightRes->idTable(), _rightJoinCol, - &idTable); + join(leftRes->idTable(), rightRes->idTable(), &idTable); checkCancellation(); return {std::move(idTable), resultSortedOn(), @@ -839,3 +819,18 @@ ProtoResult Join::createEmptyResult() const { return {IdTable{getResultWidth(), allocator()}, resultSortedOn(), LocalVocab{}}; } + +// _____________________________________________________________________________ +ad_utility::JoinColumnMapping Join::getJoinColumnMapping() const { + return ad_utility::JoinColumnMapping{{{_leftJoinCol, _rightJoinCol}}, + _left->getResultWidth(), + _right->getResultWidth()}; +} + +// _____________________________________________________________________________ +ad_utility::AddCombinedRowToIdTable Join::makeRowAdder( + std::function callback) const { + return ad_utility::AddCombinedRowToIdTable{ + 1, IdTable{getResultWidth(), allocator()}, cancellationHandle_, + CHUNK_SIZE, std::move(callback)}; +} diff --git a/src/engine/Join.h b/src/engine/Join.h index a65d10f87b..8c8978c8d3 100644 --- a/src/engine/Join.h +++ b/src/engine/Join.h @@ -6,11 +6,11 @@ #pragma once +#include "engine/AddCombinedRowToTable.h" #include "engine/IndexScan.h" #include "engine/Operation.h" #include "engine/QueryExecutionTree.h" -#include "util/HashMap.h" -#include "util/HashSet.h" +#include "util/JoinAlgorithms/JoinColumnMapping.h" #include "util/TypeTraits.h" class Join : public Operation { @@ -29,24 +29,14 @@ class Join : public Operation { vector _multiplicities; public: + // `allowSwappingChildrenOnlyForTesting` should only ever be changed by tests. Join(QueryExecutionContext* qec, std::shared_ptr t1, std::shared_ptr t2, ColumnIndex t1JoinCol, - ColumnIndex t2JoinCol); + ColumnIndex t2JoinCol, bool allowSwappingChildrenOnlyForTesting = true); using OptionalPermutation = std::optional>; static constexpr size_t CHUNK_SIZE = 100'000; - // A very explicit constructor, which initializes an invalid join object (it - // has no subtrees, which violates class invariants). These invalid Join - // objects can be used for unit tests that only test member functions which - // don't access the subtrees. - // - // @param qec Needed for creating some dummies, so that the time out checker - // in Join::join doesn't create a seg fault, when it detects a time out and - // tries to create an error message. (test/IndexTestHelpers.h has a function - // `getQec` for easily creating one for tests.) - struct InvalidOnlyForTestingJoinTag {}; - explicit Join(InvalidOnlyForTestingJoinTag, QueryExecutionContext* qec); virtual string getDescriptor() const override; @@ -93,8 +83,7 @@ class Join : public Operation { * TODO Move the merge join into it's own function and make this function * a proper switch. **/ - void join(const IdTable& a, ColumnIndex jc1, const IdTable& b, - ColumnIndex jc2, IdTable* result) const; + void join(const IdTable& a, const IdTable& b, IdTable* result) const; private: // Part of the implementation of `createResult`. This function is called when @@ -132,8 +121,8 @@ class Join : public Operation { // Fallback implementation of a join that is used when at least one of the two // inputs is not fully materialized. This represents the general case where we // don't have any optimization left to try. - ProtoResult lazyJoin(std::shared_ptr a, ColumnIndex jc1, - std::shared_ptr b, ColumnIndex jc2, + ProtoResult lazyJoin(std::shared_ptr a, + std::shared_ptr b, bool requestLaziness) const; /** @@ -165,15 +154,23 @@ class Join : public Operation { // `IndexScan`s that is actually needed without fully materializing them. ProtoResult computeResultForTwoIndexScans(bool requestLaziness) const; - // A special implementation that is called when one of the children is an - // `IndexScan`. The argument `scanIsLeft` determines whether the `IndexScan` - // is the left or the right child of this `Join`. This needs to be known to - // determine the correct order of the columns in the result. - template + // A special implementation that is called when exactly one of the children is + // an `IndexScan` and the other one is a fully materialized result. The + // argument `idTableIsRightInput` determines whether the `IndexScan` is the + // left or the right child of this `Join`. This needs to be known to determine + // the correct order of the columns in the result. + template ProtoResult computeResultForIndexScanAndIdTable( bool requestLaziness, std::shared_ptr resultWithIdTable, - ColumnIndex joinColTable, std::shared_ptr scan, - ColumnIndex joinColScan) const; + std::shared_ptr scan) const; + + // Special implementation that is called when the right child is an + // `IndexScan` and the left child is a lazy result. (The constructor will + // ensure the correct order if they are initially swapped). This allows the + // `IndexScan` to skip rows that won't match in the join operation. + ProtoResult computeResultForIndexScanAndLazyOperation( + bool requestLaziness, std::shared_ptr resultWithIdTable, + std::shared_ptr scan) const; // Default case where both inputs are fully materialized. ProtoResult computeResultForTwoMaterializedInputs( @@ -207,4 +204,14 @@ class Join : public Operation { // Commonly used code for the various known-to-be-empty cases. ProtoResult createEmptyResult() const; + + // Get permutation of input and output columns to apply before and after + // joining. This is required because the join algorithms expect the join + // columns to be the first columns of the input tables and the result to be in + // the order of the input tables. + ad_utility::JoinColumnMapping getJoinColumnMapping() const; + + // Helper function to create the commonly used instance of this class. + ad_utility::AddCombinedRowToIdTable makeRowAdder( + std::function callback) const; }; diff --git a/src/index/CompressedRelation.cpp b/src/index/CompressedRelation.cpp index 8e41809c5f..05ce8a8a82 100644 --- a/src/index/CompressedRelation.cpp +++ b/src/index/CompressedRelation.cpp @@ -1594,3 +1594,16 @@ void CompressedRelationReader::LazyScanMetadata::update( ++numBlocksSkippedBecauseOfGraph_; } } + +// _____________________________________________________________________________ +void CompressedRelationReader::LazyScanMetadata::aggregate( + const LazyScanMetadata& newValue) { + numElementsYielded_ += newValue.numElementsYielded_; + blockingTime_ += newValue.blockingTime_; + numBlocksRead_ += newValue.numBlocksRead_; + numBlocksAll_ += newValue.numBlocksAll_; + numElementsRead_ += newValue.numElementsRead_; + numBlocksSkippedBecauseOfGraph_ += newValue.numBlocksSkippedBecauseOfGraph_; + numBlocksPostprocessed_ += newValue.numBlocksPostprocessed_; + numBlocksWithUpdate_ += newValue.numBlocksWithUpdate_; +} diff --git a/src/index/CompressedRelation.h b/src/index/CompressedRelation.h index c152999e10..36cbf14af0 100644 --- a/src/index/CompressedRelation.h +++ b/src/index/CompressedRelation.h @@ -515,6 +515,9 @@ class CompressedRelationReader { // call the overload directly above. void update( const std::optional& blockAndMetadata); + + // Aggregate the metadata from `newValue` into this metadata. + void aggregate(const LazyScanMetadata& newValue); }; using IdTableGenerator = cppcoro::generator; diff --git a/test/JoinTest.cpp b/test/JoinTest.cpp index f1b24c2eb4..d5a0117078 100644 --- a/test/JoinTest.cpp +++ b/test/JoinTest.cpp @@ -390,7 +390,7 @@ TEST(JoinTest, joinWithColumnAndScan) { } TEST(JoinTest, joinWithColumnAndScanEmptyInput) { - auto test = [](size_t materializationThreshold) { + auto test = [](size_t materializationThreshold, bool lazyJoinValues) { auto qec = ad_utility::testing::getQec("

1.

2. 3."); auto cleanup = setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">( @@ -400,7 +400,9 @@ TEST(JoinTest, joinWithColumnAndScanEmptyInput) { qec, PSO, SparqlTriple{Var{"?s"}, "

", Var{"?o"}}); auto valuesTree = ad_utility::makeExecutionTree( - qec, IdTable{1, qec->getAllocator()}, Vars{Variable{"?s"}}); + qec, IdTable{1, qec->getAllocator()}, Vars{Variable{"?s"}}, false, + std::vector{0}, LocalVocab{}, std::nullopt, + !lazyJoinValues); auto join = Join{qec, fullScanPSO, valuesTree, 0, 0}; EXPECT_EQ(join.getDescriptor(), "Join on ?s"); @@ -414,15 +416,17 @@ TEST(JoinTest, joinWithColumnAndScanEmptyInput) { testJoinOperation(joinSwitched, makeExpectedColumns(expectedVariables, expected)); }; - test(0); - test(1); - test(2); - test(3); - test(1'000'000); + for (bool lazyJoinValues : {true, false}) { + test(0, lazyJoinValues); + test(1, lazyJoinValues); + test(2, lazyJoinValues); + test(3, lazyJoinValues); + test(1'000'000, lazyJoinValues); + } } TEST(JoinTest, joinWithColumnAndScanUndefValues) { - auto test = [](size_t materializationThreshold) { + auto test = [](size_t materializationThreshold, bool lazyJoinValues) { auto qec = ad_utility::testing::getQec("

1.

2. 3."); auto cleanup = setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">( @@ -432,7 +436,9 @@ TEST(JoinTest, joinWithColumnAndScanUndefValues) { qec, PSO, SparqlTriple{Var{"?s"}, "

", Var{"?o"}}); auto U = Id::makeUndefined(); auto valuesTree = ad_utility::makeExecutionTree( - qec, makeIdTableFromVector({{U}}), Vars{Variable{"?s"}}); + qec, makeIdTableFromVector({{U}}), Vars{Variable{"?s"}}, false, + std::vector{0}, LocalVocab{}, std::nullopt, + !lazyJoinValues); auto join = Join{qec, fullScanPSO, valuesTree, 0, 0}; EXPECT_EQ(join.getDescriptor(), "Join on ?s"); @@ -458,11 +464,13 @@ TEST(JoinTest, joinWithColumnAndScanUndefValues) { qec->getQueryTreeCache().clearAll(); testJoinOperation(joinSwitched, expectedColumns, false); }; - test(0); - test(1); - test(2); - test(3); - test(1'000'000); + for (bool lazyJoinValues : {true, false}) { + test(0, lazyJoinValues); + test(1, lazyJoinValues); + test(2, lazyJoinValues); + test(3, lazyJoinValues); + test(1'000'000, lazyJoinValues); + } } TEST(JoinTest, joinTwoScans) { diff --git a/test/engine/IndexScanTest.cpp b/test/engine/IndexScanTest.cpp index 33f056994a..90afa3ce22 100644 --- a/test/engine/IndexScanTest.cpp +++ b/test/engine/IndexScanTest.cpp @@ -759,3 +759,322 @@ TEST(IndexScan, checkEvaluationWithPrefiltering) { pr(andExpr(gt(IntId(10)), lt(IntId(194))), Variable{"?price"}), {I(10), I(12), I(18), I(22), I(25), I(147), I(189), I(194)}); } + +class IndexScanWithLazyJoin : public ::testing::TestWithParam { + protected: + QueryExecutionContext* qec_ = nullptr; + + void SetUp() override { + std::string kg = + "

.

. " + "

.

. " + "

.

. " + " . ."; + qec_ = getQec(std::move(kg)); + } + + // Convert a TripleComponent to a ValueId. + Id toValueId(const TripleComponent& tc) const { + return tc.toValueId(qec_->getIndex().getVocab()).value(); + } + + // Create an id table with a single column from a vector of + // `TripleComponent`s. + IdTable makeIdTable(std::vector entries) const { + IdTable result{1, makeAllocator()}; + result.reserve(entries.size()); + for (const TripleComponent& entry : entries) { + result.emplace_back(); + result.back()[0] = toValueId(entry); + } + return result; + } + + // Convert a vector of a tuple of triples to an id table. + IdTable tableFromTriples( + std::vector> triples) const { + IdTable result{2, makeAllocator()}; + result.reserve(triples.size()); + for (const auto& triple : triples) { + result.emplace_back(); + result.back()[0] = toValueId(triple.at(0)); + result.back()[1] = toValueId(triple.at(1)); + } + return result; + } + + // Create a common `IndexScan` instance. + IndexScan makeScan() const { + SparqlTriple xpy{Tc{Var{"?x"}}, "

", Tc{Var{"?y"}}}; + // We need to scan all the blocks that contain the `

` predicate. + return IndexScan{qec_, Permutation::PSO, xpy}; + } + + // Consume generator `first` first and store it in a vector, then do the same + // with `second`. + static std::pair, + std::vector> + consumeSequentially(Result::Generator first, Result::Generator second) { + std::vector firstResult; + std::vector secondResult; + + for (Result::IdTableVocabPair& element : first) { + firstResult.push_back(std::move(element)); + } + for (Result::IdTableVocabPair& element : second) { + secondResult.push_back(std::move(element)); + } + return {std::move(firstResult), std::move(secondResult)}; + } + + // Consume the generators and store the results in vectors using the + // parameterized strategy. + static std::pair, + std::vector> + consumeGenerators( + std::pair generatorPair) { + std::vector joinSideResults; + std::vector scanResults; + + bool rightFirst = GetParam(); + if (rightFirst) { + std::tie(scanResults, joinSideResults) = consumeSequentially( + std::move(generatorPair.second), std::move(generatorPair.first)); + } else { + std::tie(joinSideResults, scanResults) = consumeSequentially( + std::move(generatorPair.first), std::move(generatorPair.second)); + } + return {std::move(joinSideResults), std::move(scanResults)}; + } +}; + +// _____________________________________________________________________________ +TEST_P(IndexScanWithLazyJoin, prefilterTablesDoesFilterCorrectly) { + IndexScan scan = makeScan(); + + auto makeJoinSide = [](auto* self) -> Result::Generator { + using P = Result::IdTableVocabPair; + P p1{self->makeIdTable({iri(""), iri("")}), LocalVocab{}}; + co_yield p1; + P p2{self->makeIdTable({iri("")}), LocalVocab{}}; + co_yield p2; + LocalVocab vocab; + vocab.getIndexAndAddIfNotContained(LocalVocabEntry{ + ad_utility::triple_component::Literal::literalWithoutQuotes("Test")}); + P p3{self->makeIdTable({iri(""), iri(""), iri("")}), + std::move(vocab)}; + co_yield p3; + }; + + auto [joinSideResults, scanResults] = + consumeGenerators(scan.prefilterTables(makeJoinSide(this), 0)); + + ASSERT_EQ(scanResults.size(), 2); + ASSERT_EQ(joinSideResults.size(), 3); + + EXPECT_TRUE(scanResults.at(0).localVocab_.empty()); + EXPECT_TRUE(joinSideResults.at(0).localVocab_.empty()); + + EXPECT_TRUE(scanResults.at(1).localVocab_.empty()); + EXPECT_TRUE(joinSideResults.at(1).localVocab_.empty()); + + EXPECT_FALSE(joinSideResults.at(2).localVocab_.empty()); + + EXPECT_EQ( + scanResults.at(0).idTable_, + tableFromTriples({{iri(""), iri("")}, {iri(""), iri("")}})); + EXPECT_EQ(joinSideResults.at(0).idTable_, + makeIdTable({iri(""), iri("")})); + + EXPECT_EQ( + scanResults.at(1).idTable_, + tableFromTriples({{iri(""), iri("")}, {iri(""), iri("")}})); + EXPECT_EQ(joinSideResults.at(1).idTable_, makeIdTable({iri("")})); + + EXPECT_EQ(joinSideResults.at(2).idTable_, + makeIdTable({iri(""), iri(""), iri("")})); +} + +// _____________________________________________________________________________ +TEST_P(IndexScanWithLazyJoin, + prefilterTablesDoesFilterCorrectlyWithOverlappingValues) { + std::string kg = "

.

. "; + qec_ = getQec(std::move(kg)); + IndexScan scan = makeScan(); + + auto makeJoinSide = [](auto* self) -> Result::Generator { + using P = Result::IdTableVocabPair; + P p1{self->makeIdTable({iri("")}), LocalVocab{}}; + co_yield p1; + P p2{self->makeIdTable({iri("")}), LocalVocab{}}; + co_yield p2; + }; + + auto [joinSideResults, scanResults] = + consumeGenerators(scan.prefilterTables(makeJoinSide(this), 0)); + + ASSERT_EQ(scanResults.size(), 1); + ASSERT_EQ(joinSideResults.size(), 2); + + EXPECT_TRUE(scanResults.at(0).localVocab_.empty()); + EXPECT_TRUE(joinSideResults.at(0).localVocab_.empty()); + EXPECT_TRUE(joinSideResults.at(1).localVocab_.empty()); + + EXPECT_EQ( + scanResults.at(0).idTable_, + tableFromTriples({{iri(""), iri("")}, {iri(""), iri("")}})); + EXPECT_EQ(joinSideResults.at(0).idTable_, makeIdTable({iri("")})); + + EXPECT_EQ(joinSideResults.at(1).idTable_, makeIdTable({iri("")})); +} + +// _____________________________________________________________________________ +TEST_P(IndexScanWithLazyJoin, + prefilterTablesDoesFilterCorrectlyWithSkipTablesWithoutMatchingBlock) { + std::string kg = "

.

. "; + qec_ = getQec(std::move(kg)); + IndexScan scan = makeScan(); + + auto makeJoinSide = []() -> Result::Generator { + using P = Result::IdTableVocabPair; + P p1{makeIdTableFromVector({{Id::makeFromBool(true)}}), LocalVocab{}}; + co_yield p1; + }; + + auto [joinSideResults, scanResults] = + consumeGenerators(scan.prefilterTables(makeJoinSide(), 0)); + + ASSERT_EQ(scanResults.size(), 0); + ASSERT_EQ(joinSideResults.size(), 0); +} + +// _____________________________________________________________________________ +TEST_P(IndexScanWithLazyJoin, prefilterTablesDoesNotFilterOnUndefined) { + IndexScan scan = makeScan(); + + auto makeJoinSide = [](auto* self) -> Result::Generator { + using P = Result::IdTableVocabPair; + P p1{IdTable{1, makeAllocator()}, LocalVocab{}}; + co_yield p1; + P p2{makeIdTableFromVector({{Id::makeUndefined()}}), LocalVocab{}}; + co_yield p2; + P p3{makeIdTableFromVector({{Id::makeUndefined()}}), LocalVocab{}}; + co_yield p3; + P p4{IdTable{1, makeAllocator()}, LocalVocab{}}; + co_yield p4; + P p5{self->makeIdTable({iri(""), iri("")}), LocalVocab{}}; + co_yield p5; + P p6{self->makeIdTable({iri(""), iri(""), iri("")}), + LocalVocab{}}; + co_yield p6; + P p7{IdTable{1, makeAllocator()}, LocalVocab{}}; + co_yield p7; + }; + + auto [_, scanResults] = + consumeGenerators(scan.prefilterTables(makeJoinSide(this), 0)); + + ASSERT_EQ(scanResults.size(), 3); + EXPECT_TRUE(scanResults.at(0).localVocab_.empty()); + EXPECT_TRUE(scanResults.at(1).localVocab_.empty()); + EXPECT_TRUE(scanResults.at(2).localVocab_.empty()); + + EXPECT_EQ( + scanResults.at(0).idTable_, + tableFromTriples({{iri(""), iri("")}, {iri(""), iri("")}})); + + EXPECT_EQ( + scanResults.at(1).idTable_, + tableFromTriples({{iri(""), iri("")}, {iri(""), iri("")}})); + + EXPECT_EQ( + scanResults.at(2).idTable_, + tableFromTriples({{iri(""), iri("")}, {iri(""), iri("")}})); +} + +// _____________________________________________________________________________ +TEST_P(IndexScanWithLazyJoin, prefilterTablesDoesNotFilterWithSingleUndefined) { + IndexScan scan = makeScan(); + + auto makeJoinSide = []() -> Result::Generator { + using P = Result::IdTableVocabPair; + P p1{makeIdTableFromVector({{Id::makeUndefined()}}), LocalVocab{}}; + co_yield p1; + }; + + auto [_, scanResults] = + consumeGenerators(scan.prefilterTables(makeJoinSide(), 0)); + + ASSERT_EQ(scanResults.size(), 3); + EXPECT_TRUE(scanResults.at(0).localVocab_.empty()); + EXPECT_TRUE(scanResults.at(1).localVocab_.empty()); + EXPECT_TRUE(scanResults.at(2).localVocab_.empty()); + + EXPECT_EQ( + scanResults.at(0).idTable_, + tableFromTriples({{iri(""), iri("")}, {iri(""), iri("")}})); + + EXPECT_EQ( + scanResults.at(1).idTable_, + tableFromTriples({{iri(""), iri("")}, {iri(""), iri("")}})); + + EXPECT_EQ( + scanResults.at(2).idTable_, + tableFromTriples({{iri(""), iri("")}, {iri(""), iri("")}})); +} + +// _____________________________________________________________________________ +TEST_P(IndexScanWithLazyJoin, prefilterTablesWorksWithSingleEmptyTable) { + IndexScan scan = makeScan(); + + auto makeJoinSide = []() -> Result::Generator { + using P = Result::IdTableVocabPair; + P p{IdTable{1, makeAllocator()}, LocalVocab{}}; + co_yield p; + }; + + auto [_, scanResults] = + consumeGenerators(scan.prefilterTables(makeJoinSide(), 0)); + + ASSERT_EQ(scanResults.size(), 0); +} + +// _____________________________________________________________________________ +TEST_P(IndexScanWithLazyJoin, prefilterTablesWorksWithEmptyGenerator) { + IndexScan scan = makeScan(); + + auto makeJoinSide = []() -> Result::Generator { co_return; }; + + auto [_, scanResults] = + consumeGenerators(scan.prefilterTables(makeJoinSide(), 0)); + + ASSERT_EQ(scanResults.size(), 0); +} + +INSTANTIATE_TEST_SUITE_P(IndexScanWithLazyJoinSuite, IndexScanWithLazyJoin, + ::testing::Bool(), + [](const testing::TestParamInfo& info) { + return info.param ? "RightSideFirst" + : "LeftSideFirst"; + }); + +// _____________________________________________________________________________ +TEST(IndexScan, prefilterTablesWithEmptyIndexScanReturnsEmptyGenerators) { + auto* qec = getQec("

"); + // Match with something that does not match. + SparqlTriple xpy{Tc{Var{"?x"}}, "", Tc{Var{"?y"}}}; + IndexScan scan{qec, Permutation::PSO, xpy}; + + auto makeJoinSide = []() -> Result::Generator { + ADD_FAILURE() + << "The generator should not be consumed when the IndexScan is empty." + << std::endl; + co_return; + }; + + auto [leftGenerator, rightGenerator] = + scan.prefilterTables(makeJoinSide(), 0); + + EXPECT_EQ(leftGenerator.begin(), leftGenerator.end()); + EXPECT_EQ(rightGenerator.begin(), rightGenerator.end()); +} diff --git a/test/util/JoinHelpers.h b/test/util/JoinHelpers.h index c5f3aad417..6a46ddb91e 100644 --- a/test/util/JoinHelpers.h +++ b/test/util/JoinHelpers.h @@ -68,7 +68,19 @@ inline auto makeHashJoinLambda() { * `ad_utility::callFixedSize`. */ inline auto makeJoinLambda() { - return [J = Join{Join::InvalidOnlyForTestingJoinTag{}, - ad_utility::testing::getQec()}]( - auto&&... args) { return J.join(AD_FWD(args)...); }; + return [](const IdTable& a, ColumnIndex jc1, + const IdTable& b, ColumnIndex jc2, + IdTable* result) { + std::vector> leftVariables{{Variable{"?x"}}}; + leftVariables.resize(a.numColumns()); + std::vector> rightVariables{{Variable{"?x"}}}; + rightVariables.resize(b.numColumns()); + auto* qec = ad_utility::testing::getQec(); + auto leftTree = ad_utility::makeExecutionTree( + qec, a.clone(), std::move(leftVariables), false, std::vector{jc1}); + auto rightTree = ad_utility::makeExecutionTree( + qec, b.clone(), std::move(rightVariables), false, std::vector{jc2}); + Join join{qec, leftTree, rightTree, jc1, jc2, false}; + return join.join(a, b, result); + }; }