diff --git a/src/engine/IndexScan.cpp b/src/engine/IndexScan.cpp index 1e6b71981d..f56123a42b 100644 --- a/src/engine/IndexScan.cpp +++ b/src/engine/IndexScan.cpp @@ -532,39 +532,43 @@ struct IndexScan::SharedGeneratorState { } // Consume the next non-empty table from the generator and calculate the next - // matching blocks from the index scan. + // matching blocks from the index scan. Unless it returns early because + // `doneFetching_` or `hasUndef_` is set, both `prefetchedValues_` and + // `pendingBlocks_` contain at least one element when this function returns. void fetch() { - advanceInputToNextNonEmptyTable(); - if (doneFetching_) { - return; - } - auto& idTable = iterator_.value()->idTable_; - auto joinColumn = idTable.getColumn(joinColumn_); - AD_EXPENSIVE_CHECK(std::ranges::is_sorted(joinColumn)); - AD_CORRECTNESS_CHECK(!joinColumn.empty()); - // Skip processing for undef case, it will be handled differently - if (hasUndef_) { - return; - } - AD_CORRECTNESS_CHECK(!joinColumn[0].isUndefined()); - auto newBlocks = - CompressedRelationReader::getBlocksForJoin(joinColumn, metaBlocks_); - if (newBlocks.empty()) { - // The current input table matches no blocks, so we don't have to yield - // it. - return; + while (prefetchedValues_.empty() || pendingBlocks_.empty()) { + advanceInputToNextNonEmptyTable(); + if (doneFetching_) { + return; + } + auto& idTable = iterator_.value()->idTable_; + auto joinColumn = idTable.getColumn(joinColumn_); + AD_EXPENSIVE_CHECK(std::ranges::is_sorted(joinColumn)); + AD_CORRECTNESS_CHECK(!joinColumn.empty()); + // Skip processing for undef case, it will be handled differently + if (hasUndef_) { + return; + } + AD_CORRECTNESS_CHECK(!joinColumn[0].isUndefined()); + auto newBlocks = + CompressedRelationReader::getBlocksForJoin(joinColumn, metaBlocks_); + if (newBlocks.empty()) { + // The current input table matches no blocks, so we don't have to yield + // it. 
+ continue; + } + prefetchedValues_.push_back(std::move(*iterator_.value())); + // Skip blocks up to and including `lastBlockIndex_`, so only blocks with + // a greater index are appended to `pendingBlocks_`. + auto startIterator = + lastBlockIndex_.has_value() + ? std::ranges::upper_bound(newBlocks, lastBlockIndex_.value(), {}, + &CompressedBlockMetadata::blockIndex_) + : newBlocks.begin(); + lastBlockIndex_ = newBlocks.back().blockIndex_; + std::ranges::move(startIterator, newBlocks.end(), + std::back_inserter(pendingBlocks_)); } - prefetchedValues_.push_back(std::move(*iterator_.value())); - // Find first value that differs from the last one that was used to find - // matching blocks. - auto startIterator = - lastBlockIndex_.has_value() - ? std::ranges::upper_bound(newBlocks, lastBlockIndex_.value(), {}, - &CompressedBlockMetadata::blockIndex_) - : newBlocks.begin(); - lastBlockIndex_ = newBlocks.back().blockIndex_; - std::ranges::move(startIterator, newBlocks.end(), - std::back_inserter(pendingBlocks_)); } // Check if there are any undefined values yielded by the original generator. diff --git a/test/engine/IndexScanTest.cpp b/test/engine/IndexScanTest.cpp index 90afa3ce22..6663beaf3f 100644 --- a/test/engine/IndexScanTest.cpp +++ b/test/engine/IndexScanTest.cpp @@ -939,6 +939,8 @@ TEST_P(IndexScanWithLazyJoin, using P = Result::IdTableVocabPair; P p1{makeIdTableFromVector({{Id::makeFromBool(true)}}), LocalVocab{}}; co_yield p1; + P p2{makeIdTableFromVector({{Id::makeFromBool(true)}}), LocalVocab{}}; + co_yield p2; }; auto [joinSideResults, scanResults] =