Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add documentation for async_rw_mutex #1356

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ INPUT = "$(PIKA_DOCS_DOXYGEN_INPUT_ROOT)/libs/pika/async_cuda"
"$(PIKA_DOCS_DOXYGEN_INPUT_ROOT)/libs/pika/async_cuda_base" \
"$(PIKA_DOCS_DOXYGEN_INPUT_ROOT)/libs/pika/init_runtime" \
"$(PIKA_DOCS_DOXYGEN_INPUT_ROOT)/libs/pika/runtime" \
"$(PIKA_DOCS_DOXYGEN_INPUT_ROOT)/libs/pika/execution"
"$(PIKA_DOCS_DOXYGEN_INPUT_ROOT)/libs/pika/execution" \
"$(PIKA_DOCS_DOXYGEN_INPUT_ROOT)/libs/pika/synchronization"
FILE_PATTERNS = *.cpp *.hpp *.cu
RECURSIVE = YES
EXCLUDE_PATTERNS = */test */detail
Expand Down
27 changes: 26 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ headers are internal implementation details.

These headers are part of the public API, but are currently undocumented.

- ``pika/async_rw_mutex.hpp``
- ``pika/barrier.hpp``
- ``pika/condition_variable.hpp``
- ``pika/latch.hpp``
Expand Down Expand Up @@ -138,6 +137,32 @@ All sender adaptors are `customization point objects (CPOs)
:language: c++
:start-at: #include

.. _header_pika_async_rw_mutex:

Asynchronous read-write mutex (``pika/async_rw_mutex.hpp``)
===========================================================

This header provides access to a sender-based asynchronous mutex, allowing both shared and exclusive
access to a wrapped value. The functionality is in the namespace ``pika::execution::experimental``.

Unlike typical mutexes, this one provides access exactly in the order that it is requested in
synchronous code. This allows writing algorithms that mostly look like synchronous code, but can run
asynchronously. This mutex is used extensively in `DLA-Future
<https://github.com/eth-cscs/DLA-Future>`__, where it forms the basis for asynchronous access to
blocks of distributed matrices.

.. doxygenclass:: pika::execution::experimental::async_rw_mutex
.. doxygenenum:: pika::execution::experimental::async_rw_mutex_access_type
.. doxygenclass:: pika::execution::experimental::async_rw_mutex_access_wrapper
.. doxygenclass:: pika::execution::experimental::async_rw_mutex_access_wrapper< ReadWriteT, ReadT, async_rw_mutex_access_type::readwrite >
.. doxygenclass:: pika::execution::experimental::async_rw_mutex_access_wrapper< ReadWriteT, ReadT, async_rw_mutex_access_type::read >
.. doxygenclass:: pika::execution::experimental::async_rw_mutex_access_wrapper< void, void, async_rw_mutex_access_type::read >
.. doxygenclass:: pika::execution::experimental::async_rw_mutex_access_wrapper< void, void, async_rw_mutex_access_type::readwrite >

.. literalinclude:: ../examples/documentation/async_rw_mutex_documentation.cpp
:language: c++
:start-at: #include

.. _header_pika_cuda:

