Skip to content

Commit

Permalink
Add metrics to IoRingLogDevice2; add burst mode optimization. (#157)
Browse files Browse the repository at this point in the history
* Add metrics to IoRingLogDevice2; add burst mode optimization.

* fix formatting.

* Add more metrics to IoRingLogDevice2 and verify them in the sim test.
  • Loading branch information
tonyastolfi authored Jul 29, 2024
1 parent 8d4ca66 commit 758d7ee
Show file tree
Hide file tree
Showing 10 changed files with 402 additions and 55 deletions.
7 changes: 6 additions & 1 deletion src/llfs/basic_log_storage_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ class BasicLogStorageDriver
//
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -

Impl& impl()
Impl& impl() noexcept
{
return this->impl_;
}

const Impl& impl() const noexcept
{
return this->impl_;
}
Expand Down
7 changes: 6 additions & 1 deletion src/llfs/basic_ring_buffer_log_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ class BasicRingBufferLogDevice

Status sync(LogReadMode mode, SlotUpperBoundAt event) override;

driver_type& driver()
driver_type& driver() noexcept
{
return this->driver_;
}

const driver_type& driver() const noexcept
{
return this->driver_;
}
Expand Down
10 changes: 5 additions & 5 deletions src/llfs/ioring_log_device.test.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ inline void run_log_device_benchmark(
const auto flushed_range = log_device.slot_range(LogReadMode::kDurable);
const auto committed_range = log_device.slot_range(LogReadMode::kSpeculative);

LLFS_LOG_INFO() << BATT_INSPECT(flushed_range) << BATT_INSPECT(flushed_range.size())
<< BATT_INSPECT(committed_range) << BATT_INSPECT(committed_range.size())
<< BATT_INSPECT(last_iteration);
LLFS_VLOG(1) << BATT_INSPECT(flushed_range) << BATT_INSPECT(flushed_range.size())
<< BATT_INSPECT(committed_range) << BATT_INSPECT(committed_range.size())
<< BATT_INSPECT(last_iteration);

log_device.halt();
}};
Expand All @@ -121,14 +121,14 @@ inline void run_log_device_benchmark(
llfs::SlotUpperBoundAt{durable.lower_bound + trim_trigger});

if (!sync_status.ok()) {
LLFS_LOG_INFO() << BATT_INSPECT(durable.lower_bound);
LLFS_VLOG(1) << BATT_INSPECT(durable.lower_bound);
break;
}

llfs::Status trim_status = log_device.trim(durable.lower_bound + aligned_trim_size);

if (!trim_status.ok()) {
LLFS_LOG_INFO() << BATT_INSPECT(durable.lower_bound);
LLFS_VLOG(1) << BATT_INSPECT(durable.lower_bound);
break;
}

Expand Down
26 changes: 21 additions & 5 deletions src/llfs/ioring_log_device2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//
#include <llfs/basic_ring_buffer_log_device.hpp>
#include <llfs/ioring_log_config2.hpp>
#include <llfs/ioring_log_device2_metrics.hpp>
#include <llfs/ioring_log_device_storage.hpp>
#include <llfs/log_device.hpp>
#include <llfs/log_device_runtime_options.hpp>
Expand Down Expand Up @@ -47,6 +48,9 @@ class IoRingLogDriver2
using Self = IoRingLogDriver2;
using AlignedUnit = std::aligned_storage_t<kLogAtomicWriteSize, kLogAtomicWriteSize>;
using EventLoopTask = typename StorageT::EventLoopTask;
using Metrics = IoRingLogDevice2Metrics;

//+++++++++++-+-+--+----- --- -- - - - -

static constexpr batt::StaticType<TargetTrimPos> kTargetTrimPos{};
static constexpr batt::StaticType<CommitPos> kCommitPos{};
Expand All @@ -70,6 +74,8 @@ class IoRingLogDriver2
StorageT&& storage //
) noexcept;

~IoRingLogDriver2() noexcept;

//----

Status set_trim_pos(slot_offset_type trim_pos)
Expand Down Expand Up @@ -130,6 +136,11 @@ class IoRingLogDriver2

//+++++++++++-+-+--+----- --- -- - - - -

const Metrics& metrics() const noexcept
{
return this->metrics_;
}

