Skip to content

Commit

Permalink
Add SlotLockManager::lock_all_slots (#156)
Browse files Browse the repository at this point in the history
* Add SlotLockManager::lock_all_slots

* Use slot_less_than (instead of operator <) to compare slot offsets (SlotLockManager).
  • Loading branch information
tonyastolfi authored Jul 16, 2024
1 parent 883f5b6 commit c930fc8
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 58 deletions.
49 changes: 36 additions & 13 deletions src/llfs/slot_lock_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ SlotLockManager::~SlotLockManager() noexcept
{
this->halt();

auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

BATT_CHECK(locked->lock_heap_.empty()) << this->debug_info_locked(locked);
}
Expand Down Expand Up @@ -53,7 +53,7 @@ slot_offset_type SlotLockManager::get_lower_bound() const
//
slot_offset_type SlotLockManager::get_upper_bound() const
{
auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};
return locked->upper_bound_;
}

Expand All @@ -68,7 +68,7 @@ StatusOr<slot_offset_type> SlotLockManager::await_lower_bound(slot_offset_type m
//
void SlotLockManager::update_upper_bound(slot_offset_type offset)
{
auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

this->update_upper_bound_locked(locked, /*new_upper_bound=*/offset);
}
Expand All @@ -77,16 +77,39 @@ void SlotLockManager::update_upper_bound(slot_offset_type offset)
//
StatusOr<SlotReadLock> SlotLockManager::lock_slots(const SlotRange& range, const char* holder)
{
(void)holder;
batt::ScopedLock<State> locked{this->state_};

auto locked = this->state_.lock();

if (range.lower_bound < this->lower_bound_.get_value()) {
if (slot_less_than(range.lower_bound, this->lower_bound_.get_value())) {
return Status{
batt::StatusCode::kOutOfRange}; // TODO [tastolfi 2021-10-20] "the requested value
// extends below the current locked slot range"
}

return this->lock_slots_nocheck(locked, range, holder);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
StatusOr<SlotReadLock> SlotLockManager::lock_all_slots(const char* holder)
{
batt::ScopedLock<State> locked{this->state_};

const llfs::slot_offset_type lower_bound = this->lower_bound_.get_value();

return this->lock_slots_nocheck(locked,
SlotRange{
.lower_bound = lower_bound,
.upper_bound = lower_bound,
},
holder);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
StatusOr<SlotReadLock> SlotLockManager::lock_slots_nocheck(batt::ScopedLock<State>& locked,
const SlotRange& range,
const char* holder)
{
const usize size_before = locked->lock_heap_.size();

SlotLockHeap::handle_type handle =
Expand All @@ -104,7 +127,7 @@ StatusOr<SlotReadLock> SlotLockManager::lock_slots(const SlotRange& range, const
//
void SlotLockManager::unlock_slots(SlotReadLock* read_lock)
{
auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

const usize size_before = locked->lock_heap_.size();
BATT_CHECK_GT(size_before, 0u);
Expand All @@ -128,7 +151,7 @@ StatusOr<SlotReadLock> SlotLockManager::update_lock(SlotReadLock old_lock,
BATT_CHECK_GE(new_range.lower_bound, old_lock.slot_range().lower_bound)
<< "The locked lower bound must increase monotonically" << BATT_INSPECT(holder);

auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

const usize size_before = locked->lock_heap_.size();

Expand All @@ -147,15 +170,15 @@ StatusOr<SlotReadLock> SlotLockManager::update_lock(SlotReadLock old_lock,
//
std::function<void(std::ostream&)> SlotLockManager::debug_info()
{
auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

return this->debug_info_locked(locked);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
std::function<void(std::ostream&)> SlotLockManager::debug_info_locked(
batt::Mutex<State>::Lock& locked)
batt::ScopedLock<State>& locked)
{
Optional<SlotLockRecord> top_copy;
if (!locked->lock_heap_.empty()) {
Expand Down Expand Up @@ -187,7 +210,7 @@ std::function<void(std::ostream&)> SlotLockManager::debug_info_locked(

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
void SlotLockManager::update_lower_bound_locked(batt::Mutex<State>::Lock& locked)
void SlotLockManager::update_lower_bound_locked(batt::ScopedLock<State>& locked)
{
if (!locked->lock_heap_.empty()) {
const slot_offset_type trim_pos = get_slot_offset(locked->lock_heap_.top());
Expand All @@ -204,7 +227,7 @@ void SlotLockManager::update_lower_bound_locked(batt::Mutex<State>::Lock& locked

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
void SlotLockManager::update_upper_bound_locked(batt::Mutex<State>::Lock& locked,
void SlotLockManager::update_upper_bound_locked(batt::ScopedLock<State>& locked,
slot_offset_type new_upper_bound)
{
locked->upper_bound_ = slot_max(new_upper_bound, locked->upper_bound_);
Expand Down
105 changes: 60 additions & 45 deletions src/llfs/slot_lock_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,70 +26,80 @@ class SlotLockManager : public SlotReadLock::Sponsor

~SlotLockManager() noexcept;

// Returns true if `halt()` has been called.
//
/** \brief Returns true if `halt()` has been called.
*/
bool is_closed() const;

// Closes all watch objects owned by the lock manager.
//
/** \brief Closes all watch objects owned by the lock manager.
*/
void halt();

// Return the current locked range lower bound.
//
// This value is guaranteed to increase monotonically. A consequence of this invariant is that
// any attempt to lock a slot range that extends below the current locked range will fail.
//
/** \brief Return the current locked range lower bound.
*
* This value is guaranteed to increase monotonically. A consequence of this invariant is that
* any attempt to lock a slot range that extends below the current locked range will fail.
*/
slot_offset_type get_lower_bound() const;

// Return the current locked range upper bound.
//
/** \brief Return the current locked range upper bound.
*/
slot_offset_type get_upper_bound() const;

// Convenience wrapper: SlotRange{this->get_lower_bound(), this->get_upper_bound()}.
//
/** \brief Convenience wrapper: SlotRange{this->get_lower_bound(), this->get_upper_bound()}.
*/
SlotRange get_locked_range() const
{
return SlotRange{this->get_lower_bound(), this->get_upper_bound()};
}

// Blocks the current task until the locked lower bound is at least `min_offset`. This may happen
// either due to updating the upper bound or because a lock is released (see below for details).
//
/** \brief Blocks the current task until the locked lower bound is at least `min_offset`. This
* may happen either due to updating the upper bound or because a lock is released (see below for
* details).
*/
StatusOr<slot_offset_type> await_lower_bound(slot_offset_type min_offset);

// Updates the locked upper bound to be the greater of `offset` and its current value.
//
// If there are no active locks, this has the side-effect of also updating the locked lower bound
// (to maintain the invariant that the locked range is empty when no locks are held).
//
// The upper bound is guranteed to increase monotonically.
//
/** \brief Updates the locked upper bound to be the greater of `offset` and its current value.
*
* If there are no active locks, this has the side-effect of also updating the locked lower bound
* (to maintain the invariant that the locked range is empty when no locks are held).
*
* The upper bound is guranteed to increase monotonically.
*/
void update_upper_bound(slot_offset_type offset);

// Acquire a lock on the given slot range, if it does not extend below the current locked range.
//
// If `range.lower_bound` is lower than the current locked lower bound, the operation fails and an
// error Status is returned.
//
// If `range.upper_bound` is greater than the current locked upper bound, then the locked upper
// bound is set to `range.upper_bound`.
//
// When the last moved copy of the returned SlotReadLock is destroyed, the lock is released and
// the locked interval is shrunk.
//
/** \brief Acquire a lock on the given slot range, if it does not extend below the current locked
* range.
*
* If `range.lower_bound` is lower than the current locked lower bound, the operation fails and an
* error Status is returned.
*
* If `range.upper_bound` is greater than the current locked upper bound, then the locked upper
* bound is set to `range.upper_bound`.
*
* When the last moved copy of the returned SlotReadLock is destroyed, the lock is released and
* the locked interval is shrunk.
*/
StatusOr<SlotReadLock> lock_slots(const SlotRange& range, const char* holder);

// Efficiently updates an existing lock by changing the range to a greater one.
//
/** \brief Atomically reads the slot lower bound and acquires/returns a lock at that
* offset.
*
* This function creates a lock whose lower and upper bounds are equal; thus neither bound (in the
* manager) is modified by acquiring and this lock.
*/
StatusOr<SlotReadLock> lock_all_slots(const char* holder);

/** \brief Efficiently updates an existing lock by changing the range to a greater one.
*/
StatusOr<SlotReadLock> update_lock(SlotReadLock old_lock, const SlotRange& new_range,
const char* holder);

// For debugging
//
/** \brief For debugging.
*/
std::function<void(std::ostream&)> debug_info();

// Clone a lock.
//
/** \brief Clone a lock.
*/
SlotReadLock clone_lock(const SlotReadLock* lock) override;

private:
Expand All @@ -105,16 +115,21 @@ class SlotLockManager : public SlotReadLock::Sponsor
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
};

std::function<void(std::ostream&)> debug_info_locked(batt::Mutex<State>::Lock& locked);
/** \brief Implements the core logic of acquiring a SlotReadLock. Does not check to make sure the
* range is valid; the caller must make sure it is or behavior is undefined!
*/
StatusOr<SlotReadLock> lock_slots_nocheck(batt::ScopedLock<State>& locked, const SlotRange& range,
const char* holder);

std::function<void(std::ostream&)> debug_info_locked(batt::ScopedLock<State>& locked);

void unlock_slots(SlotReadLock*) override;

void update_lower_bound_locked(batt::Mutex<State>::Lock& locked);
void update_lower_bound_locked(batt::ScopedLock<State>& locked);

void update_upper_bound_locked(batt::Mutex<State>::Lock& locked,
slot_offset_type new_upper_bound);
void update_upper_bound_locked(batt::ScopedLock<State>& locked, slot_offset_type new_upper_bound);

batt::Mutex<State> state_;
mutable batt::Mutex<State> state_;
batt::Watch<slot_offset_type> lower_bound_{0};
};

Expand Down

0 comments on commit c930fc8

Please sign in to comment.