Skip to content

Commit

Permalink
Add convenience Result::Generatortype
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Oct 21, 2024
1 parent 918e501 commit 2c6556b
Show file tree
Hide file tree
Showing 18 changed files with 81 additions and 105 deletions.
9 changes: 5 additions & 4 deletions src/engine/Bind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ ProtoResult Bind::computeResult(bool requestLaziness) {
if (subRes->isFullyMaterialized()) {
if (requestLaziness && subRes->idTable().size() > CHUNK_SIZE) {
return {
[](auto applyBind, std::shared_ptr<const Result> result)
-> cppcoro::generator<Result::IdTableVocabPair> {
[](auto applyBind,
std::shared_ptr<const Result> result) -> Result::Generator {
size_t size = result->idTable().size();
for (size_t offset = 0; offset < size; offset += CHUNK_SIZE) {
LocalVocab outVocab = result->getCopyOfLocalVocab();
Expand All @@ -132,8 +132,9 @@ ProtoResult Bind::computeResult(bool requestLaziness) {
return {std::move(result), resultSortedOn(), std::move(localVocab)};
}
auto localVocab = std::make_shared<LocalVocab>();
auto generator = [](auto applyBind, std::shared_ptr<const Result> result)
-> cppcoro::generator<Result::IdTableVocabPair> {
auto generator =
[](auto applyBind,
std::shared_ptr<const Result> result) -> Result::Generator {
for (Result::IdTableVocabPair& pair : result->idTables()) {
IdTable idTable = applyBind(std::move(pair.idTable_), &pair.localVocab_);
co_yield {std::move(idTable), std::move(pair.localVocab_)};
Expand Down
3 changes: 1 addition & 2 deletions src/engine/Filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ ProtoResult Filter::computeResult(bool requestLaziness) {
}

if (requestLaziness) {
return {[](auto subRes,
auto* self) -> cppcoro::generator<Result::IdTableVocabPair> {
return {[](auto subRes, auto* self) -> Result::Generator {
for (Result::IdTableVocabPair& pair : subRes->idTables()) {
IdTable result = self->filterIdTable(
subRes->sortedBy(), pair.idTable_, pair.localVocab_);
Expand Down
4 changes: 2 additions & 2 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) {
if (!subresult->isFullyMaterialized()) {
AD_CORRECTNESS_CHECK(metadataForUnsequentialData.has_value());

cppcoro::generator<Result::IdTableVocabPair> generator = CALL_FIXED_SIZE(
Result::Generator generator = CALL_FIXED_SIZE(
(std::array{inWidth, outWidth}), &GroupBy::computeResultLazily, this,
std::move(subresult), std::move(aggregates),
std::move(metadataForUnsequentialData).value().aggregateAliases_,
Expand Down Expand Up @@ -467,7 +467,7 @@ void GroupBy::processEmptyImplicitGroup(

// _____________________________________________________________________________
template <size_t IN_WIDTH, size_t OUT_WIDTH>
cppcoro::generator<Result::IdTableVocabPair> GroupBy::computeResultLazily(
Result::Generator GroupBy::computeResultLazily(
std::shared_ptr<const Result> subresult, std::vector<Aggregate> aggregates,
std::vector<HashMapAliasInformation> aggregateAliases,
std::vector<size_t> groupByCols, bool singleIdTable) const {
Expand Down
2 changes: 1 addition & 1 deletion src/engine/GroupBy.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class GroupBy : public Operation {
// skipping empty tables unless `singleIdTable` is set which causes the
// function to yield a single id table with the complete result.
template <size_t IN_WIDTH, size_t OUT_WIDTH>
cppcoro::generator<Result::IdTableVocabPair> computeResultLazily(
Result::Generator computeResultLazily(
std::shared_ptr<const Result> subresult,
std::vector<Aggregate> aggregates,
std::vector<HashMapAliasInformation> aggregateAliases,
Expand Down
2 changes: 1 addition & 1 deletion src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ VariableToColumnMap IndexScan::computeVariableToColumnMap() const {
}

// _____________________________________________________________________________
cppcoro::generator<Result::IdTableVocabPair> IndexScan::scanInChunks() const {
Result::Generator IndexScan::scanInChunks() const {
auto metadata = getMetadataForScan();
if (!metadata.has_value()) {
co_return;
Expand Down
2 changes: 1 addition & 1 deletion src/engine/IndexScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class IndexScan final : public Operation {

VariableToColumnMap computeVariableToColumnMap() const override;

cppcoro::generator<Result::IdTableVocabPair> scanInChunks() const;
Result::Generator scanInChunks() const;

// Helper functions for the public `getLazyScanFor...` functions (see above).
Permutation::IdTableGenerator getLazyScan(
Expand Down
31 changes: 12 additions & 19 deletions src/engine/Result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,8 @@ Result::Result(IdTableVocabPair pair, std::vector<ColumnIndex> sortedBy)
std::move(pair.localVocab_)} {}

// _____________________________________________________________________________
Result::Result(cppcoro::generator<IdTableVocabPair> idTables,
std::vector<ColumnIndex> sortedBy)
: data_{GenContainer{[](auto idTables, auto sortedBy)
-> cppcoro::generator<IdTableVocabPair> {
Result::Result(Generator idTables, std::vector<ColumnIndex> sortedBy)
: data_{GenContainer{[](auto idTables, auto sortedBy) -> Generator {
std::optional<IdTable::row_type> previousId = std::nullopt;
for (IdTableVocabPair& pair : idTables) {
auto& idTable = pair.idTable_;
Expand Down Expand Up @@ -126,10 +124,8 @@ void Result::applyLimitOffset(
limitOffset);
limitTimeCallback(limitTimer.msecs(), idTable());
} else {
auto generator =
[](cppcoro::generator<IdTableVocabPair> original,
LimitOffsetClause limitOffset,
auto limitTimeCallback) -> cppcoro::generator<IdTableVocabPair> {
auto generator = [](Generator original, LimitOffsetClause limitOffset,
auto limitTimeCallback) -> Generator {
if (limitOffset._limit.value_or(1) == 0) {
co_return;
}
Expand Down Expand Up @@ -164,9 +160,8 @@ void Result::assertThatLimitWasRespected(const LimitOffsetClause& limitOffset) {
auto limit = limitOffset._limit;
AD_CONTRACT_CHECK(!limit.has_value() || numRows <= limit.value());
} else {
auto generator = [](cppcoro::generator<IdTableVocabPair> original,
LimitOffsetClause limitOffset)
-> cppcoro::generator<IdTableVocabPair> {
auto generator = [](Generator original,
LimitOffsetClause limitOffset) -> Generator {
auto limit = limitOffset._limit;
uint64_t elementCount = 0;
for (IdTableVocabPair& pair : original) {
Expand Down Expand Up @@ -197,10 +192,9 @@ void Result::checkDefinedness(const VariableToColumnMap& varColMap) {
AD_EXPENSIVE_CHECK(performCheck(
varColMap, std::get<IdTableSharedLocalVocabPair>(data_).idTable_));
} else {
auto generator = [](cppcoro::generator<IdTableVocabPair> original,
auto generator = [](Generator original,
[[maybe_unused]] VariableToColumnMap varColMap,
[[maybe_unused]] auto performCheck)
-> cppcoro::generator<IdTableVocabPair> {
[[maybe_unused]] auto performCheck) -> Generator {
for (IdTableVocabPair& pair : original) {
// No need to check subsequent idTables assuming the datatypes
// don't change mid result.
Expand All @@ -217,9 +211,8 @@ void Result::runOnNewChunkComputed(
std::function<void(const IdTable&, std::chrono::microseconds)> onNewChunk,
std::function<void(bool)> onGeneratorFinished) {
AD_CONTRACT_CHECK(!isFullyMaterialized());
auto generator =
[](cppcoro::generator<IdTableVocabPair> original, auto onNewChunk,
auto onGeneratorFinished) -> cppcoro::generator<IdTableVocabPair> {
auto generator = [](Generator original, auto onNewChunk,
auto onGeneratorFinished) -> Generator {
// Call this within destructor to make sure it is also called when an
// operation stops iterating before reaching the end.
absl::Cleanup cleanup{
Expand All @@ -237,7 +230,7 @@ void Result::runOnNewChunkComputed(
throw;
}
}(std::move(idTables()), std::move(onNewChunk),
std::move(onGeneratorFinished));
std::move(onGeneratorFinished));
data_.emplace<GenContainer>(std::move(generator));
}

Expand All @@ -260,7 +253,7 @@ const IdTable& Result::idTable() const {
}

// _____________________________________________________________________________
cppcoro::generator<Result::IdTableVocabPair>& Result::idTables() const {
Result::Generator& Result::idTables() const {
AD_CONTRACT_CHECK(!isFullyMaterialized());
const auto& container = std::get<GenContainer>(data_);
AD_CONTRACT_CHECK(!container.consumed_->exchange(true));
Expand Down
11 changes: 6 additions & 5 deletions src/engine/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ class Result {
: idTable_{std::move(idTable)}, localVocab_{std::move(localVocab)} {}
};

using Generator = cppcoro::generator<IdTableVocabPair>;

private:
// Needs to be mutable in order to be consumable from a const result.
struct GenContainer {
mutable cppcoro::generator<IdTableVocabPair> generator_;
mutable Generator generator_;
mutable std::unique_ptr<std::atomic_bool> consumed_ =
std::make_unique<std::atomic_bool>(false);
explicit GenContainer(cppcoro::generator<IdTableVocabPair> generator)
explicit GenContainer(Generator generator)
: generator_{std::move(generator)} {}
};

Expand Down Expand Up @@ -107,8 +109,7 @@ class Result {
Result(IdTable idTable, std::vector<ColumnIndex> sortedBy,
LocalVocab&& localVocab);
Result(IdTableVocabPair pair, std::vector<ColumnIndex> sortedBy);
Result(cppcoro::generator<IdTableVocabPair> idTables,
std::vector<ColumnIndex> sortedBy);
Result(Generator idTables, std::vector<ColumnIndex> sortedBy);
// Prevent accidental copying of a result table.
Result(const Result& other) = delete;
Result& operator=(const Result& other) = delete;
Expand Down Expand Up @@ -153,7 +154,7 @@ class Result {

// Access to the underlying `IdTable`s. Throw an `ad_utility::Exception`
// if the underlying `data_` member holds the wrong variant.
cppcoro::generator<IdTableVocabPair>& idTables() const;
Generator& idTables() const;

// Const access to the columns by which the `idTable()` is sorted.
const std::vector<ColumnIndex>& sortedBy() const { return sortedBy_; }
Expand Down
2 changes: 1 addition & 1 deletion src/engine/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ void Service::writeJsonResult(const std::vector<std::string>& vars,
}

// ____________________________________________________________________________
cppcoro::generator<Result::IdTableVocabPair> Service::computeResultLazily(
Result::Generator Service::computeResultLazily(
const std::vector<std::string> vars,
ad_utility::LazyJsonParser::Generator body, bool singleIdTable) {
LocalVocab localVocab{};
Expand Down
2 changes: 1 addition & 1 deletion src/engine/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class Service : public Operation {

// Compute the result lazy as IdTable generator.
// If the `singleIdTable` flag is set, the result is yielded as one idTable.
cppcoro::generator<Result::IdTableVocabPair> computeResultLazily(
Result::Generator computeResultLazily(
const std::vector<std::string> vars,
ad_utility::LazyJsonParser::Generator body, bool singleIdTable);
};
2 changes: 1 addition & 1 deletion src/engine/Union.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ IdTable Union::transformToCorrectColumnFormat(
}

// _____________________________________________________________________________
cppcoro::generator<Result::IdTableVocabPair> Union::computeResultLazily(
Result::Generator Union::computeResultLazily(
std::shared_ptr<const Result> result1,
std::shared_ptr<const Result> result2) const {
std::vector<ColumnIndex> permutation = computePermutation<true>();
Expand Down
2 changes: 1 addition & 1 deletion src/engine/Union.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class Union : public Operation {
// Create a generator that yields the `IdTable` for the left or right child
// one after another and apply a potential differing permutation to it. Write
// the merged LocalVocab to the given `LocalVocab` object at the end.
cppcoro::generator<Result::IdTableVocabPair> computeResultLazily(
Result::Generator computeResultLazily(
std::shared_ptr<const Result> result1,
std::shared_ptr<const Result> result2) const;
};
21 changes: 9 additions & 12 deletions test/ExportQueryExecutionTreesTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1228,9 +1228,8 @@ TEST(ExportQueryExecutionTrees, getIdTablesReturnsSingletonIterator) {
TEST(ExportQueryExecutionTrees, getIdTablesMirrorsGenerator) {
IdTable idTable1 = makeIdTableFromVector({{1}, {2}, {3}});
IdTable idTable2 = makeIdTableFromVector({{42}, {1337}});
auto tableGenerator =
[](IdTable idTableA,
IdTable idTableB) -> cppcoro::generator<Result::IdTableVocabPair> {
auto tableGenerator = [](IdTable idTableA,
IdTable idTableB) -> Result::Generator {
co_yield {std::move(idTableA), LocalVocab{}};

co_yield {std::move(idTableB), LocalVocab{}};
Expand All @@ -1245,7 +1244,7 @@ TEST(ExportQueryExecutionTrees, getIdTablesMirrorsGenerator) {

// _____________________________________________________________________________
TEST(ExportQueryExecutionTrees, ensureCorrectSlicingOfSingleIdTable) {
auto tableGenerator = []() -> cppcoro::generator<Result::IdTableVocabPair> {
auto tableGenerator = []() -> Result::Generator {
Result::IdTableVocabPair pair1{makeIdTableFromVector({{1}, {2}, {3}}),
LocalVocab{}};
co_yield pair1;
Expand All @@ -1263,7 +1262,7 @@ TEST(ExportQueryExecutionTrees, ensureCorrectSlicingOfSingleIdTable) {
// _____________________________________________________________________________
TEST(ExportQueryExecutionTrees,
ensureCorrectSlicingOfIdTablesWhenFirstIsSkipped) {
auto tableGenerator = []() -> cppcoro::generator<Result::IdTableVocabPair> {
auto tableGenerator = []() -> Result::Generator {
Result::IdTableVocabPair pair1{makeIdTableFromVector({{1}, {2}, {3}}),
LocalVocab{}};
co_yield pair1;
Expand All @@ -1286,7 +1285,7 @@ TEST(ExportQueryExecutionTrees,
// _____________________________________________________________________________
TEST(ExportQueryExecutionTrees,
ensureCorrectSlicingOfIdTablesWhenLastIsSkipped) {
auto tableGenerator = []() -> cppcoro::generator<Result::IdTableVocabPair> {
auto tableGenerator = []() -> Result::Generator {
Result::IdTableVocabPair pair1{makeIdTableFromVector({{1}, {2}, {3}}),
LocalVocab{}};
co_yield pair1;
Expand All @@ -1309,7 +1308,7 @@ TEST(ExportQueryExecutionTrees,
// _____________________________________________________________________________
TEST(ExportQueryExecutionTrees,
ensureCorrectSlicingOfIdTablesWhenFirstAndSecondArePartial) {
auto tableGenerator = []() -> cppcoro::generator<Result::IdTableVocabPair> {
auto tableGenerator = []() -> Result::Generator {
Result::IdTableVocabPair pair1{makeIdTableFromVector({{1}, {2}, {3}}),
LocalVocab{}};
co_yield pair1;
Expand All @@ -1333,7 +1332,7 @@ TEST(ExportQueryExecutionTrees,
// _____________________________________________________________________________
TEST(ExportQueryExecutionTrees,
ensureCorrectSlicingOfIdTablesWhenFirstAndLastArePartial) {
auto tableGenerator = []() -> cppcoro::generator<Result::IdTableVocabPair> {
auto tableGenerator = []() -> Result::Generator {
Result::IdTableVocabPair pair1{makeIdTableFromVector({{1}, {2}, {3}}),
LocalVocab{}};
co_yield pair1;
Expand Down Expand Up @@ -1363,8 +1362,7 @@ TEST(ExportQueryExecutionTrees,
// _____________________________________________________________________________
TEST(ExportQueryExecutionTrees, ensureGeneratorIsNotConsumedWhenNotRequired) {
{
auto throwingGenerator =
[]() -> cppcoro::generator<Result::IdTableVocabPair> {
auto throwingGenerator = []() -> Result::Generator {
ADD_FAILURE() << "Generator was started" << std::endl;
throw std::runtime_error("Generator was started");
co_return;
Expand All @@ -1377,8 +1375,7 @@ TEST(ExportQueryExecutionTrees, ensureGeneratorIsNotConsumedWhenNotRequired) {
}

{
auto throwAfterYieldGenerator =
[]() -> cppcoro::generator<Result::IdTableVocabPair> {
auto throwAfterYieldGenerator = []() -> Result::Generator {
Result::IdTableVocabPair pair1{makeIdTableFromVector({{1}}),
LocalVocab{}};
co_yield pair1;
Expand Down
3 changes: 1 addition & 2 deletions test/FilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ namespace {
ValueId asBool(bool value) { return Id::makeFromBool(value); }

// Convert a generator to a vector for easier comparison in assertions
std::vector<IdTable> toVector(
cppcoro::generator<Result::IdTableVocabPair> generator) {
std::vector<IdTable> toVector(Result::Generator generator) {
std::vector<IdTable> result;
for (auto& pair : generator) {
result.push_back(std::move(pair.idTable_));
Expand Down
8 changes: 2 additions & 6 deletions test/OperationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,7 @@ TEST(Operation, ensureSignalUpdateIsOnlyCalledEvery50msAndAtTheEnd) {
index, &cache, makeAllocator(ad_utility::MemorySize::megabytes(100)),
SortPerformanceEstimator{}, [&](std::string) { ++updateCallCounter; }};
CustomGeneratorOperation operation{
&context,
[](const IdTable& idTable)
-> cppcoro::generator<Result::IdTableVocabPair> {
&context, [](const IdTable& idTable) -> Result::Generator {
std::this_thread::sleep_for(50ms);
co_yield {idTable.clone(), LocalVocab{}};
// This one should not trigger because it's below the 50ms threshold
Expand Down Expand Up @@ -451,9 +449,7 @@ TEST(Operation, ensureSignalUpdateIsCalledAtTheEndOfPartialConsumption) {
index, &cache, makeAllocator(ad_utility::MemorySize::megabytes(100)),
SortPerformanceEstimator{}, [&](std::string) { ++updateCallCounter; }};
CustomGeneratorOperation operation{
&context,
[](const IdTable& idTable)
-> cppcoro::generator<Result::IdTableVocabPair> {
&context, [](const IdTable& idTable) -> Result::Generator {
co_yield {idTable.clone(), LocalVocab{}};
co_yield {idTable.clone(), LocalVocab{}};
}(idTable)};
Expand Down
Loading

0 comments on commit 2c6556b

Please sign in to comment.