CUDA/HIP support (``pika/cuda.hpp``)
Expand Down
1 change: 1 addition & 0 deletions examples/documentation/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(example_programs
async_rw_mutex_documentation
drop_operation_state_documentation
drop_value_documentation
hello_world_documentation
Expand Down
85 changes: 85 additions & 0 deletions examples/documentation/async_rw_mutex_documentation.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2024 ETH Zurich
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <pika/async_rw_mutex.hpp>
#include <pika/execution.hpp>
#include <pika/init.hpp>

#include <fmt/printf.h>

#include <type_traits>
#include <utility>

int main(int argc, char* argv[])
{
namespace ex = pika::execution::experimental;
namespace tt = pika::this_thread::experimental;

pika::start(argc, argv);
ex::thread_pool_scheduler sched{};

{
// Below we will access the value proteced by the mutex with the
// following implied dependency graph:
//
// /--> ro_access1 --\
// rw_access1 +---> ro_access2 ---+---> rw_access2
// \--> ro_access3 --/
//
// Note that the senders themselves don't depend on each other
// explicitly as above, but the senders provided by the mutex enforce
// the given order.
ex::async_rw_mutex<int> m{0};

// This read-write access is guaranteed to not run concurrently with any
// other accesses. It will also run first since we requested the sender
// first from the mutex.
auto rw_access1 =
m.readwrite() | ex::continues_on(sched) | ex::then([](auto w) {
w.get() = 13;
fmt::print("updated value to {}\n", w.get());
});

// These read-only accesses can only read the value, but they can run
// concurrently. They'll see the write from the access above.
auto ro_access1 =
m.read() | ex::continues_on(sched) | ex::then([](auto w) {

Check notice on line 49 in examples/documentation/async_rw_mutex_documentation.cpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

examples/documentation/async_rw_mutex_documentation.cpp#L49

Check buffer boundaries if used in a loop including recursive loops (CWE-120, CWE-20).
static_assert(std::is_const_v<
std::remove_reference_t<decltype(w.get())>>);
fmt::print("value is now {}\n", w.get());
});
auto ro_access2 =
m.read() | ex::continues_on(sched) | ex::then([](auto w) {
static_assert(std::is_const_v<
std::remove_reference_t<decltype(w.get())>>);
fmt::print("value is {} here as well\n", w.get());
});
auto ro_access3 =
m.read() | ex::continues_on(sched) | ex::then([](auto w) {
static_assert(std::is_const_v<
std::remove_reference_t<decltype(w.get())>>);
fmt::print("and {} here too\n", w.get());
});

// This read-write access will run once all the above read-only accesses
// are done.
auto rw_access2 =
m.readwrite() | ex::continues_on(sched) | ex::then([](auto w) {
w.get() = 42;
fmt::print("value is {} at the end\n", w.get());
});

// Start and wait for all the work to finish.
tt::sync_wait(ex::when_all(std::move(rw_access1), std::move(ro_access1),
std::move(ro_access2), std::move(ro_access3),
std::move(rw_access2)));
Comment on lines +75 to +78
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: in principle, given that DAG, can we sync_wait just the last readwrite access?

My doubt is this one. They do not depend on each other as sender, but the mutex implicitly creates a dependency. This "difference" (if it is a difference, or is it just conceptual?) requires that we start each sender independently, or we can just sync_wait the last one and all the chain is started as if they were actually dependent?

}

pika::finalize();
pika::stop();

return 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
#include <utility>

namespace pika::execution::experimental {
/// The type of access provided by async_rw_mutex.
/// \brief The type of access provided by async_rw_mutex.
enum class async_rw_mutex_access_type
{
/// \brief Read-only access.
read,
/// \brief Read-write access.
readwrite
};

Expand Down Expand Up @@ -149,19 +151,23 @@ namespace pika::execution::experimental {
};
} // namespace detail

/// A wrapper for values sent by senders from async_rw_mutex.
/// \brief A wrapper for values sent by senders from \ref async_rw_mutex.
///
/// All values sent by async_rw_mutex::read and async_rw_mutex::readwrite
/// are wrapped by this class. It acts as a lock on the wrapped object and
/// manages the lifetime of it. The wrapper has reference semantics. When
/// the access type is readwrite the wrapper is only movable. When the last
/// copy of a wrapper is released the next access through the async_rw_mutex
/// (if any) will be triggered.
/// All values sent by senders accessed through \ref async_rw_mutex are wrapped by this class.
/// The wrapper has reference semantics to the wrapped object, and controls when subsequent
/// accesses is given. When the destructor of the last or only wrapper runs, senders for
/// subsequent accesses will signal their value channel.
///
/// When the access type is \ref async_rw_mutex_access_type::readwrite the wrapper is move-only.
/// When the access type is \ref async_rw_mutex_access_type::read the wrapper is copyable.
template <typename ReadWriteT, typename ReadT, async_rw_mutex_access_type AccessType>
struct async_rw_mutex_access_wrapper;
class async_rw_mutex_access_wrapper;

/// \brief A wrapper for values sent by senders from \ref async_rw_mutex with read-only access.
///
/// The wrapper is copyable.
template <typename ReadWriteT, typename ReadT>
struct async_rw_mutex_access_wrapper<ReadWriteT, ReadT, async_rw_mutex_access_type::read>
class async_rw_mutex_access_wrapper<ReadWriteT, ReadT, async_rw_mutex_access_type::read>
{
private:
using shared_state_type = std::shared_ptr<detail::async_rw_mutex_shared_state<ReadWriteT>>;
Expand All @@ -178,15 +184,19 @@ namespace pika::execution::experimental {
async_rw_mutex_access_wrapper(async_rw_mutex_access_wrapper const&) = default;
async_rw_mutex_access_wrapper& operator=(async_rw_mutex_access_wrapper const&) = default;

/// \brief Access the wrapped type by const reference.
ReadT& get() const
{
PIKA_ASSERT(state);
return state->get_value();
}
};

/// \brief A wrapper for values sent by senders from \ref async_rw_mutex with read-write access.
///
/// The wrapper is move-only.
template <typename ReadWriteT, typename ReadT>
struct async_rw_mutex_access_wrapper<ReadWriteT, ReadT, async_rw_mutex_access_type::readwrite>
class async_rw_mutex_access_wrapper<ReadWriteT, ReadT, async_rw_mutex_access_type::readwrite>
{
private:
static_assert(!std::is_void<ReadWriteT>::value,
Expand All @@ -210,6 +220,7 @@ namespace pika::execution::experimental {
async_rw_mutex_access_wrapper(async_rw_mutex_access_wrapper const&) = delete;
async_rw_mutex_access_wrapper& operator=(async_rw_mutex_access_wrapper const&) = delete;

/// \brief Access the wrapped type by reference.
ReadWriteT& get()
{
PIKA_ASSERT(state);
Expand All @@ -220,8 +231,12 @@ namespace pika::execution::experimental {
// The void wrappers for read and readwrite are identical, but must be
// specialized separately to avoid ambiguity with the non-void
// specializations above.

/// \brief A wrapper for read-only access granted by a \p void \ref async_rw_mutex.
///
/// The wrapper is copyable.
template <>
struct async_rw_mutex_access_wrapper<void, void, async_rw_mutex_access_type::read>
class async_rw_mutex_access_wrapper<void, void, async_rw_mutex_access_type::read>
{
private:
using shared_state_type = std::shared_ptr<detail::async_rw_mutex_shared_state<void>>;
Expand All @@ -239,8 +254,11 @@ namespace pika::execution::experimental {
async_rw_mutex_access_wrapper& operator=(async_rw_mutex_access_wrapper const&) = default;
};

/// \brief A wrapper for read-write access granted by a \p void \ref async_rw_mutex.
///
/// The wrapper is move-only.
template <>
struct async_rw_mutex_access_wrapper<void, void, async_rw_mutex_access_type::readwrite>
class async_rw_mutex_access_wrapper<void, void, async_rw_mutex_access_type::readwrite>
{
private:
using shared_state_type = std::shared_ptr<detail::async_rw_mutex_shared_state<void>>;
Expand All @@ -258,31 +276,36 @@ namespace pika::execution::experimental {
async_rw_mutex_access_wrapper& operator=(async_rw_mutex_access_wrapper const&) = delete;
};

/// Read-write mutex where access is granted to a value through senders.
/// \brief Read-write mutex where access is granted to a value through senders.
///
/// The wrapped value is accessed through \ref read and \ref readwrite, both of which return
/// senders which send a wrapped value on the value channel when the wrapped value is safe to
/// read or write.
///
/// The wrapped value is accessed through read and readwrite, both of which
/// return senders which call set_value on a connected receiver when the
/// wrapped value is safe to read or write. The senders send the value
/// through a wrapper type which is implicitly convertible to a reference of
/// the wrapped value. Read-only senders send wrappers that are convertible
/// to const references.
/// A read-write sender gives exclusive access to the wrapped value, while a read-only sender
/// allows concurrent access to the value (with other read-only accesses).
///
/// A read-write sender gives exclusive access to the wrapped value, while a
/// read-only sender gives shared (with other read-only senders) access to
/// the value.
/// When the wrapped type is \p void, the mutex acts as a simple mutex around some externally
/// managed resource. The mutex still allows read-write and read-only access when the type is \p
/// void. The read-write wrapper types are move-only. The read-only wrapper types are copyable.
///
/// A void mutex acts as a mutex around some user-managed resource, i.e. the
/// void mutex does not manage any value and the types sent by the senders
/// are not convertible. The sent types are copyable and release access to
/// the protected resource when released.
/// The order in which senders signal a receiver is determined by the order in which the senders
/// are retrieved from the mutex. Connecting and starting the senders is thread-safe.
///
/// The order in which senders call set_value is determined by the order in
/// which the senders are retrieved from the mutex. Connecting and starting
/// the senders is thread-safe.
/// The mutex is move-only.
///
/// Retrieving senders from the mutex is not thread-safe.
/// \warning Because access to the wrapped value is granted in the order that it is requested
/// from the mutex, there is a risk of deadlocks if senders of later accesses are started and
/// waited for without starting senders of earlier accesses.
///
/// The mutex is movable and non-copyable.
/// \warning Retrieving senders from the mutex is not thread-safe. The senders of the mutex are
/// intended to be accessed in synchronous code, while the access provided by the senders
/// themselves are safe to access concurrently.
///
/// \tparam ReadWriteT The type of the wrapped type.
/// \tparam ReadT The type to use for read-only accesses of the wrapped type. Defaults to \ref
/// ReadWriteT.
/// \tparam Allocator The allocator to use for allocating the internal shared state.
template <typename ReadWriteT = void, typename ReadT = ReadWriteT,
typename Allocator = pika::detail::internal_allocator<>>
class async_rw_mutex;
Expand Down Expand Up @@ -503,18 +526,28 @@ namespace pika::execution::experimental {
struct sender;

public:
/// \brief The type of read-only types accessed through the mutex.
using read_type = std::decay_t<ReadT> const;

/// \brief The type of read-write types accessed through the mutex.
using readwrite_type = std::decay_t<ReadWriteT>;

// TODO: Remove?
using value_type = readwrite_type;

/// \brief The wrapper type sent by read-only-access senders.
using read_access_type = async_rw_mutex_access_wrapper<readwrite_type, read_type,
async_rw_mutex_access_type::read>;

/// \brief The wrapper type sent by read-write-access senders.
using readwrite_access_type = async_rw_mutex_access_wrapper<readwrite_type, read_type,
async_rw_mutex_access_type::readwrite>;

using allocator_type = Allocator;

async_rw_mutex() = delete;

/// \brief Construct a new mutex with the wrapped value initialized to \p u.
template <typename U,
typename = std::enable_if_t<!std::is_same<std::decay_t<U>, async_rw_mutex>::value>>
explicit async_rw_mutex(U&& u, allocator_type const& alloc = {})
Expand All @@ -526,7 +559,15 @@ namespace pika::execution::experimental {
async_rw_mutex& operator=(async_rw_mutex&&) = default;
async_rw_mutex(async_rw_mutex const&) = delete;
async_rw_mutex& operator=(async_rw_mutex const&) = delete;

/// \brief Destroy the mutex.
///
/// The destructor does not wait or require that all accesses through senders have
/// completed. The wrapped value is kept alive in a shared state managed by the senders,
/// until the last access completes, or the destructor of the \ref async_rw_mutex runs,
/// whichever happens later.
~async_rw_mutex() = default;

/// \brief Access the wrapped value in read-only mode through a sender.
sender<async_rw_mutex_access_type::read> read()
{
if (prev_access == async_rw_mutex_access_type::readwrite)
Expand All @@ -551,6 +592,7 @@ namespace pika::execution::experimental {
return {prev_state, state};
}

/// \brief Access the wrapped value in read-write mode through a sender.
sender<async_rw_mutex_access_type::readwrite> readwrite()
{
auto shared_prev_state = std::move(state);
Expand Down
Loading