Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use one LocalVocab for each block during lazy evaluation #1567

Merged
merged 10 commits into from
Oct 22, 2024
56 changes: 26 additions & 30 deletions src/engine/Bind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,25 @@

auto applyBind = [this, subRes](IdTable idTable, LocalVocab* localVocab) {
return computeExpressionBind(localVocab, std::move(idTable),
subRes->localVocab(),
_bind._expression.getPimpl());
};

if (subRes->isFullyMaterialized()) {
if (requestLaziness && subRes->idTable().size() > CHUNK_SIZE) {
auto localVocab =
std::make_shared<LocalVocab>(subRes->getCopyOfLocalVocab());
auto generator = [](std::shared_ptr<LocalVocab> vocab, auto applyBind,
std::shared_ptr<const Result> result)
-> cppcoro::generator<IdTable> {
size_t size = result->idTable().size();
for (size_t offset = 0; offset < size; offset += CHUNK_SIZE) {
co_yield applyBind(
cloneSubView(result->idTable(),
{offset, std::min(size, offset + CHUNK_SIZE)}),
vocab.get());
}
}(localVocab, std::move(applyBind), std::move(subRes));
return {std::move(generator), resultSortedOn(), std::move(localVocab)};
return {
[](auto applyBind,
std::shared_ptr<const Result> result) -> Result::Generator {
size_t size = result->idTable().size();
for (size_t offset = 0; offset < size; offset += CHUNK_SIZE) {
LocalVocab outVocab = result->getCopyOfLocalVocab();
IdTable idTable = applyBind(
cloneSubView(result->idTable(),
{offset, std::min(size, offset + CHUNK_SIZE)}),
&outVocab);
co_yield {std::move(idTable), std::move(outVocab)};
}
}(std::move(applyBind), std::move(subRes)),
resultSortedOn()};
}
// Make a deep copy of the local vocab from `subRes` and then add to it (in
// case BIND adds a new word or words).
Expand All @@ -132,28 +131,25 @@
LOG(DEBUG) << "BIND result computation done." << std::endl;
return {std::move(result), resultSortedOn(), std::move(localVocab)};
}
auto localVocab = std::make_shared<LocalVocab>();
auto generator =
[](std::shared_ptr<LocalVocab> vocab, auto applyBind,
std::shared_ptr<const Result> result) -> cppcoro::generator<IdTable> {
for (IdTable& idTable : result->idTables()) {
co_yield applyBind(std::move(idTable), vocab.get());
[](auto applyBind,
std::shared_ptr<const Result> result) -> Result::Generator {
for (auto& [idTable, localVocab] : result->idTables()) {
IdTable resultTable = applyBind(std::move(idTable), &localVocab);
co_yield {std::move(resultTable), std::move(localVocab)};
}
std::array<const LocalVocab*, 2> vocabs{vocab.get(), &result->localVocab()};
*vocab = LocalVocab::merge(std::span{vocabs});
}(localVocab, std::move(applyBind), std::move(subRes));
return {std::move(generator), resultSortedOn(), std::move(localVocab)};
}(std::move(applyBind), std::move(subRes));
return {std::move(generator), resultSortedOn()};
}

// _____________________________________________________________________________
IdTable Bind::computeExpressionBind(
LocalVocab* outputLocalVocab, IdTable idTable,
const LocalVocab& inputLocalVocab,
LocalVocab* localVocab, IdTable idTable,
const sparqlExpression::SparqlExpression* expression) const {
sparqlExpression::EvaluationContext evaluationContext(
*getExecutionContext(), _subtree->getVariableColumns(), idTable,
getExecutionContext()->getAllocator(), inputLocalVocab,
cancellationHandle_, deadline_);
getExecutionContext()->getAllocator(), *localVocab, cancellationHandle_,
deadline_);

sparqlExpression::ExpressionResult expressionResult =
expression->evaluate(&evaluationContext);
Expand Down Expand Up @@ -188,7 +184,7 @@
if (it != resultGenerator.end()) {
Id constantId =
sparqlExpression::detail::constantExpressionResultToId(
std::move(*it), *outputLocalVocab);
std::move(*it), *localVocab);
checkCancellation();
ad_utility::chunkedFill(outputColumn, constantId, CHUNK_SIZE,
[this]() { checkCancellation(); });
Expand All @@ -199,7 +195,7 @@
for (auto& resultValue : resultGenerator) {
outputColumn[i] =
sparqlExpression::detail::constantExpressionResultToId(
std::move(resultValue), *outputLocalVocab);
std::move(resultValue), *localVocab);

