Skip to content

Commit

Permalink
Support ASIO below 1.66.0
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB committed Sep 5, 2021
1 parent f6b00aa commit c06112c
Showing 1 changed file with 81 additions and 43 deletions.
124 changes: 81 additions & 43 deletions src/eventing/asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ extern "C" {
#include <iostream>
#include <mutex>
#include <memory>
#include <boost/version.hpp>

// New interfaces require boost 1.66.0
#if BOOST_VERSION < 106600
#define LIBUS_USE_OLD_ASIO
#define LIBUS_ASIO_DESCRIPTOR boost::asio::posix::stream_descriptor
#define LIBUS_ASIO_LOOP boost::asio::io_service
#else
#define LIBUS_ASIO_DESCRIPTOR boost::asio::posix::descriptor
#define LIBUS_ASIO_LOOP boost::asio::io_context
#endif

// setting polls to 1 disables fallthrough
int polls = 0; // temporary solution keeping track of outstanding work
Expand All @@ -39,14 +50,14 @@ struct boost_timer : us_internal_callback_t {

unsigned char nr = 0;

boost_timer(boost::asio::io_context *io) : timer(*io) {
boost_timer(LIBUS_ASIO_LOOP *io) : timer(*io) {
isValid.reset(this, [](boost_timer *t) {});
}
};

struct boost_block_poll_t : boost::asio::posix::descriptor {
struct boost_block_poll_t : LIBUS_ASIO_DESCRIPTOR {

boost_block_poll_t(boost::asio::io_context *io, us_poll_t *p) : boost::asio::posix::descriptor(*io), p(p) {
boost_block_poll_t(LIBUS_ASIO_LOOP *io, us_poll_t *p) : LIBUS_ASIO_DESCRIPTOR(*io), p(p) {
isValid.reset(this, [](boost_block_poll_t *t) {});
}

Expand Down Expand Up @@ -76,6 +87,8 @@ void us_poll_free(struct us_poll_t *p, struct us_loop_t *loop) {
}

void poll_for_error(struct boost_block_poll_t *boost_block) {
/* There is no such thing as polling for error in old asio */
#ifndef LIBUS_USE_OLD_ASIO
polls++;
boost_block->async_wait(boost::asio::posix::descriptor::wait_type::wait_error, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec) {
polls--;
Expand All @@ -99,57 +112,82 @@ void poll_for_error(struct boost_block_poll_t *boost_block) {
us_internal_dispatch_ready_poll(boost_block->p, 1, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE);
}
});
#endif
}

void poll_for_read(struct boost_block_poll_t *boost_block);

inline void handle_read(const std::weak_ptr<boost_block_poll_t> &weakBoostBlock, unsigned char nr, boost::system::error_code ec) {
if (ec != boost::asio::error::operation_aborted) {

// post mortem check
struct boost_block_poll_t *boost_block;
if (auto observe = weakBoostBlock.lock()) {
boost_block = observe.get();
} else {
return;
}

// get boost_block from weakptr
if (nr != boost_block->nr) {
return;
}

poll_for_read(boost_block);
us_internal_dispatch_ready_poll(boost_block->p, ec ? -1 : 0, LIBUS_SOCKET_READABLE);
}
}

void poll_for_read(struct boost_block_poll_t *boost_block) {
polls++;
#ifndef LIBUS_USE_OLD_ASIO
boost_block->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec) {
polls--;
handle_read(weakBoostBlock, nr, ec);
});
#else
boost_block->async_read_some(boost::asio::null_buffers(), [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec, std::size_t) {
polls--;
handle_read(weakBoostBlock, nr, ec);
});
#endif
}

if (ec != boost::asio::error::operation_aborted) {
void poll_for_write(struct boost_block_poll_t *boost_block);

// post mortem check
struct boost_block_poll_t *boost_block;
if (auto observe = weakBoostBlock.lock()) {
boost_block = observe.get();
} else {
return;
}
inline void handle_write(const std::weak_ptr<boost_block_poll_t> &weakBoostBlock, unsigned char nr, boost::system::error_code ec) {
if (ec != boost::asio::error::operation_aborted) {

// get boost_block from weakptr
if (nr != boost_block->nr) {
return;
}
// post mortem check
struct boost_block_poll_t *boost_block;
if (auto observe = weakBoostBlock.lock()) {
boost_block = observe.get();
} else {
return;
}

poll_for_read(boost_block);
us_internal_dispatch_ready_poll(boost_block->p, ec ? -1 : 0, LIBUS_SOCKET_READABLE);
// get boost_block from weakptr
if (nr != boost_block->nr) {
return;
}
});
poll_for_write(boost_block);
us_internal_dispatch_ready_poll(boost_block->p, ec ? -1 : 0, LIBUS_SOCKET_WRITABLE);
}
}

void poll_for_write(struct boost_block_poll_t *boost_block) {
polls++;
#ifndef LIBUS_USE_OLD_ASIO
boost_block->async_wait(boost::asio::posix::descriptor::wait_type::wait_write, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec) {
polls--;

if (ec != boost::asio::error::operation_aborted) {

// post mortem check
struct boost_block_poll_t *boost_block;
if (auto observe = weakBoostBlock.lock()) {
boost_block = observe.get();
} else {
return;
}

// get boost_block from weakptr
if (nr != boost_block->nr) {
return;
}
poll_for_write(boost_block);
us_internal_dispatch_ready_poll(boost_block->p, ec ? -1 : 0, LIBUS_SOCKET_WRITABLE);
}
handle_write(weakBoostBlock, nr, ec);
});
#else
boost_block->async_write_some(boost::asio::null_buffers(), [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec, std::size_t) {
polls--;
handle_write(weakBoostBlock, nr, ec);
});
#endif
}

