Skip to content

Commit

Permalink
Implement lazy Union operation (#1557)
Browse files Browse the repository at this point in the history
Allow the `Union` operation to lazily output its result.
  • Loading branch information
RobinTF authored Oct 18, 2024
1 parent 2b4e6b3 commit e53d783
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 46 deletions.
138 changes: 101 additions & 37 deletions src/engine/Union.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,24 @@ size_t Union::getCostEstimate() {
getSizeEstimateBeforeLimit();
}

ProtoResult Union::computeResult([[maybe_unused]] bool requestLaziness) {
ProtoResult Union::computeResult(bool requestLaziness) {
LOG(DEBUG) << "Union result computation..." << std::endl;
std::shared_ptr<const Result> subRes1 = _subtrees[0]->getResult();
std::shared_ptr<const Result> subRes2 = _subtrees[1]->getResult();
LOG(DEBUG) << "Union subresult computation done." << std::endl;
std::shared_ptr<const Result> subRes1 =
_subtrees[0]->getResult(requestLaziness);
std::shared_ptr<const Result> subRes2 =
_subtrees[1]->getResult(requestLaziness);

IdTable idTable{getExecutionContext()->getAllocator()};
if (requestLaziness) {
auto localVocab = std::make_shared<LocalVocab>();
auto generator =
computeResultLazily(std::move(subRes1), std::move(subRes2), localVocab);
return {std::move(generator), resultSortedOn(), std::move(localVocab)};
}

idTable.setNumColumns(getResultWidth());
Union::computeUnion(&idTable, subRes1->idTable(), subRes2->idTable(),
_columnOrigins);
LOG(DEBUG) << "Union subresult computation done." << std::endl;

IdTable idTable =
computeUnion(subRes1->idTable(), subRes2->idTable(), _columnOrigins);

LOG(DEBUG) << "Union result computation done" << std::endl;
// If only one of the two operands has a non-empty local vocabulary, share
Expand All @@ -177,43 +184,40 @@ ProtoResult Union::computeResult([[maybe_unused]] bool requestLaziness) {
Result::getMergedLocalVocab(*subRes1, *subRes2)};
}

void Union::computeUnion(
IdTable* resPtr, const IdTable& left, const IdTable& right,
const std::vector<std::array<size_t, 2>>& columnOrigins) {
IdTable& res = *resPtr;
res.resize(left.size() + right.size());

static constexpr size_t chunkSize = 1'000'000;
// _____________________________________________________________________________
// Copy the range `[beg, end)` to the range starting at `target`, like
// `std::copy`, but in chunks of at most `chunkSize` elements. Before each chunk
// is copied, `checkCancellation()` is called. For an empty input range nothing
// is copied and `checkCancellation()` is not called.
void Union::copyChunked(auto beg, auto end, auto target) const {
  const size_t numElements = end - beg;
  size_t chunkBegin = 0;
  while (chunkBegin < numElements) {
    checkCancellation();
    // The last chunk may be shorter than `chunkSize`.
    const size_t chunkEnd = std::min(chunkBegin + chunkSize, numElements);
    std::copy(beg + chunkBegin, beg + chunkEnd, target + chunkBegin);
    chunkBegin = chunkEnd;
  }
}

// A drop-in replacement for `std::copy` that performs the copying in chunks
// of `chunkSize` and checks the timeout after each chunk.
auto copyChunked = [this](auto beg, auto end, auto target) {
size_t total = end - beg;
for (size_t i = 0; i < total; i += chunkSize) {
checkCancellation();
size_t actualEnd = std::min(i + chunkSize, total);
std::copy(beg + i, beg + actualEnd, target + i);
}
};
// _____________________________________________________________________________
// Assign `value` to every element of the range `[beg, end)`, like `std::fill`,
// but in chunks of at most `chunkSize` elements. Before each chunk is written,
// `checkCancellation()` is called. For an empty range nothing is written and
// `checkCancellation()` is not called.
void Union::fillChunked(auto beg, auto end, const auto& value) const {
  const size_t total = end - beg;
  for (size_t i = 0; i < total; i += chunkSize) {
    checkCancellation();
    // The last chunk may be shorter than `chunkSize`.
    const size_t actualEnd = std::min(i + chunkSize, total);
    std::fill(beg + i, beg + actualEnd, value);
  }
}

// A similar timeout-checking replacement for `std::fill`.
auto fillChunked = [this](auto beg, auto end, const auto& value) {
size_t total = end - beg;
for (size_t i = 0; i < total; i += chunkSize) {
checkCancellation();
size_t actualEnd = std::min(i + chunkSize, total);
std::fill(beg + i, beg + actualEnd, value);
}
};
// _____________________________________________________________________________
IdTable Union::computeUnion(
const IdTable& left, const IdTable& right,
const std::vector<std::array<size_t, 2>>& columnOrigins) const {
IdTable res{getResultWidth(), getExecutionContext()->getAllocator()};
res.resize(left.size() + right.size());

// Write the column with the `inputColumnIndex` from the `inputTable` into the
// `targetColumn`. Always copy the complete input column and start at position
// `offset` in the target column. If the `inputColumnIndex` is `NO_COLUMN`,
// then the corresponding range in the `targetColumn` will be filled with
// UNDEF.
auto writeColumn = [&copyChunked, &fillChunked](
const auto& inputTable, auto& targetColumn,
size_t inputColumnIndex, size_t offset) {
auto writeColumn = [this](const auto& inputTable, auto& targetColumn,
size_t inputColumnIndex, size_t offset) {
if (inputColumnIndex != NO_COLUMN) {
decltype(auto) input = inputTable.getColumn(inputColumnIndex);
copyChunked(input.begin(), input.end(), targetColumn.begin() + offset);
Expand All @@ -233,4 +237,64 @@ void Union::computeUnion(
writeColumn(left, targetColumn, leftCol, 0u);
writeColumn(right, targetColumn, rightCol, left.size());
}
return res;
}

// _____________________________________________________________________________
template <bool left>
std::vector<ColumnIndex> Union::computePermutation() const {
constexpr size_t treeIndex = left ? 0 : 1;
ColumnIndex startOfUndefColumns = _subtrees[treeIndex]->getResultWidth();
std::vector<ColumnIndex> permutation{};
permutation.reserve(_columnOrigins.size());
for (const auto& columnOrigin : _columnOrigins) {
ColumnIndex originIndex = columnOrigin[treeIndex];
if (originIndex == NO_COLUMN) {
originIndex = startOfUndefColumns;
startOfUndefColumns++;
}
permutation.push_back(originIndex);
}
return permutation;
}

// _____________________________________________________________________________
// Bring `idTable` into the column layout of this operation's result: as long
// as it has fewer columns than `getResultWidth()`, append a column and fill it
// with `Id::makeUndefined()`. Afterwards reorder the columns via
// `setColumnSubset(permutation)` and return the table.
IdTable Union::transformToCorrectColumnFormat(
    IdTable idTable, const std::vector<ColumnIndex>& permutation) const {
  while (idTable.numColumns() < getResultWidth()) {
    idTable.addEmptyColumn();
    // The column that was just added is the last one of the table.
    const auto lastColumnIndex = idTable.numColumns() - 1;
    auto undefColumn = idTable.getColumn(lastColumnIndex);
    fillChunked(undefColumn.begin(), undefColumn.end(), Id::makeUndefined());
  }

  idTable.setColumnSubset(permutation);
  return idTable;
}

// _____________________________________________________________________________
// Yield the tables of `result1` followed by the tables of `result2`, each
// transformed by `transformToCorrectColumnFormat` with the permutation of the
// respective child. A fully materialized child contributes a single table
// (a `clone()` of its `idTable()`); otherwise each table of its `idTables()`
// is moved out and yielded. After both children have been consumed, the
// merged local vocabs of both inputs are written to `*localVocab`.
cppcoro::generator<IdTable> Union::computeResultLazily(
    std::shared_ptr<const Result> result1,
    std::shared_ptr<const Result> result2,
    std::shared_ptr<LocalVocab> localVocab) const {
  const std::array<const Result*, 2> children{result1.get(), result2.get()};
  for (size_t childIndex = 0; childIndex < children.size(); ++childIndex) {
    const Result& child = *children[childIndex];
    // The permutation of a child is only computed once it is that child's
    // turn, i.e. for the right child after the left one is exhausted.
    const std::vector<ColumnIndex> permutation =
        childIndex == 0 ? computePermutation<true>()
                        : computePermutation<false>();
    if (!child.isFullyMaterialized()) {
      for (IdTable& idTable : child.idTables()) {
        co_yield transformToCorrectColumnFormat(std::move(idTable),
                                                permutation);
      }
      continue;
    }
    co_yield transformToCorrectColumnFormat(child.idTable().clone(),
                                            permutation);
  }
  // NOTE(review): the merge presumably happens last because the local vocabs
  // of lazy inputs are only complete once they are fully consumed - confirm.
  std::array<const LocalVocab*, 2> vocabs{&result1->localVocab(),
                                          &result2->localVocab()};
  *localVocab = LocalVocab::merge(vocabs);
}
37 changes: 32 additions & 5 deletions src/engine/Union.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,45 @@ class Union : public Operation {

const static size_t NO_COLUMN;

static constexpr size_t chunkSize = 1'000'000;

// The method is declared here to make it unit testable
void computeUnion(IdTable* inputTable, const IdTable& left,
const IdTable& right,
const std::vector<std::array<size_t, 2>>& columnOrigins);
IdTable computeUnion(
const IdTable& left, const IdTable& right,
const std::vector<std::array<size_t, 2>>& columnOrigins) const;

// Return raw pointers (obtained via `get()`) to the two child trees, first
// `_subtrees[0]`, then `_subtrees[1]`.
vector<QueryExecutionTree*> getChildren() override {
  auto* firstChild = _subtrees[0].get();
  auto* secondChild = _subtrees[1].get();
  return {firstChild, secondChild};
}

private:
virtual ProtoResult computeResult(
[[maybe_unused]] bool requestLaziness) override;
// A drop-in replacement for `std::copy` that performs the copying in chunks
// of `chunkSize` and checks the timeout after each chunk.
void copyChunked(auto beg, auto end, auto target) const;

// A similar timeout-checking replacement for `std::fill`.
void fillChunked(auto beg, auto end, const auto& value) const;

ProtoResult computeResult(bool requestLaziness) override;

VariableToColumnMap computeVariableToColumnMap() const override;

// Compute the permutation of the `IdTable` being yielded for the left or
// right child depending on `left`. This permutation can then be used to swap
// the columns without any copy operations.
template <bool left>
std::vector<ColumnIndex> computePermutation() const;

// Take the given `IdTable`, add any missing columns to it (filled with
// undefined values) and permute the columns to match the end result.
IdTable transformToCorrectColumnFormat(
IdTable idTable, const std::vector<ColumnIndex>& permutation) const;

// Create a generator that first yields the `IdTable`s of the left child and
// then those of the right child, applying the (potentially different)
// permutation of the respective child to each of them. Write the merged
// `LocalVocab` to the given `LocalVocab` object at the end.
cppcoro::generator<IdTable> computeResultLazily(
std::shared_ptr<const Result> result1,
std::shared_ptr<const Result> result2,
std::shared_ptr<LocalVocab> localVocab) const;
};
99 changes: 95 additions & 4 deletions test/UnionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ using Vars = std::vector<std::optional<Variable>>;
} // namespace

// A simple test for computing a union.
TEST(UnionTest, computeUnion) {
TEST(Union, computeUnion) {
auto* qec = ad_utility::testing::getQec();
IdTable left = makeIdTableFromVector({{V(1)}, {V(2)}, {V(3)}});
auto leftT = ad_utility::makeExecutionTree<ValuesForTesting>(
Expand All @@ -30,7 +30,7 @@ TEST(UnionTest, computeUnion) {
auto rightT = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, right.clone(), Vars{Variable{"?u"}, Variable{"?x"}});

Union u{ad_utility::testing::getQec(), leftT, rightT};
Union u{qec, leftT, rightT};
auto resultTable = u.computeResultOnlyForTesting();
const auto& result = resultTable.idTable();

Expand All @@ -42,7 +42,7 @@ TEST(UnionTest, computeUnion) {

// A test with large inputs to test the chunked writing that is caused by the
// timeout checks.
TEST(UnionTest, computeUnionLarge) {
TEST(Union, computeUnionLarge) {
auto* qec = ad_utility::testing::getQec();
VectorTable leftInput, rightInput, expected;
size_t numInputsL = 1'500'000u;
Expand All @@ -65,9 +65,100 @@ TEST(UnionTest, computeUnionLarge) {
auto rightT = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, makeIdTableFromVector(rightInput), Vars{Variable{"?u"}});

Union u{ad_utility::testing::getQec(), leftT, rightT};
Union u{qec, leftT, rightT};
auto resultTable = u.computeResultOnlyForTesting();
const auto& result = resultTable.idTable();

ASSERT_EQ(result, makeIdTableFromVector(expected));
}

// _____________________________________________________________________________
// Check that a `Union` that is asked for a lazy result yields one table per
// child, with the columns arranged in the layout of the union.
TEST(Union, computeUnionLazy) {
  // Build a union of a one-column and a two-column input and inspect the
  // tables it yields. `nonLazyChildren` is passed on as the last constructor
  // argument of both `ValuesForTesting` children.
  auto runTest = [](bool nonLazyChildren,
                    ad_utility::source_location loc =
                        ad_utility::source_location::current()) {
    auto trace = generateLocationTrace(loc);
    auto* qec = ad_utility::testing::getQec();
    qec->getQueryTreeCache().clearAll();

    auto leftT = ad_utility::makeExecutionTree<ValuesForTesting>(
        qec, makeIdTableFromVector({{V(1)}, {V(2)}, {V(3)}}),
        Vars{Variable{"?x"}}, false, std::vector<ColumnIndex>{}, LocalVocab{},
        std::nullopt, nonLazyChildren);

    auto rightT = ad_utility::makeExecutionTree<ValuesForTesting>(
        qec, makeIdTableFromVector({{V(4), V(5)}, {V(6), V(7)}}),
        Vars{Variable{"?u"}, Variable{"?x"}}, false,
        std::vector<ColumnIndex>{}, LocalVocab{}, std::nullopt,
        nonLazyChildren);

    Union u{qec, std::move(leftT), std::move(rightT)};
    auto resultTable = u.computeResultOnlyForTesting(true);
    ASSERT_FALSE(resultTable.isFullyMaterialized());
    auto& tables = resultTable.idTables();

    // The left input has no second column, so that column is undefined. The
    // columns of the right input appear swapped in the result.
    auto U = Id::makeUndefined();
    auto expectedFromLeft =
        makeIdTableFromVector({{V(1), U}, {V(2), U}, {V(3), U}});
    auto expectedFromRight =
        makeIdTableFromVector({{V(5), V(4)}, {V(7), V(6)}});

    auto it = tables.begin();
    ASSERT_NE(it, tables.end());
    ASSERT_EQ(*it, expectedFromLeft);

    ++it;
    ASSERT_NE(it, tables.end());
    ASSERT_EQ(*it, expectedFromRight);

    // Exactly two tables are yielded.
    ASSERT_EQ(++it, tables.end());
  };

  runTest(false);
  runTest(true);
}

// _____________________________________________________________________________
// Check for both the lazy and the fully materialized code path that the
// columns of the right input end up in the positions of the matching
// variables of the left input, with undefined values in the columns that the
// right input does not provide.
TEST(Union, ensurePermutationIsAppliedCorrectly) {
  using Var = Variable;
  auto* qec = ad_utility::testing::getQec();
  auto U = Id::makeUndefined();

  auto leftT = ad_utility::makeExecutionTree<ValuesForTesting>(
      qec, makeIdTableFromVector({{1, 2, 3, 4, 5}}),
      Vars{Var{"?a"}, Var{"?b"}, Var{"?c"}, Var{"?d"}, Var{"?e"}});

  auto rightT = ad_utility::makeExecutionTree<ValuesForTesting>(
      qec, makeIdTableFromVector({{6, 7, 8}}),
      Vars{Var{"?b"}, Var{"?a"}, Var{"?e"}});

  Union u{qec, std::move(leftT), std::move(rightT)};

  // Lazy result: one table per child.
  {
    qec->getQueryTreeCache().clearAll();
    auto resultTable = u.computeResultOnlyForTesting(true);
    ASSERT_FALSE(resultTable.isFullyMaterialized());
    auto& tables = resultTable.idTables();

    auto expectedFromLeft = makeIdTableFromVector({{1, 2, 3, 4, 5}});
    auto expectedFromRight =
        makeIdTableFromVector({{V(7), V(6), U, U, V(8)}});

    auto it = tables.begin();
    ASSERT_NE(it, tables.end());
    ASSERT_EQ(*it, expectedFromLeft);

    ++it;
    ASSERT_NE(it, tables.end());
    ASSERT_EQ(*it, expectedFromRight);

    ASSERT_EQ(++it, tables.end());
  }

  // Fully materialized result: the same rows in a single table.
  {
    qec->getQueryTreeCache().clearAll();
    auto resultTable = u.computeResultOnlyForTesting();
    ASSERT_TRUE(resultTable.isFullyMaterialized());

    auto expectedTable =
        makeIdTableFromVector({{1, 2, 3, 4, 5}, {V(7), V(6), U, U, V(8)}});
    EXPECT_EQ(resultTable.idTable(), expectedTable);
  }
}

0 comments on commit e53d783

Please sign in to comment.