Skip to content

Commit

Permalink
Add basic vocab size tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Nov 30, 2024
1 parent 57cef0f commit 7865102
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 44 deletions.
48 changes: 43 additions & 5 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,40 @@

#include "engine/Operation.h"

#include <absl/cleanup/cleanup.h>

#include "engine/QueryExecutionTree.h"
#include "util/OnDestructionDontThrowDuringStackUnwinding.h"
#include "util/TransparentFunctors.h"

using namespace std::chrono_literals;

namespace {
// Keep track of some statistics about the local vocabs of the results.
struct LocalVocabTracking {
size_t maxSize_ = 0;
size_t sizeSum_ = 0;
size_t totalVocabs_ = 0;
size_t nonEmptyVocabs_ = 0;

float avgSize() const {
return nonEmptyVocabs_ == 0
? 0
: static_cast<float>(sizeSum_) / nonEmptyVocabs_;
}
};

// Merge the stats of a single local vocab into the overall stats.
void mergeStats(LocalVocabTracking& stats, const LocalVocab& vocab) {
stats.maxSize_ = std::max(stats.maxSize_, vocab.size());
stats.sizeSum_ += vocab.size();
++stats.totalVocabs_;
if (vocab.size() > 0) {
++stats.nonEmptyVocabs_;
}
}
} // namespace