Check warning on line 198 in src/engine/Bind.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Bind.cpp#L198

Added line #L198 was not covered by tests
i++;
checkCancellation();
}
Expand Down
3 changes: 1 addition & 2 deletions src/engine/Bind.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ class Bind : public Operation {

// Implementation for the binding of arbitrary expressions.
IdTable computeExpressionBind(
LocalVocab* outputLocalVocab, IdTable idTable,
const LocalVocab& inputLocalVocab,
LocalVocab* localVocab, IdTable idTable,
const sparqlExpression::SparqlExpression* expression) const;

[[nodiscard]] VariableToColumnMap computeVariableToColumnMap() const override;
Expand Down
69 changes: 38 additions & 31 deletions src/engine/ExportQueryExecutionTrees.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
#include "util/http/MediaTypes.h"

// __________________________________________________________________________
cppcoro::generator<const IdTable&> ExportQueryExecutionTrees::getIdTables(
const Result& result) {
cppcoro::generator<ExportQueryExecutionTrees::TableConstRefWithVocab>
ExportQueryExecutionTrees::getIdTables(const Result& result) {
if (result.isFullyMaterialized()) {
co_yield result.idTable();
TableConstRefWithVocab pair{result.idTable(), result.localVocab()};
co_yield pair;
} else {
for (const IdTable& idTable : result.idTables()) {
co_yield idTable;
for (const Result::IdTableVocabPair& pair : result.idTables()) {
TableConstRefWithVocab tableWithVocab{pair.idTable_, pair.localVocab_};
co_yield tableWithVocab;
}
}
}
Expand All @@ -33,11 +35,14 @@ ExportQueryExecutionTrees::getRowIndices(LimitOffsetClause limitOffset,
if (limitOffset._limit.value_or(1) == 0) {
co_return;
}
for (const IdTable& idTable : getIdTables(result)) {
uint64_t currentOffset = limitOffset.actualOffset(idTable.numRows());
uint64_t upperBound = limitOffset.upperBound(idTable.numRows());
for (TableConstRefWithVocab& tableWithVocab : getIdTables(result)) {
uint64_t currentOffset =
limitOffset.actualOffset(tableWithVocab.idTable_.numRows());
uint64_t upperBound =
limitOffset.upperBound(tableWithVocab.idTable_.numRows());
if (currentOffset != upperBound) {
co_yield {idTable, std::views::iota(currentOffset, upperBound)};
co_yield {std::move(tableWithVocab),
std::views::iota(currentOffset, upperBound)};
}
limitOffset._offset -= currentOffset;
if (limitOffset._limit.has_value()) {
Expand All @@ -57,9 +62,10 @@ ExportQueryExecutionTrees::constructQueryResultToTriples(
const ad_utility::sparql_types::Triples& constructTriples,
LimitOffsetClause limitAndOffset, std::shared_ptr<const Result> result,
CancellationHandle cancellationHandle) {
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
auto& idTable = pair.idTable_;
for (uint64_t i : range) {
ConstructQueryExportContext context{i, idTable, result->localVocab(),
ConstructQueryExportContext context{i, idTable, pair.localVocab_,
qet.getVariableColumns(),
qet.getQec()->getIndex()};
using enum PositionInTriple;
Expand Down Expand Up @@ -172,10 +178,10 @@ ExportQueryExecutionTrees::idTableToQLeverJSONBindings(
std::shared_ptr<const Result> result,
CancellationHandle cancellationHandle) {
AD_CORRECTNESS_CHECK(result != nullptr);
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t rowIndex : range) {
co_yield idTableToQLeverJSONRow(qet, columns, result->localVocab(),
rowIndex, idTable)
co_yield idTableToQLeverJSONRow(qet, columns, pair.localVocab_, rowIndex,
pair.idTable_)
.dump();
cancellationHandle->throwIfCancelled();
}
Expand Down Expand Up @@ -405,14 +411,14 @@ ExportQueryExecutionTrees::selectQueryResultToStream(

// Special case: binary export of the `IdTable`.
if constexpr (format == MediaType::octetStream) {
for (const auto& [idTable, range] :
getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t i : range) {
for (const auto& columnIndex : selectedColumnIndices) {
if (columnIndex.has_value()) {
co_yield std::string_view{reinterpret_cast<const char*>(&idTable(
i, columnIndex.value().columnIndex_)),
sizeof(Id)};
co_yield std::string_view{
reinterpret_cast<const char*>(
&pair.idTable_(i, columnIndex.value().columnIndex_)),
sizeof(Id)};
}
}
cancellationHandle->throwIfCancelled();
Expand All @@ -436,15 +442,15 @@ ExportQueryExecutionTrees::selectQueryResultToStream(
constexpr auto& escapeFunction = format == MediaType::tsv
? RdfEscaping::escapeForTsv
: RdfEscaping::escapeForCsv;
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t i : range) {
for (size_t j = 0; j < selectedColumnIndices.size(); ++j) {
if (selectedColumnIndices[j].has_value()) {
const auto& val = selectedColumnIndices[j].value();
Id id = idTable(i, val.columnIndex_);
Id id = pair.idTable_(i, val.columnIndex_);
auto optionalStringAndType =
idToStringAndType<format == MediaType::csv>(
qet.getQec()->getIndex(), id, result->localVocab(),
qet.getQec()->getIndex(), id, pair.localVocab_,
escapeFunction);
if (optionalStringAndType.has_value()) [[likely]] {
co_yield optionalStringAndType.value().first;
Expand Down Expand Up @@ -561,15 +567,15 @@ ad_utility::streams::stream_generator ExportQueryExecutionTrees::
auto selectedColumnIndices =
qet.selectedVariablesToColumnIndices(selectClause, false);
// TODO<joka921> we could prefilter for the nonexisting variables.
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t i : range) {
co_yield "\n <result>";
for (size_t j = 0; j < selectedColumnIndices.size(); ++j) {
if (selectedColumnIndices[j].has_value()) {
const auto& val = selectedColumnIndices[j].value();
Id id = idTable(i, val.columnIndex_);
Id id = pair.idTable_(i, val.columnIndex_);
co_yield idToXMLBinding(val.variable_, id, qet.getQec()->getIndex(),
result->localVocab());
pair.localVocab_);
}
}
co_yield "\n </result>";
Expand Down Expand Up @@ -611,12 +617,13 @@ ad_utility::streams::stream_generator ExportQueryExecutionTrees::
co_return;
}

auto getBinding = [&](const IdTable& idTable, const uint64_t& i) {
auto getBinding = [&](const IdTable& idTable, const uint64_t& i,
const LocalVocab& localVocab) {
nlohmann::ordered_json binding = {};
for (const auto& column : columns) {
auto optionalStringAndType = idToStringAndType(
qet.getQec()->getIndex(), idTable(i, column->columnIndex_),
result->localVocab());
auto optionalStringAndType =
idToStringAndType(qet.getQec()->getIndex(),
idTable(i, column->columnIndex_), localVocab);
if (optionalStringAndType.has_value()) [[likely]] {
const auto& [stringValue, xsdType] = optionalStringAndType.value();
binding[column->variable_] =
Expand All @@ -627,12 +634,12 @@ ad_utility::streams::stream_generator ExportQueryExecutionTrees::
};

bool isFirstRow = true;
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t i : range) {
if (!isFirstRow) [[likely]] {
co_yield ",";
}
co_yield getBinding(idTable, i);
co_yield getBinding(pair.idTable_, i, pair.localVocab_);
cancellationHandle->throwIfCancelled();
isFirstRow = false;
}
Expand Down
12 changes: 10 additions & 2 deletions src/engine/ExportQueryExecutionTrees.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,23 @@ class ExportQueryExecutionTrees {
const parsedQuery::SelectClause& selectClause,
LimitOffsetClause limitAndOffset, CancellationHandle cancellationHandle);

// Public for testing.
public:
// Pairs a const reference to an `IdTable` with a const reference to a
// `LocalVocab`. Both members are plain references, so this struct does not own
// the objects it refers to; they have to stay alive for as long as the struct
// is used.
struct TableConstRefWithVocab {
const IdTable& idTable_;
const LocalVocab& localVocab_;
};
// Helper type that contains an `IdTable` (together with its `LocalVocab`) and
// a view with the related row indices to access the `IdTable` with.
struct TableWithRange {
const IdTable& idTable_;
TableConstRefWithVocab tableWithVocab_;
std::ranges::iota_view<uint64_t, uint64_t> view_;
};

private:
// Yield all `IdTable`s provided by the given `result`, each together with a
// reference to its `LocalVocab`.
static cppcoro::generator<const IdTable&> getIdTables(const Result& result);
static cppcoro::generator<ExportQueryExecutionTrees::TableConstRefWithVocab>
getIdTables(const Result& result);

// Return a range that contains the indices of the rows that have to be
// exported from the `idTable` given the `LimitOffsetClause`. It takes into
Expand Down
44 changes: 26 additions & 18 deletions src/engine/Filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,47 +48,55 @@ ProtoResult Filter::computeResult(bool requestLaziness) {
LOG(DEBUG) << "Filter result computation..." << endl;
checkCancellation();

auto localVocab = subRes->getSharedLocalVocab();
if (subRes->isFullyMaterialized()) {
IdTable result = filterIdTable(subRes, subRes->idTable());
IdTable result = filterIdTable(subRes->sortedBy(), subRes->idTable(),
subRes->localVocab());
LOG(DEBUG) << "Filter result computation done." << endl;

return {std::move(result), resultSortedOn(), std::move(localVocab)};
return {std::move(result), resultSortedOn(), subRes->getSharedLocalVocab()};
}

if (requestLaziness) {
return {[](auto subRes, auto* self) -> cppcoro::generator<IdTable> {
for (IdTable& idTable : subRes->idTables()) {
IdTable result = self->filterIdTable(subRes, idTable);
co_yield result;
return {[](auto subRes, auto* self) -> Result::Generator {
for (auto& [idTable, localVocab] : subRes->idTables()) {
IdTable result = self->filterIdTable(subRes->sortedBy(),
idTable, localVocab);
co_yield {std::move(result), std::move(localVocab)};
}
}(std::move(subRes), this),
resultSortedOn(), std::move(localVocab)};
resultSortedOn()};
}

// If we receive a generator of IdTables, we need to materialize it into a
// single IdTable.
size_t width = getSubtree().get()->getResultWidth();
IdTable result{width, getExecutionContext()->getAllocator()};
ad_utility::callFixedSize(width, [this, &subRes, &result]<int WIDTH>() {
for (IdTable& idTable : subRes->idTables()) {
computeFilterImpl<WIDTH>(result, idTable, subRes->localVocab(),
subRes->sortedBy());
}
});
std::vector<LocalVocab> localVocabs;
ad_utility::callFixedSize(
width, [this, &subRes, &result, &localVocabs]<int WIDTH>() {
for (Result::IdTableVocabPair& pair : subRes->idTables()) {
computeFilterImpl<WIDTH>(result, pair.idTable_, pair.localVocab_,
subRes->sortedBy());
localVocabs.emplace_back(std::move(pair.localVocab_));
}
});

LocalVocab resultLocalVocab{};
resultLocalVocab.mergeWith(localVocabs);

LOG(DEBUG) << "Filter result computation done." << endl;

return {std::move(result), resultSortedOn(), std::move(localVocab)};
return {std::move(result), resultSortedOn(), std::move(resultLocalVocab)};
}

// _____________________________________________________________________________
IdTable Filter::filterIdTable(const std::shared_ptr<const Result>& subRes,
const IdTable& idTable) const {
IdTable Filter::filterIdTable(std::vector<ColumnIndex> sortedBy,
const IdTable& idTable,
const LocalVocab& localVocab) const {
size_t width = idTable.numColumns();
IdTable result{width, getExecutionContext()->getAllocator()};
CALL_FIXED_SIZE(width, &Filter::computeFilterImpl, this, result, idTable,
subRes->localVocab(), subRes->sortedBy());
localVocab, std::move(sortedBy));
return result;
}

Expand Down
5 changes: 3 additions & 2 deletions src/engine/Filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Filter : public Operation {
std::vector<ColumnIndex> sortedBy) const;

// Run `computeFilterImpl` on the provided `IdTable` and return the result as a
// new `IdTable`.
IdTable filterIdTable(const std::shared_ptr<const Result>& subRes,
const IdTable& idTable) const;
IdTable filterIdTable(std::vector<ColumnIndex> sortedBy,
const IdTable& idTable,
const LocalVocab& localVocab) const;
};
Loading
Loading