From c06112c89b4c1cf5a09b5f8daa2def756b925889 Mon Sep 17 00:00:00 2001 From: Alex Hultman Date: Sun, 5 Sep 2021 05:06:23 +0000 Subject: [PATCH] Support ASIO below 1.66.0 --- src/eventing/asio.cpp | 124 +++++++++++++++++++++++++++--------------- 1 file changed, 81 insertions(+), 43 deletions(-) diff --git a/src/eventing/asio.cpp b/src/eventing/asio.cpp index 4f70498..6bbdcf7 100644 --- a/src/eventing/asio.cpp +++ b/src/eventing/asio.cpp @@ -27,6 +27,17 @@ extern "C" { #include #include #include +#include + +// New interfaces require boost 1.66.0 +#if BOOST_VERSION < 106600 +#define LIBUS_USE_OLD_ASIO +#define LIBUS_ASIO_DESCRIPTOR boost::asio::posix::stream_descriptor +#define LIBUS_ASIO_LOOP boost::asio::io_service +#else +#define LIBUS_ASIO_DESCRIPTOR boost::asio::posix::descriptor +#define LIBUS_ASIO_LOOP boost::asio::io_context +#endif // setting polls to 1 disables fallthrough int polls = 0; // temporary solution keeping track of outstanding work @@ -39,14 +50,14 @@ struct boost_timer : us_internal_callback_t { unsigned char nr = 0; - boost_timer(boost::asio::io_context *io) : timer(*io) { + boost_timer(LIBUS_ASIO_LOOP *io) : timer(*io) { isValid.reset(this, [](boost_timer *t) {}); } }; -struct boost_block_poll_t : boost::asio::posix::descriptor { +struct boost_block_poll_t : LIBUS_ASIO_DESCRIPTOR { - boost_block_poll_t(boost::asio::io_context *io, us_poll_t *p) : boost::asio::posix::descriptor(*io), p(p) { + boost_block_poll_t(LIBUS_ASIO_LOOP *io, us_poll_t *p) : LIBUS_ASIO_DESCRIPTOR(*io), p(p) { isValid.reset(this, [](boost_block_poll_t *t) {}); } @@ -76,6 +87,8 @@ void us_poll_free(struct us_poll_t *p, struct us_loop_t *loop) { } void poll_for_error(struct boost_block_poll_t *boost_block) { + /* There is no such thing as polling for error in old asio */ +#ifndef LIBUS_USE_OLD_ASIO polls++; boost_block->async_wait(boost::asio::posix::descriptor::wait_type::wait_error, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr(boost_block->isValid)](boost::system::error_code ec) { polls--; @@ -99,57 +112,82 @@ void poll_for_error(struct boost_block_poll_t *boost_block) { us_internal_dispatch_ready_poll(boost_block->p, 1, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE); } }); +#endif +} + +void poll_for_read(struct boost_block_poll_t *boost_block); + +inline void handle_read(const std::weak_ptr &weakBoostBlock, unsigned char nr, boost::system::error_code ec) { + if (ec != boost::asio::error::operation_aborted) { + + // post mortem check + struct boost_block_poll_t *boost_block; + if (auto observe = weakBoostBlock.lock()) { + boost_block = observe.get(); + } else { + return; + } + + // get boost_block from weakptr + if (nr != boost_block->nr) { + return; + } + + poll_for_read(boost_block); + us_internal_dispatch_ready_poll(boost_block->p, ec ? -1 : 0, LIBUS_SOCKET_READABLE); + } } void poll_for_read(struct boost_block_poll_t *boost_block) { polls++; +#ifndef LIBUS_USE_OLD_ASIO boost_block->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr(boost_block->isValid)](boost::system::error_code ec) { polls--; + handle_read(weakBoostBlock, nr, ec); + }); +#else + boost_block->async_read_some(boost::asio::null_buffers(), [nr = boost_block->nr, weakBoostBlock = std::weak_ptr(boost_block->isValid)](boost::system::error_code ec, std::size_t) { + polls--; + handle_read(weakBoostBlock, nr, ec); + }); +#endif +} - if (ec != boost::asio::error::operation_aborted) { +void poll_for_write(struct boost_block_poll_t *boost_block); - // post mortem check - struct boost_block_poll_t *boost_block; - if (auto observe = weakBoostBlock.lock()) { - boost_block = observe.get(); - } else { - return; - } +inline void handle_write(const std::weak_ptr &weakBoostBlock, unsigned char nr, boost::system::error_code ec) { + if (ec != boost::asio::error::operation_aborted) { - // get boost_block from weakptr - if (nr != boost_block->nr) { - return; - } + // post mortem check + struct boost_block_poll_t *boost_block; + if (auto observe = weakBoostBlock.lock()) { + boost_block = observe.get(); + } else { + return; + } - poll_for_read(boost_block); - us_internal_dispatch_ready_poll(boost_block->p, ec ? -1 : 0, LIBUS_SOCKET_READABLE); + // get boost_block from weakptr + if (nr != boost_block->nr) { + return; } - }); + poll_for_write(boost_block); + us_internal_dispatch_ready_poll(boost_block->p, ec ? -1 : 0, LIBUS_SOCKET_WRITABLE); + } } void poll_for_write(struct boost_block_poll_t *boost_block) { polls++; +#ifndef LIBUS_USE_OLD_ASIO boost_block->async_wait(boost::asio::posix::descriptor::wait_type::wait_write, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr(boost_block->isValid)](boost::system::error_code ec) { polls--; - - if (ec != boost::asio::error::operation_aborted) { - - // post mortem check - struct boost_block_poll_t *boost_block; - if (auto observe = weakBoostBlock.lock()) { - boost_block = observe.get(); - } else { - return; - } - - // get boost_block from weakptr - if (nr != boost_block->nr) { - return; - } - poll_for_write(boost_block); - us_internal_dispatch_ready_poll(boost_block->p, ec ? -1 : 0, LIBUS_SOCKET_WRITABLE); - } + handle_write(weakBoostBlock, nr, ec); }); +#else + boost_block->async_write_some(boost::asio::null_buffers(), [nr = boost_block->nr, weakBoostBlock = std::weak_ptr(boost_block->isValid)](boost::system::error_code ec, std::size_t) { + polls--; + handle_write(weakBoostBlock, nr, ec); + }); +#endif } void us_poll_start(struct us_poll_t *p, struct us_loop_t *loop, int events) { @@ -213,7 +251,7 @@ LIBUS_SOCKET_DESCRIPTOR us_poll_fd(struct us_poll_t *p) { struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t *loop), void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop), unsigned int ext_size) { struct us_loop_t *loop = (struct us_loop_t *) malloc(sizeof(struct us_loop_t) + ext_size); - loop->io = hint ? hint : new boost::asio::io_context(); + loop->io = hint ? hint : new LIBUS_ASIO_LOOP(); loop->is_default = hint != 0; // here we create two unreffed handles - timer and async @@ -232,7 +270,7 @@ void us_loop_free(struct us_loop_t *loop) { us_internal_loop_data_free(loop); if (!loop->is_default) { - delete (boost::asio::io_context *) loop->io; + delete (LIBUS_ASIO_LOOP *) loop->io; } free(loop); @@ -250,13 +288,13 @@ void us_loop_run(struct us_loop_t *loop) { // everywhere so it's negligible for what it solves (we must have pre, post callbacks) while (polls) { us_internal_loop_pre(loop); - size_t num = ((boost::asio::io_context *) loop->io)->run_one(); + size_t num = ((LIBUS_ASIO_LOOP *) loop->io)->run_one(); if (!num) { break; } for (int i = 0; true; i++) { - num = ((boost::asio::io_context *) loop->io)->poll_one(); + num = ((LIBUS_ASIO_LOOP *) loop->io)->poll_one(); if (!num || i == 999) { break; } @@ -267,7 +305,7 @@ void us_loop_run(struct us_loop_t *loop) { struct us_poll_t *us_create_poll(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) { struct us_poll_t *p = (struct us_poll_t *) malloc(sizeof(struct us_poll_t) + ext_size); - p->boost_block = new boost_block_poll_t( (boost::asio::io_context *)loop->io, p); + p->boost_block = new boost_block_poll_t( (LIBUS_ASIO_LOOP *)loop->io, p); return p; } @@ -287,7 +325,7 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi struct boost_timer *cb = (struct boost_timer *) malloc(sizeof(struct boost_timer) + ext_size); // inplace construct the timer on this callback_t - new (cb) boost_timer((boost::asio::io_context *)loop->io); + new (cb) boost_timer((LIBUS_ASIO_LOOP *)loop->io); cb->loop = loop; cb->cb_expects_the_loop = 0; @@ -413,7 +451,7 @@ void us_internal_async_wakeup(struct us_internal_async *a) { // really we should use the loops mutex, and have the loops constructor // use its own mutex, then we are guaranteed to have visibility here cb->m.lock(); - boost::asio::io_context *io = (boost::asio::io_context *)cb->loop->io; + LIBUS_ASIO_LOOP *io = (LIBUS_ASIO_LOOP *)cb->loop->io; cb->m.unlock(); // should increase and decrease polls (again, loop mutex)