Skip to content

Commit

Permalink
fix timeout bug
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Apr 12, 2024
1 parent 21f018b commit ddfeb7f
Showing 1 changed file with 65 additions and 59 deletions.
124 changes: 65 additions & 59 deletions include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
});
}

coro_io::ExecutorWrapper<> &get_executor() { return executor_wrapper_; }

#ifdef CINATRA_ENABLE_SSL
bool init_ssl(int verify_mode, const std::string &base_path,
const std::string &cert_file, const std::string &sni_hostname) {
Expand Down Expand Up @@ -284,12 +286,13 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
if (!ok) {
co_return resp_data{std::make_error_code(std::errc::protocol_error), 404};
}

auto future = start_timer(conn_timeout_duration_, "connect timer");

data = co_await connect(u);
if (auto ec = co_await wait_future(std::move(future)); ec) {
co_return resp_data{ec, 404};
{
auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "connect timer");
data = co_await connect(u);
}
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
}
if (!data.net_err) {
data.status = 200;
Expand Down Expand Up @@ -621,23 +624,27 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

void set_max_single_part_size(size_t size) { max_single_part_size_ = size; }

async_simple::Future<async_simple::Unit> start_timer(
std::chrono::steady_clock::duration duration, std::string msg) {
is_timeout_ = false;

async_simple::Promise<async_simple::Unit> promise;
auto fut = promise.getFuture();
struct timer_guard {
timer_guard(coro_http_client *self,
std::chrono::steady_clock::duration duration, std::string msg)
: self(self) {
self->socket_->is_timeout_ = false;

if (enable_timeout_) {
timeout(timer_, std::move(promise), duration, std::move(msg))
.via(&executor_wrapper_)
.detach();
if (self->enable_timeout_) {
self->timeout(self->timer_, duration, std::move(msg))
.start([](auto &&) {
});
}
return;
}
else {
promise.setValue(async_simple::Unit{});
~timer_guard() {
if (self->enable_timeout_ && self->socket_->is_timeout_ == false) {
std::error_code ignore_ec;
self->timer_.cancel(ignore_ec);
}
}
return fut;
}
coro_http_client *self;
};

async_simple::coro::Lazy<std::error_code> wait_future(
async_simple::Future<async_simple::Unit> &&future) {
Expand All @@ -647,10 +654,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::error_code err_code;
timer_.cancel(err_code);
co_await std::move(future);
if (is_timeout_) {
if (socket_->is_timeout_) {
co_return std::make_error_code(std::errc::timed_out);
}

co_return std::error_code{};
}

Expand Down Expand Up @@ -682,18 +688,21 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
size_t size = 0;

if (socket_->has_closed_) {
auto future = start_timer(conn_timeout_duration_, "connect timer");

data = co_await connect(u);
if (ec = co_await wait_future(std::move(future)); ec) {
co_return resp_data{ec, 404};
{
auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "connect timer");
data = co_await connect(u);
}
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
}
if (data.net_err) {
co_return data;
}
}

auto future = start_timer(req_timeout_duration_, "upload timer");
auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "request timer");
std::tie(ec, size) = co_await async_write(asio::buffer(header_str));
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (inject_write_failed == ClientInjectAction::write_failed) {
Expand Down Expand Up @@ -729,10 +738,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
bool is_keep_alive = true;
data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx),
http_method::POST);
if (auto errc = co_await wait_future(std::move(future)); errc) {
ec = errc;
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}

handle_result(data, ec, is_keep_alive);
co_return data;
}
Expand Down Expand Up @@ -878,18 +886,20 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
size_t size = 0;

if (socket_->has_closed_) {
auto future = start_timer(conn_timeout_duration_, "connect timer");

data = co_await connect(u);
if (ec = co_await wait_future(std::move(future)); ec) {
co_return resp_data{ec, 404};
{
auto guard = timer_guard(this, conn_timeout_duration_, "connect timer");
data = co_await connect(u);
}
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
}
if (data.net_err) {
co_return data;
}
}

