Skip to content

Commit

Permalink
Async ops now support cancel_after and similar tokens
Browse files Browse the repository at this point in the history
All initiations have now an associated executor.

close #330
  • Loading branch information
anarthal authored Aug 8, 2024
1 parent a20fc3e commit 36dc476
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 145 deletions.
124 changes: 23 additions & 101 deletions example/timeouts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,22 @@
#include <boost/mysql/handshake_params.hpp>
#include <boost/mysql/row_view.hpp>
#include <boost/mysql/tcp_ssl.hpp>
#include <boost/mysql/throw_on_error.hpp>

#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/use_awaitable.hpp>

#include <chrono>
#include <exception>
#include <iostream>
#include <stdexcept>

#if defined(BOOST_ASIO_HAS_CO_AWAIT) && !defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
#if defined(BOOST_ASIO_HAS_CO_AWAIT)

#include <boost/asio/experimental/awaitable_operators.hpp>

using namespace boost::asio::experimental::awaitable_operators;
using boost::asio::use_awaitable;
using boost::mysql::error_code;

constexpr std::chrono::milliseconds TIMEOUT(8000);

void print_employee(boost::mysql::row_view employee)
{
std::cout << "Employee '" << employee.at(0) << " " // first_name (string)
Expand All @@ -46,112 +36,46 @@ void print_employee(boost::mysql::row_view employee)
}

/**
* Helper functions to check whether an async operation, launched in parallel with
* a timer, was successful, resulted in an error or timed out. The timer is always the first operation.
* If the variant holds the first alternative, the timer fired before
* the async operation completed, which means a timeout. We'll be using as_tuple with use_awaitable to be able
* to use boost::mysql::throw_on_error and include server diagnostics in the thrown exceptions.
*/
template <class T>
T check_error(
std::variant<std::monostate, std::tuple<error_code, T>>&& op_result,
const boost::mysql::diagnostics& diag = {}
)
{
if (op_result.index() == 0)
{
throw std::runtime_error("Operation timed out");
}
auto [ec, res] = std::get<1>(std::move(op_result));
boost::mysql::throw_on_error(ec, diag);
return res;
}

void check_error(
const std::variant<std::monostate, std::tuple<error_code>>& op_result,
const boost::mysql::diagnostics& diag
)
{
if (op_result.index() == 0)
{
throw std::runtime_error("Operation timed out");
}
auto [ec] = std::get<1>(op_result);
boost::mysql::throw_on_error(ec, diag);
}

// Using this completion token instead of plain use_awaitable prevents
// co_await from throwing exceptions. Instead, co_await will return a std::tuple<error_code>
// with a non-zero code on error. We will then use boost::mysql::throw_on_error
// to throw exceptions with embedded diagnostics, if available. If you
// employ plain use_awaitable, you will get boost::system::system_error exceptions
// instead of boost::mysql::error_with_diagnostics exceptions. This is a limitation of use_awaitable.
constexpr auto tuple_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable);