private:
//+++++++++++-+-+--+----- --- -- - - - -

Expand Down Expand Up @@ -199,7 +210,7 @@ class IoRingLogDriver2
* two writes may be initiated by this function, provided the conditions above are still met after
* starting the first write.
*/
void start_flush(CommitPos observed_commit_pos);
void start_flush(const CommitPos observed_commit_pos);

/** \brief Returns the passed slot range with the lower and upper bounds aligned to the nearest
* data page boundary.
Expand Down Expand Up @@ -282,6 +293,10 @@ class IoRingLogDriver2

//+++++++++++-+-+--+----- --- -- - - - -

/** \brief Diagnostic metrics for this object.
*/
Metrics metrics_;

/** \brief The context passed in at construction time; provides access to the ring buffer.
*/
LogStorageDriverContext& context_;
Expand Down Expand Up @@ -374,10 +389,6 @@ class IoRingLogDriver2
*/
usize writes_pending_ = 0;

/** \brief The maximum observed value of this->writes_pending_ (high water mark).
*/
usize writes_max_ = 0;

/** \brief Buffer containing the control block structure.
*/
std::unique_ptr<AlignedUnit[]> control_block_memory_;
Expand Down Expand Up @@ -448,6 +459,11 @@ class BasicIoRingLogDevice2
config, options, std::move(storage)}
{
}

const IoRingLogDevice2Metrics& metrics() const noexcept
{
return this->driver().impl().metrics();
}
};