template <typename F>
void Operation::forAllDescendants(F f) {
static_assert(
Expand Down Expand Up @@ -120,23 +148,33 @@ ProtoResult Operation::runComputation(const ad_utility::Timer& timer,
// correctly because the result was computed, so we can pass `nullopt` as
// the last argument.
if (result.isFullyMaterialized()) {
size_t numLocalVocabs = result.localVocab().numSets();
if (numLocalVocabs > 1) {
runtimeInfo().addDetail("num-local-vocabs", numLocalVocabs);
size_t vocabSize = result.localVocab().size();
if (vocabSize > 1) {
runtimeInfo().addDetail("local-vocab-size", vocabSize);
}
updateRuntimeInformationOnSuccess(result.idTable().size(),
ad_utility::CacheStatus::computed,
timer.msecs(), std::nullopt);
} else {
runtimeInfo().status_ = RuntimeInformation::lazilyMaterialized;
result.runOnNewChunkComputed(
[this, timeSizeUpdate = 0us](
const IdTable& idTable,
[this, timeSizeUpdate = 0us, vocabStats = LocalVocabTracking{}](
const Result::IdTableVocabPair& pair,
std::chrono::microseconds duration) mutable {
const IdTable& idTable = pair.idTable_;
updateRuntimeStats(false, idTable.numRows(), idTable.numColumns(),
duration);
LOG(DEBUG) << "Computed partial chunk of size " << idTable.numRows()
<< " x " << idTable.numColumns() << std::endl;
mergeStats(vocabStats, pair.localVocab_);
if (vocabStats.sizeSum_ > 0) {
runtimeInfo().addDetail(
"non-empty-local-vocabs",
absl::StrCat(vocabStats.nonEmptyVocabs_, " / ",
vocabStats.totalVocabs_,
", Ø = ", vocabStats.avgSize(),
", max = ", vocabStats.maxSize_));
}
timeSizeUpdate += duration;
if (timeSizeUpdate > 50ms) {
timeSizeUpdate = 0us;
Expand Down
4 changes: 2 additions & 2 deletions src/engine/Result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void Result::checkDefinedness(const VariableToColumnMap& varColMap) {

// _____________________________________________________________________________
void Result::runOnNewChunkComputed(
std::function<void(const IdTable&, std::chrono::microseconds)> onNewChunk,
std::function<void(const IdTableVocabPair&, std::chrono::microseconds)> onNewChunk,
std::function<void(bool)> onGeneratorFinished) {
AD_CONTRACT_CHECK(!isFullyMaterialized());
auto generator = [](Generator original, auto onNewChunk,
Expand All @@ -220,7 +220,7 @@ void Result::runOnNewChunkComputed(
try {
ad_utility::timer::Timer timer{ad_utility::timer::Timer::Started};
for (IdTableVocabPair& pair : original) {
onNewChunk(pair.idTable_, timer.value());
onNewChunk(pair, timer.value());
co_yield pair;
timer.start();
}
Expand Down
9 changes: 5 additions & 4 deletions src/engine/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ class Result {
Result& operator=(Result&& other) = default;

// Wrap the generator stored in `data_` within a new generator that calls
// `onNewChunk` every time a new `IdTable` is yielded by the original
// generator and passed this new `IdTable` along with microsecond precision
// timing information on how long it took to compute this new chunk.
// `onNewChunk` every time a new `IdTableVocabPair` is yielded by the original
// generator and passed this new `IdTableVocabPair` along with microsecond
// precision timing information on how long it took to compute this new chunk.
// `onGeneratorFinished` is guaranteed to be called eventually as long as the
// generator is consumed at least partially, with `true` if an exception
// occurred during consumption or with `false` when the generator is done
Expand All @@ -130,7 +130,8 @@ class Result {
// Throw an `ad_utility::Exception` if the underlying `data_` member holds the
// wrong variant.
void runOnNewChunkComputed(
std::function<void(const IdTable&, std::chrono::microseconds)> onNewChunk,
std::function<void(const IdTableVocabPair&, std::chrono::microseconds)>
onNewChunk,
std::function<void(bool)> onGeneratorFinished);

// Wrap the generator stored in `data_` within a new generator that aggregates
Expand Down
14 changes: 13 additions & 1 deletion test/OperationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,12 @@ TEST(Operation, verifyRuntimeInformationIsUpdatedForLazyOperations) {
std::vector<IdTable> idTablesVector{};
idTablesVector.push_back(makeIdTableFromVector({{3, 4}}));
idTablesVector.push_back(makeIdTableFromVector({{7, 8}}));
LocalVocab localVocab{};
localVocab.getIndexAndAddIfNotContained(LocalVocabEntry{
ad_utility::triple_component::Literal::literalWithoutQuotes("Test")});
ValuesForTesting valuesForTesting{
qec, std::move(idTablesVector), {Variable{"?x"}, Variable{"?y"}}};
qec, std::move(idTablesVector), {Variable{"?x"}, Variable{"?y"}},
false, std::vector<ColumnIndex>{}, std::move(localVocab)};

ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started};
EXPECT_THROW(
Expand All @@ -374,14 +378,22 @@ TEST(Operation, verifyRuntimeInformationIsUpdatedForLazyOperations) {
{[&]() {
EXPECT_EQ(rti.status_, Status::lazilyMaterialized);
expectRtiHasDimensions(rti, 2, 1);
ASSERT_TRUE(rti.details_.contains("non-empty-local-vocabs"));
EXPECT_EQ(rti.details_["non-empty-local-vocabs"],
"1 / 1, Ø = 1, max = 1");
},
[&]() {
EXPECT_EQ(rti.status_, Status::lazilyMaterialized);
expectRtiHasDimensions(rti, 2, 2);
ASSERT_TRUE(rti.details_.contains("non-empty-local-vocabs"));
EXPECT_EQ(rti.details_["non-empty-local-vocabs"],
"2 / 2, Ø = 1, max = 1");
}});

EXPECT_EQ(rti.status_, Status::lazilyMaterialized);
expectRtiHasDimensions(rti, 2, 2);
ASSERT_TRUE(rti.details_.contains("non-empty-local-vocabs"));
EXPECT_EQ(rti.details_["non-empty-local-vocabs"], "2 / 2, Ø = 1, max = 1");
}

// _____________________________________________________________________________
Expand Down
72 changes: 42 additions & 30 deletions test/ResultTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ using ::testing::Not;
using ::testing::Values;

namespace {
using IdTableVocabPair = Result::IdTableVocabPair;

// Helper function to generate all possible splits of an IdTable in order to
// exhaustively test generator variants.
std::vector<Result::Generator> getAllSubSplits(const IdTable& idTable) {
Expand All @@ -35,7 +37,7 @@ std::vector<Result::Generator> getAllSubSplits(const IdTable& idTable) {
for (size_t i = 0; i < clone.size(); ++i) {
subSplit.push_back(clone[i]);
if (splitIndex < split.size() && split[splitIndex] == i) {
Result::IdTableVocabPair pair{std::move(subSplit), LocalVocab{}};
IdTableVocabPair pair{std::move(subSplit), LocalVocab{}};
co_yield pair;
// Move back if not moved out to reuse buffer.
subSplit = std::move(pair.idTable_);
Expand All @@ -54,7 +56,7 @@ std::vector<Result::Generator> getAllSubSplits(const IdTable& idTable) {

// _____________________________________________________________________________
void consumeGenerator(Result::Generator& generator) {
for ([[maybe_unused]] Result::IdTableVocabPair& _ : generator) {
for ([[maybe_unused]] IdTableVocabPair& _ : generator) {
}
}
} // namespace
Expand All @@ -69,7 +71,7 @@ TEST(Result, verifyIdTableThrowsWhenActuallyLazy) {
TEST(Result, verifyIdTableThrowsOnSecondAccess) {
const Result result{[]() -> Result::Generator { co_return; }(), {}};
// First access should work
for ([[maybe_unused]] Result::IdTableVocabPair& _ : result.idTables()) {
for ([[maybe_unused]] IdTableVocabPair& _ : result.idTables()) {
ADD_FAILURE() << "Generator is empty";
}
// Now it should throw
Expand Down Expand Up @@ -159,10 +161,10 @@ TEST(Result,
TEST(Result, verifyRunOnNewChunkComputedThrowsWithFullyMaterializedResult) {
Result result{makeIdTableFromVector({{}}), {}, LocalVocab{}};

EXPECT_THROW(
result.runOnNewChunkComputed(
[](const IdTable&, std::chrono::microseconds) {}, [](bool) {}),
ad_utility::Exception);
EXPECT_THROW(result.runOnNewChunkComputed(
[](const IdTableVocabPair&, std::chrono::microseconds) {},
[](bool) {}),
ad_utility::Exception);
}

// _____________________________________________________________________________
Expand All @@ -171,29 +173,38 @@ TEST(Result, verifyRunOnNewChunkComputedFiresCorrectly) {
auto idTable2 = makeIdTableFromVector({{3, 4, 0}});
auto idTable3 = makeIdTableFromVector({{1, 6, 0}, {2, 5, 0}, {3, 4, 0}});

Result result{[](auto& t1, auto& t2, auto& t3) -> Result::Generator {
std::this_thread::sleep_for(1ms);
co_yield {t1.clone(), LocalVocab{}};
std::this_thread::sleep_for(3ms);
co_yield {t2.clone(), LocalVocab{}};
std::this_thread::sleep_for(5ms);
co_yield {t3.clone(), LocalVocab{}};
}(idTable1, idTable2, idTable3),
{}};
Result result{
[](auto& t1, auto& t2, auto& t3) -> Result::Generator {
std::this_thread::sleep_for(1ms);
LocalVocab localVocab{};
localVocab.getIndexAndAddIfNotContained(LocalVocabEntry{
ad_utility::triple_component::Literal::literalWithoutQuotes(
"Test")});
co_yield {t1.clone(), std::move(localVocab)};
std::this_thread::sleep_for(3ms);
co_yield {t2.clone(), LocalVocab{}};
std::this_thread::sleep_for(5ms);
co_yield {t3.clone(), LocalVocab{}};
}(idTable1, idTable2, idTable3),
{}};
uint32_t callCounter = 0;
bool finishedConsuming = false;

result.runOnNewChunkComputed(
[&](const IdTable& idTable, std::chrono::microseconds duration) {
[&](const IdTableVocabPair& pair, std::chrono::microseconds duration) {
const IdTable& idTable = pair.idTable_;
++callCounter;
if (callCounter == 1) {
EXPECT_EQ(idTable1, idTable);
EXPECT_EQ(pair.localVocab_.size(), 1);
EXPECT_GE(duration, 1ms);
} else if (callCounter == 2) {
EXPECT_EQ(idTable2, idTable);
EXPECT_EQ(pair.localVocab_.size(), 0);
EXPECT_GE(duration, 3ms);
} else if (callCounter == 3) {
EXPECT_EQ(idTable3, idTable);
EXPECT_EQ(pair.localVocab_.size(), 0);
EXPECT_GE(duration, 5ms);
}
},
Expand All @@ -220,7 +231,7 @@ TEST(Result, verifyRunOnNewChunkCallsFinishOnError) {
uint32_t callCounterFinished = 0;

result.runOnNewChunkComputed(
[&](const IdTable&, std::chrono::microseconds) {
[&](const IdTableVocabPair&, std::chrono::microseconds) {
++callCounterGenerator;
},
[&](bool error) {
Expand Down Expand Up @@ -248,7 +259,7 @@ TEST(Result, verifyRunOnNewChunkCallsFinishOnPartialConsumption) {
{}};

result.runOnNewChunkComputed(
[&](const IdTable&, std::chrono::microseconds) {
[&](const IdTableVocabPair&, std::chrono::microseconds) {
++callCounterGenerator;
},
[&](bool error) {
Expand All @@ -267,8 +278,8 @@ TEST(Result, verifyRunOnNewChunkCallsFinishOnPartialConsumption) {
TEST(Result, verifyCacheDuringConsumptionThrowsWhenFullyMaterialized) {
Result result{makeIdTableFromVector({{}}), {}, LocalVocab{}};
EXPECT_THROW(result.cacheDuringConsumption(
[](const std::optional<Result::IdTableVocabPair>&,
const Result::IdTableVocabPair&) { return true; },
[](const std::optional<IdTableVocabPair>&,
const IdTableVocabPair&) { return true; },
[](Result) {}),
ad_utility::Exception);
}
Expand All @@ -281,9 +292,8 @@ TEST(Result, verifyCacheDuringConsumptionRespectsPassedParameters) {
for (auto& generator : getAllSubSplits(idTable)) {
Result result{std::move(generator), {0}};
result.cacheDuringConsumption(
[predictedSize = 0](
const std::optional<Result::IdTableVocabPair>& aggregator,
const Result::IdTableVocabPair& newTable) mutable {
[predictedSize = 0](const std::optional<IdTableVocabPair>& aggregator,
const IdTableVocabPair& newTable) mutable {
if (aggregator.has_value()) {
EXPECT_EQ(aggregator.value().idTable_.numColumns(), predictedSize);
} else {
Expand All @@ -304,8 +314,8 @@ TEST(Result, verifyCacheDuringConsumptionRespectsPassedParameters) {
uint32_t callCounter = 0;
Result result{std::move(generator), {}};
result.cacheDuringConsumption(
[&](const std::optional<Result::IdTableVocabPair>& aggregator,
const Result::IdTableVocabPair&) {
[&](const std::optional<IdTableVocabPair>& aggregator,
const IdTableVocabPair&) {
EXPECT_FALSE(aggregator.has_value());
++callCounter;
return false;
Expand All @@ -329,8 +339,9 @@ TEST(Result, cacheDuringConsumptionAbortsValueWhenRunningIntoMemoryLimit) {
}(flag),
{0}};
result.cacheDuringConsumption(
[](const std::optional<Result::IdTableVocabPair>&,
const Result::IdTableVocabPair&) { return true; },
[](const std::optional<IdTableVocabPair>&, const IdTableVocabPair&) {
return true;
},
[&](Result) { ADD_FAILURE() << "The result should not get cached."; });
consumeGenerator(result.idTables());
EXPECT_TRUE(flag);
Expand All @@ -351,8 +362,9 @@ TEST(
}(flag),
{0}};
result.cacheDuringConsumption(
[](const std::optional<Result::IdTableVocabPair>&,
const Result::IdTableVocabPair&) { return true; },
[](const std::optional<IdTableVocabPair>&, const IdTableVocabPair&) {
return true;
},
[&](Result) { ADD_FAILURE() << "The result should not get cached."; });
consumeGenerator(result.idTables());
EXPECT_TRUE(flag);
Expand Down
5 changes: 3 additions & 2 deletions test/engine/ValuesForTesting.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class ValuesForTesting : public Operation {
std::vector<IdTable> tables,
std::vector<std::optional<Variable>> variables,
bool unlikelyToFitInCache = false,
std::vector<ColumnIndex> sortedColumns = {})
std::vector<ColumnIndex> sortedColumns = {},
LocalVocab localVocab = LocalVocab{})
: Operation{ctx},
tables_{std::move(tables)},
variables_{std::move(variables)},
Expand All @@ -60,7 +61,7 @@ class ValuesForTesting : public Operation {
costEstimate_{0},
unlikelyToFitInCache_{unlikelyToFitInCache},
resultSortedColumns_{std::move(sortedColumns)},
localVocab_{LocalVocab{}},
localVocab_{std::move(localVocab)},
multiplicity_{std::nullopt} {
AD_CONTRACT_CHECK(
std::ranges::all_of(tables_, [this](const IdTable& table) {
Expand Down

0 comments on commit 7865102

Please sign in to comment.