void us_poll_start(struct us_poll_t *p, struct us_loop_t *loop, int events) {
Expand Down Expand Up @@ -213,7 +251,7 @@ LIBUS_SOCKET_DESCRIPTOR us_poll_fd(struct us_poll_t *p) {
struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t *loop), void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop), unsigned int ext_size) {
struct us_loop_t *loop = (struct us_loop_t *) malloc(sizeof(struct us_loop_t) + ext_size);

loop->io = hint ? hint : new boost::asio::io_context();
loop->io = hint ? hint : new LIBUS_ASIO_LOOP();
loop->is_default = hint != 0;

// here we create two unreffed handles - timer and async
Expand All @@ -232,7 +270,7 @@ void us_loop_free(struct us_loop_t *loop) {
us_internal_loop_data_free(loop);

if (!loop->is_default) {
delete (boost::asio::io_context *) loop->io;
delete (LIBUS_ASIO_LOOP *) loop->io;
}

free(loop);
Expand All @@ -250,13 +288,13 @@ void us_loop_run(struct us_loop_t *loop) {
// everywhere so it's negligible for what it solves (we must have pre, post callbacks)
while (polls) {
us_internal_loop_pre(loop);
size_t num = ((boost::asio::io_context *) loop->io)->run_one();
size_t num = ((LIBUS_ASIO_LOOP *) loop->io)->run_one();
if (!num) {
break;
}

for (int i = 0; true; i++) {
num = ((boost::asio::io_context *) loop->io)->poll_one();
num = ((LIBUS_ASIO_LOOP *) loop->io)->poll_one();
if (!num || i == 999) {
break;
}
Expand All @@ -267,7 +305,7 @@ void us_loop_run(struct us_loop_t *loop) {

struct us_poll_t *us_create_poll(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
struct us_poll_t *p = (struct us_poll_t *) malloc(sizeof(struct us_poll_t) + ext_size);
p->boost_block = new boost_block_poll_t( (boost::asio::io_context *)loop->io, p);
p->boost_block = new boost_block_poll_t( (LIBUS_ASIO_LOOP *)loop->io, p);

return p;
}
Expand All @@ -287,7 +325,7 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi
struct boost_timer *cb = (struct boost_timer *) malloc(sizeof(struct boost_timer) + ext_size);

// inplace construct the timer on this callback_t
new (cb) boost_timer((boost::asio::io_context *)loop->io);
new (cb) boost_timer((LIBUS_ASIO_LOOP *)loop->io);

cb->loop = loop;
cb->cb_expects_the_loop = 0;
Expand Down Expand Up @@ -413,7 +451,7 @@ void us_internal_async_wakeup(struct us_internal_async *a) {
// really we should use the loops mutex, and have the loops constructor
// use its own mutex, then we are guaranteed to have visibility here
cb->m.lock();
boost::asio::io_context *io = (boost::asio::io_context *)cb->loop->io;
LIBUS_ASIO_LOOP *io = (LIBUS_ASIO_LOOP *)cb->loop->io;
cb->m.unlock();

// should increase and decrease polls (again, loop mutex)
Expand Down

0 comments on commit c06112c

Please sign in to comment.