/** \brief A fast, durable LogDevice implementation.
Expand Down
89 changes: 78 additions & 11 deletions src/llfs/ioring_log_device2.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <llfs/data_reader.hpp>
#include <llfs/slot_writer.hpp>

#include <batteries/metrics/metric_registry.hpp>

namespace llfs {

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand All @@ -34,6 +36,37 @@ inline /*explicit*/ IoRingLogDriver2<StorageT>::IoRingLogDriver2(
{
BATT_CHECK_GE(this->config_.data_alignment_log2, this->config_.device_page_size_log2);
BATT_CHECK_GE(this->data_page_size_, sizeof(PackedLogControlBlock2));

using batt::Token;

// Register all metrics.
//
MetricLabelSet labels{
MetricLabel{Token{"object_type"}, Token{"llfs_IoRingLogDriver2"}},
MetricLabel{Token{"log_name"}, Token{this->options_.name}},
};

this->metrics_.export_to(global_metric_registry(), labels);

global_metric_registry() //
.add("trim_target_pos", this->observed_watch_[Self::kTargetTrimPos], batt::make_copy(labels))
.add("commit_pos", this->observed_watch_[Self::kCommitPos], batt::make_copy(labels))
.add("trim_pos", this->trim_pos_, batt::make_copy(labels))
.add("flush_pos", this->flush_pos_, batt::make_copy(labels));
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
template <typename StorageT>
inline IoRingLogDriver2<StorageT>::~IoRingLogDriver2() noexcept
{
this->metrics_.unexport_from(global_metric_registry());

global_metric_registry() //
.remove(this->observed_watch_[Self::kTargetTrimPos])
.remove(this->observed_watch_[Self::kCommitPos])
.remove(this->trim_pos_)
.remove(this->flush_pos_);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand Down Expand Up @@ -292,7 +325,7 @@ inline void IoRingLogDriver2<StorageT>::wait_for_slot_offset_change(T observed_v
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
template <typename StorageT>
inline void IoRingLogDriver2<StorageT>::start_flush(CommitPos observed_commit_pos)
inline void IoRingLogDriver2<StorageT>::start_flush(const CommitPos observed_commit_pos)
{
// Unflushed data comes after flushed.
//
Expand Down Expand Up @@ -347,23 +380,43 @@ inline void IoRingLogDriver2<StorageT>::start_flush(CommitPos observed_commit_po
<< BATT_INSPECT(physical_lower_bound) << BATT_INSPECT(physical_upper_bound);

slot_range.upper_bound = new_upper_bound;
this->metrics_.flush_write_split_wrap_count.add(1);
}
}

// If burst mode optimization is enabled, then round slot_range.upper_bound *down* to get the
// aligned upper bound (instead of up, the default); this way, we are less likely to need to
// issue another I/O to fill in partial data in flush_tail_.
//
if (this->flush_tail_ && this->options_.optimize_burst_mode) {
this->metrics_.burst_mode_checked.add(1);

const usize new_upper_bound = slot_max(
slot_range.lower_bound,
batt::round_down_bits(this->config_.data_alignment_log2, slot_range.upper_bound));

if (slot_range.upper_bound != new_upper_bound) {
this->metrics_.burst_mode_applied.add(1);
}

slot_range.upper_bound = new_upper_bound;
}

// Align to 512-byte boundaries for direct I/O
//
SlotRange aligned_range = this->get_aligned_range(slot_range);

// If this flush would overlap with an ongoing one (at the last device page) then trim the
// aligned_range so it doesn't.
// aligned_range (on the lower end) so it doesn't.
//
if (this->flush_tail_) {
if (slot_less_than(aligned_range.lower_bound, this->flush_tail_->upper_bound)) {
aligned_range.lower_bound = this->flush_tail_->upper_bound;
if (aligned_range.empty()) {
flush_upper_bound = this->flush_tail_->upper_bound;
continue;
}
if (this->flush_tail_ &&
slot_less_than(aligned_range.lower_bound, this->flush_tail_->upper_bound)) {
aligned_range.lower_bound = this->flush_tail_->upper_bound;
this->metrics_.flush_write_tail_collision_count.add(1);

if (aligned_range.empty()) {
flush_upper_bound = this->flush_tail_->upper_bound;
continue;
}
}

Expand Down Expand Up @@ -446,10 +499,13 @@ inline void IoRingLogDriver2<StorageT>::start_flush_write(const SlotRange& slot_
<< write_offset + buffer.size() << ", size=" << buffer.size() << ")";

++this->writes_pending_;
this->writes_max_ = std::max(this->writes_max_, this->writes_pending_);
this->metrics_.max_concurrent_writes.clamp_min(this->writes_pending_);

BATT_CHECK_LE(this->writes_pending_, this->options_.max_concurrent_writes);

this->metrics_.total_write_count.add(1);
this->metrics_.flush_write_count.add(1);

this->storage_.async_write_some(write_offset, buffer,
this->make_write_handler([this, slot_range, aligned_range](
Self* this_, StatusOr<i32> result) {
Expand Down Expand Up @@ -502,6 +558,9 @@ inline void IoRingLogDriver2<StorageT>::handle_flush_write(const SlotRange& slot
};
BATT_CHECK(!flushed_range.empty());

this->metrics_.bytes_written.add(bytes_written);
this->metrics_.bytes_flushed.add(flushed_range.size());

LLFS_DVLOG(1) << BATT_INSPECT(flushed_range);

// Update this->known_flush_pos_ and this->known_flushed_commit_pos_ to reflect the write.
Expand All @@ -528,6 +587,7 @@ inline void IoRingLogDriver2<StorageT>::handle_flush_write(const SlotRange& slot
};

if (!unflushed_remainder.empty()) {
this->metrics_.flush_tail_rewrite_count.add(1);
this->start_flush_write(unflushed_remainder, this->get_aligned_range(unflushed_remainder));
}
}
Expand Down Expand Up @@ -625,6 +685,9 @@ void IoRingLogDriver2<StorageT>::start_control_block_update(

this->writing_control_block_ = true;

this->metrics_.total_write_count.add(1);
this->metrics_.control_block_write_count.add(1);

this->storage_.async_write_some_fixed(
this->config_.control_block_offset, this->control_block_buffer_, /*buf_index=*/0,
this->make_write_handler([](Self* this_, StatusOr<i32> result) {
Expand All @@ -648,7 +711,11 @@ void IoRingLogDriver2<StorageT>::handle_control_block_update(StatusOr<i32> resul
return;
}

if (BATT_CHECKED_CAST(usize, *result) != this->control_block_buffer_.size()) {
const usize bytes_written = BATT_CHECKED_CAST(usize, *result);

this->metrics_.bytes_written.add(bytes_written);

if (bytes_written != this->control_block_buffer_.size()) {
LLFS_LOG_ERROR() << "Failed to write entire log control block!";
this->context_.update_error_status(batt::StatusCode::kInternal);
return;
Expand Down
Loading

0 comments on commit 758d7ee

Please sign in to comment.