auto future = start_timer(req_timeout_duration_, "upload timer");
auto time_guard =
timer_guard(this, conn_timeout_duration_, "request timer");
std::tie(ec, size) = co_await async_write(asio::buffer(header_str));
if (ec) {
co_return resp_data{ec, 404};
Expand Down Expand Up @@ -943,7 +953,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}
}

if (ec && ec == asio::error::operation_aborted) {
ec = std::make_error_code(std::errc::timed_out);
co_return resp_data{ec, 404};
Expand All @@ -952,8 +961,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
bool is_keep_alive = true;
data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx),
http_method::POST);
if (auto errc = co_await wait_future(std::move(future)); errc) {
ec = errc;
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}

handle_result(data, ec, is_keep_alive);
Expand Down Expand Up @@ -1018,9 +1027,10 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
u.path = uri;
}
if (socket_->has_closed_) {
auto conn_future = start_timer(conn_timeout_duration_, "connect timer");
host_ = proxy_host_.empty() ? u.get_host() : proxy_host_;
port_ = proxy_port_.empty() ? u.get_port() : proxy_port_;
auto guard =
timer_guard(this, conn_timeout_duration_, "connect timer");
if (ec = co_await coro_io::async_connect(&executor_wrapper_,
socket_->impl_, host_, port_);
ec) {
Expand Down Expand Up @@ -1057,9 +1067,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}
socket_->has_closed_ = false;
if (ec = co_await wait_future(std::move(conn_future)); ec) {
break;
}
}

std::vector<asio::const_buffer> vec;
Expand All @@ -1078,7 +1085,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#ifdef CORO_HTTP_PRINT_REQ_HEAD
CINATRA_LOG_DEBUG << req_head_str;
#endif
auto future = start_timer(req_timeout_duration_, "request timer");
auto guard = timer_guard(this, req_timeout_duration_, "request timer");
if (has_body) {
std::tie(ec, size) = co_await async_write(vec);
}
Expand All @@ -1088,14 +1095,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
if (ec) {
break;
}

data =
co_await handle_read(ec, size, is_keep_alive, std::move(ctx), method);
if (auto errc = co_await wait_future(std::move(future)); errc) {
ec = errc;
}
} while (0);

if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
handle_result(data, ec, is_keep_alive);
co_return data;
}
Expand Down Expand Up @@ -1177,6 +1182,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
struct socket_t {
asio::ip::tcp::socket impl_;
std::atomic<bool> has_closed_ = true;
bool is_timeout_ = false;
asio::streambuf head_buf_;
asio::streambuf chunked_buf_;
#ifdef CINATRA_ENABLE_SSL
Expand Down Expand Up @@ -1958,17 +1964,19 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}

async_simple::coro::Lazy<bool> timeout(
auto &timer, auto promise, std::chrono::steady_clock::duration duration,
auto &timer, std::chrono::steady_clock::duration duration,
std::string msg) {
auto watcher=std::weak_ptr(socket_);
timer.expires_after(duration);
is_timeout_ = co_await timer.async_await();
if (!is_timeout_) {
promise.setValue(async_simple::Unit());
auto is_timeout = co_await timer.async_await();
if (!is_timeout) {
co_return false;
}
CINATRA_LOG_WARNING << msg << " timeout";
close_socket(*socket_);
promise.setValue(async_simple::Unit());
if (auto socket=watcher.lock();socket) {
socket_->is_timeout_ = true;
CINATRA_LOG_WARNING << msg << " timeout";
close_socket(*socket_);
}
co_return true;
}

Expand Down Expand Up @@ -2023,8 +2031,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#endif
std::string redirect_uri_;
bool enable_follow_redirect_ = false;

bool is_timeout_ = false;
bool enable_timeout_ = false;
std::chrono::steady_clock::duration conn_timeout_duration_ =
std::chrono::seconds(8);
Expand Down

0 comments on commit ddfeb7f

Please sign in to comment.