/**
* We use Boost.Asio's cancellation capabilities to implement timeouts for our
* asynchronous operations. This is not something specific to Boost.MySQL, and
* We use Boost.Asio's cancel_after completion token to cancel operations
* after a certain time has elapsed. This is not something specific to Boost.MySQL, and
* can be used with any other asynchronous operation that follows Asio's model.
*
* Each time we invoke an asynchronous operation, we also call timer_type::async_wait.
* We then use Asio's overload for operator || to run the timer wait and the async operation
* in parallel. Once the first of them finishes, the other operation is cancelled
* (the behavior is similar to JavaScripts's Promise.race).
* If we co_await the awaitable returned by operator ||, we get a std::variant<std::monostate, T>,
* where T is the async operation's result type. If the timer wait finishes first (we have a
* timeout), the variant will hold the std::monostate at index 0; otherwise, it will have the async
* operation's result at index 1. The function check_error throws an exception in the case of
* timeout and extracts the operation's result otherwise.
* If the operation times out, it will fail with a boost::asio::error::operation_aborted
* error code.
*
* If any of the MySQL specific operations result in a timeout, the connection is left
* in an unspecified state. You should close it and re-open it to get it working again.
*/
boost::asio::awaitable<void> coro_main(
boost::mysql::tcp_ssl_connection& conn,
boost::asio::ip::tcp::resolver& resolver,
boost::asio::steady_timer& timer,
const boost::mysql::handshake_params& params,
const char* hostname,
const char* company_id
)
{
boost::mysql::diagnostics diag;
using boost::asio::cancel_after;
constexpr std::chrono::seconds timeout(8);

// TODO: thrown exceptions don't contain diagnostics.
// Should be solved by https://github.com/boostorg/mysql/issues/329

// Resolve hostname
timer.expires_after(TIMEOUT);
auto endpoints = check_error(co_await (
timer.async_wait(use_awaitable) ||
resolver.async_resolve(hostname, boost::mysql::default_port_string, tuple_awaitable)
));

// Connect to server. Note that we need to reset the timer before using it again.
timer.expires_after(TIMEOUT);
auto op_result = co_await (
timer.async_wait(use_awaitable) ||
conn.async_connect(*endpoints.begin(), params, diag, tuple_awaitable)
);
check_error(op_result, diag);
auto endpoints = co_await resolver
.async_resolve(hostname, boost::mysql::default_port_string, cancel_after(timeout));

// Connect to server
co_await conn.async_connect(*endpoints.begin(), params, cancel_after(timeout));

// We will be using company_id, which is untrusted user input, so we will use a prepared
// statement.
auto stmt_op_result = co_await (
timer.async_wait(use_awaitable) ||
conn.async_prepare_statement(
"SELECT first_name, last_name, salary FROM employee WHERE company_id = ?",
diag,
tuple_awaitable
)
boost::mysql::statement stmt = co_await conn.async_prepare_statement(
"SELECT first_name, last_name, salary FROM employee WHERE company_id = ?",
cancel_after(timeout)
);
boost::mysql::statement stmt = check_error(std::move(stmt_op_result), diag);

// Execute the statement
boost::mysql::results result;
timer.expires_after(TIMEOUT);
op_result = co_await (
timer.async_wait(use_awaitable) ||
conn.async_execute(stmt.bind(company_id), result, diag, tuple_awaitable)
);
check_error(op_result, diag);
co_await conn.async_execute(stmt.bind(company_id), result, cancel_after(timeout));

// Print all the obtained rows
for (boost::mysql::row_view employee : result.rows())
Expand All @@ -160,8 +84,7 @@ boost::asio::awaitable<void> coro_main(
}

// Notify the MySQL server we want to quit, then close the underlying connection.
op_result = co_await (timer.async_wait(use_awaitable) || conn.async_close(diag, tuple_awaitable));
check_error(op_result, diag);
co_await conn.async_close(cancel_after(timeout));
}

void main_impl(int argc, char** argv)
Expand All @@ -182,7 +105,6 @@ void main_impl(int argc, char** argv)
boost::asio::io_context ctx;
boost::asio::ssl::context ssl_ctx(boost::asio::ssl::context::tls_client);
boost::mysql::tcp_ssl_connection conn(ctx, ssl_ctx);
boost::asio::steady_timer timer(ctx.get_executor());

// Connection parameters
boost::mysql::handshake_params params(
Expand All @@ -197,8 +119,8 @@ void main_impl(int argc, char** argv)
// The entry point. We pass in a function returning a boost::asio::awaitable<void>, as required.
boost::asio::co_spawn(
ctx.get_executor(),
[&conn, &resolver, &timer, params, hostname, company_id] {
return coro_main(conn, resolver, timer, params, hostname, company_id);
[&conn, &resolver, params, hostname, company_id] {
return coro_main(conn, resolver, params, hostname, company_id);
},
// If any exception is thrown in the coroutine body, rethrow it.
[](std::exception_ptr ptr) {
Expand Down
28 changes: 20 additions & 8 deletions include/boost/mysql/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <boost/mysql/pool_params.hpp>

#include <boost/mysql/detail/access.hpp>
#include <boost/mysql/detail/async_helpers.hpp>
#include <boost/mysql/detail/config.hpp>
#include <boost/mysql/detail/connection_pool_fwd.hpp>

Expand Down Expand Up @@ -265,8 +266,10 @@ class connection_pool
return std::chrono::seconds(30);
}

struct initiate_run
struct initiate_run : detail::initiation_base
{
using detail::initiation_base::initiation_base;

template <class Handler>
void operator()(Handler&& h, std::shared_ptr<detail::pool_impl> self)
{
Expand All @@ -280,8 +283,10 @@ class connection_pool
asio::any_completion_handler<void(error_code)> handler
);

struct initiate_get_connection
struct initiate_get_connection : detail::initiation_base
{
using detail::initiation_base::initiation_base;

template <class Handler>
void operator()(
Handler&& h,
Expand Down Expand Up @@ -309,7 +314,7 @@ class connection_pool
CompletionToken&& token
)
-> decltype(asio::async_initiate<CompletionToken, void(error_code, pooled_connection)>(
initiate_get_connection{},
std::declval<initiate_get_connection>(),
token,
diag,
impl_,
Expand All @@ -318,7 +323,7 @@ class connection_pool
{
BOOST_ASSERT(valid());
return asio::async_initiate<CompletionToken, void(error_code, pooled_connection)>(
initiate_get_connection{},
initiate_get_connection{get_executor()},
token,
diag,
impl_,
Expand Down Expand Up @@ -538,12 +543,19 @@ class connection_pool
* `~pooled_connection` and \ref pooled_connection::return_without_reset.
*/
template <BOOST_ASIO_COMPLETION_TOKEN_FOR(void(::boost::mysql::error_code)) CompletionToken>
auto async_run(CompletionToken&& token) BOOST_MYSQL_RETURN_TYPE(
decltype(asio::async_initiate<CompletionToken, void(error_code)>(initiate_run{}, token, impl_))
)
auto async_run(CompletionToken&& token)
BOOST_MYSQL_RETURN_TYPE(decltype(asio::async_initiate<CompletionToken, void(error_code)>(
std::declval<initiate_run>(),
token,
impl_
)))
{
BOOST_ASSERT(valid());
return asio::async_initiate<CompletionToken, void(error_code)>(initiate_run{}, token, impl_);
return asio::async_initiate<CompletionToken, void(error_code)>(
initiate_run{get_executor()},
token,
impl_
);
}

/// \copydoc async_get_connection(diagnostics&,CompletionToken&&)
Expand Down
33 changes: 33 additions & 0 deletions include/boost/mysql/detail/async_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_MYSQL_DETAIL_ASYNC_HELPERS_HPP
#define BOOST_MYSQL_DETAIL_ASYNC_HELPERS_HPP

#include <boost/asio/any_io_executor.hpp>

namespace boost {
namespace mysql {
namespace detail {

// Base class for initiation objects. Includes a bound executor, so they're compatible
// with asio::cancel_after and similar
struct initiation_base
{
asio::any_io_executor ex;

initiation_base(asio::any_io_executor ex) noexcept : ex(std::move(ex)) {}

using executor_type = asio::any_io_executor;
const executor_type& get_executor() const noexcept { return ex; }
};

} // namespace detail
} // namespace mysql
} // namespace boost

#endif
Loading

0 comments on commit 36dc476

Please sign in to comment.