Skip to content

Commit

Permalink
Made the example thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
anarthal committed Nov 30, 2023
1 parent d354475 commit 4ce448f
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 26 deletions.
46 changes: 32 additions & 14 deletions example/connection_pool/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
#include <boost/mysql/pool_params.hpp>

#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/system/detail/error_code.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/system/error_code.hpp>

#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <memory>
Expand Down Expand Up @@ -45,6 +46,9 @@

using namespace notes;

// The number of threads to use
static constexpr std::size_t num_threads = 5;

int main(int argc, char* argv[])
{
// Check command line arguments.
Expand All @@ -59,9 +63,10 @@ int main(int argc, char* argv[])
const char* mysql_password = argv[2];
const char* mysql_hostname = argv[3];

// An event loop, where the application will run. The server is single-
// threaded, so we set the concurrency hint to 1
boost::asio::io_context ioc{1};
// An event loop, where the application will run.
// We will use the main thread to run the pool, too, so we use
// one thread less than configured
boost::asio::thread_pool th_pool(num_threads - 1);

// Configuration for the connection pool
boost::mysql::pool_params pool_prms{
Expand All @@ -77,35 +82,48 @@ int main(int argc, char* argv[])
// Database to use when connecting
"boost_mysql_examples",
};
auto shared_st = std::make_shared<shared_state>(boost::mysql::connection_pool(ioc, std::move(pool_prms)));

// Create the connection pool
auto shared_st = std::make_shared<shared_state>(boost::mysql::connection_pool(
// Using thread_safe will create a strand for the connection pool.
// This allows us to share the pool between sessions, which may run
// concurrently, on different threads.
boost::mysql::pool_executor_params::thread_safe(th_pool.get_executor()),

// Pool config
std::move(pool_prms)
));

// A signal_set allows us to intercept SIGINT and SIGTERM and
// exit gracefully
boost::asio::signal_set signals{ioc.get_executor(), SIGINT, SIGTERM};
boost::asio::signal_set signals{th_pool.get_executor(), SIGINT, SIGTERM};

// Launch the MySQL pool
shared_st->pool.async_run(boost::asio::detached);

// Start listening for HTTP connections. This will run until the context is stopped
auto ec = launch_server(ioc, shared_st);
auto ec = launch_server(th_pool.get_executor(), shared_st);
if (ec)
{
std::cerr << "Error launching server: " << ec << std::endl;
exit(EXIT_FAILURE);
}

// Capture SIGINT and SIGTERM to perform a clean shutdown
signals.async_wait([shared_st, &ioc](boost::system::error_code, int) {
signals.async_wait([shared_st, &th_pool](boost::system::error_code, int) {
// Stop the connection pool. This will cause
shared_st->pool.cancel();

// Stop the io_context. This will cause run() to return
ioc.stop();
// Stop the execution context. This will cause main to exit
th_pool.stop();
});

// Run the io_context. This will block until the context is stopped by
// a signal and all outstanding async tasks are finished.
ioc.run();
// Attach the current thread to the thread pool. This will block
// until stop() is called
th_pool.attach();

// Wait until all threads have exited
th_pool.join();

// (If we get here, it means we got a SIGINT or SIGTERM)
return EXIT_SUCCESS;
Expand Down
30 changes: 21 additions & 9 deletions example/connection_pool/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
#include <boost/mysql/error_with_diagnostics.hpp>
#include <boost/mysql/string_view.hpp>

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/http/error.hpp>
#include <boost/beast/http/message.hpp>
Expand Down Expand Up @@ -426,20 +428,28 @@ static void run_http_session(

// Implements the server's accept loop. The server will
// listen for connections until stopped.
static void do_accept(std::shared_ptr<asio::ip::tcp::acceptor> acceptor, std::shared_ptr<shared_state> st)
static void do_accept(
asio::any_io_executor executor, // The original executor (without strands)
std::shared_ptr<asio::ip::tcp::acceptor> acceptor,
std::shared_ptr<shared_state> st
)
{
acceptor->async_accept([st, acceptor](error_code ec, asio::ip::tcp::socket sock) {
acceptor->async_accept([executor, st, acceptor](error_code ec, asio::ip::tcp::socket sock) {
// If there was an error accepting the connection, exit our loop
if (ec)
return log_error(ec, "accept");

// Launch a new session for this connection. Each session gets its
// own stackful coroutine, so we can get back to listening for new connections.
boost::asio::spawn(
sock.get_executor(),
// Every session gets its own strand. This prevents data races.
asio::make_strand(executor),

// The actual coroutine
[st, socket = std::move(sock)](boost::asio::yield_context yield) mutable {
run_http_session(std::move(socket), st, yield);
run_http_session(std::move(socket), std::move(st), yield);
},

// All errors in the session are handled via error codes or by catching
// exceptions explicitly. An unhandled exception here means an error.
// Rethrowing it will propagate the exception, making io_context::run()
Expand All @@ -451,18 +461,20 @@ static void do_accept(std::shared_ptr<asio::ip::tcp::acceptor> acceptor, std::sh
);

// Accept a new connection
do_accept(acceptor, st);
do_accept(executor, acceptor, st);
});
}

} // namespace

error_code notes::launch_server(boost::asio::io_context& ctx, std::shared_ptr<shared_state> st)
error_code notes::launch_server(boost::asio::any_io_executor ex, std::shared_ptr<shared_state> st)
{
error_code ec;

// An object that allows us to acept incoming TCP connections
auto acceptor = std::make_shared<asio::ip::tcp::acceptor>(ctx);
// An object that allows us to acept incoming TCP connections.
// Since we're in a multi-threaded environment, we create a strand for the acceptor,
// so all accept handlers are run serialized
auto acceptor = std::make_shared<asio::ip::tcp::acceptor>(asio::make_strand(ex));

// The endpoint where the server will listen. Edit this if you want to
// change the address or port we bind to.
Expand All @@ -489,7 +501,7 @@ error_code notes::launch_server(boost::asio::io_context& ctx, std::shared_ptr<sh
return ec;

// Launch the acceptor loop
do_accept(std::move(acceptor), std::move(st));
do_accept(std::move(ex), std::move(acceptor), std::move(st));

// Done
return error_code();
Expand Down
8 changes: 5 additions & 3 deletions example/connection_pool/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <boost/mysql/connection_pool.hpp>

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/system/error_code.hpp>

Expand All @@ -35,9 +36,10 @@ struct shared_state

// Launches a HTTP server that will listen on 0.0.0.0:4000.
// If the server fails to launch (e.g. because the port is aleady in use),
// returns a non-zero error code. The server runs in the background
// until ctx is stopped
boost::system::error_code launch_server(boost::asio::io_context& ctx, std::shared_ptr<shared_state> state);
// returns a non-zero error code. ex should identify the io_context or thread_pool
// where the server should run. The server is run until the underlying execution
// context is stopped.
boost::system::error_code launch_server(boost::asio::any_io_executor ex, std::shared_ptr<shared_state> state);

} // namespace notes

Expand Down

0 comments on commit 4ce448f

Please sign in to comment.