From ddfeb7fc7700cdccc804705131212599c79e4f71 Mon Sep 17 00:00:00 2001 From: poor-circle Date: Fri, 12 Apr 2024 16:25:47 +0800 Subject: [PATCH] fix timeout bug --- include/cinatra/coro_http_client.hpp | 124 ++++++++++++++------------- 1 file changed, 65 insertions(+), 59 deletions(-) diff --git a/include/cinatra/coro_http_client.hpp b/include/cinatra/coro_http_client.hpp index ad8a3bc3..29657f39 100644 --- a/include/cinatra/coro_http_client.hpp +++ b/include/cinatra/coro_http_client.hpp @@ -197,6 +197,8 @@ class coro_http_client : public std::enable_shared_from_this { }); } + coro_io::ExecutorWrapper<> &get_executor() { return executor_wrapper_; } + #ifdef CINATRA_ENABLE_SSL bool init_ssl(int verify_mode, const std::string &base_path, const std::string &cert_file, const std::string &sni_hostname) { @@ -284,12 +286,13 @@ class coro_http_client : public std::enable_shared_from_this { if (!ok) { co_return resp_data{std::make_error_code(std::errc::protocol_error), 404}; } - - auto future = start_timer(conn_timeout_duration_, "connect timer"); - - data = co_await connect(u); - if (auto ec = co_await wait_future(std::move(future)); ec) { - co_return resp_data{ec, 404}; + { + auto time_out_guard = + timer_guard(this, conn_timeout_duration_, "connect timer"); + data = co_await connect(u); + } + if (socket_->is_timeout_) { + co_return resp_data{std::make_error_code(std::errc::timed_out), 404}; } if (!data.net_err) { data.status = 200; @@ -621,23 +624,27 @@ class coro_http_client : public std::enable_shared_from_this { void set_max_single_part_size(size_t size) { max_single_part_size_ = size; } - async_simple::Future start_timer( - std::chrono::steady_clock::duration duration, std::string msg) { - is_timeout_ = false; - - async_simple::Promise promise; - auto fut = promise.getFuture(); + struct timer_guard { + timer_guard(coro_http_client *self, + std::chrono::steady_clock::duration duration, std::string msg) + : self(self) { + self->socket_->is_timeout_ = false; - if (enable_timeout_) { - timeout(timer_, std::move(promise), duration, std::move(msg)) - .via(&executor_wrapper_) - .detach(); + if (self->enable_timeout_) { + self->timeout(self->timer_, duration, std::move(msg)) + .start([](auto &&) { + }); + } + return; } - else { - promise.setValue(async_simple::Unit{}); + ~timer_guard() { + if (self->enable_timeout_ && self->socket_->is_timeout_ == false) { + std::error_code ignore_ec; + self->timer_.cancel(ignore_ec); + } } - return fut; - } + coro_http_client *self; + }; async_simple::coro::Lazy wait_future( async_simple::Future &&future) { @@ -647,10 +654,9 @@ class coro_http_client : public std::enable_shared_from_this { std::error_code err_code; timer_.cancel(err_code); co_await std::move(future); - if (is_timeout_) { + if (socket_->is_timeout_) { co_return std::make_error_code(std::errc::timed_out); } - co_return std::error_code{}; } @@ -682,18 +688,21 @@ class coro_http_client : public std::enable_shared_from_this { size_t size = 0; if (socket_->has_closed_) { - auto future = start_timer(conn_timeout_duration_, "connect timer"); - - data = co_await connect(u); - if (ec = co_await wait_future(std::move(future)); ec) { - co_return resp_data{ec, 404}; + { + auto time_out_guard = + timer_guard(this, conn_timeout_duration_, "connect timer"); + data = co_await connect(u); + } + if (socket_->is_timeout_) { + co_return resp_data{std::make_error_code(std::errc::timed_out), 404}; } if (data.net_err) { co_return data; } } - auto future = start_timer(req_timeout_duration_, "upload timer"); + auto time_out_guard = + timer_guard(this, conn_timeout_duration_, "request timer"); std::tie(ec, size) = co_await async_write(asio::buffer(header_str)); #ifdef INJECT_FOR_HTTP_CLIENT_TEST if (inject_write_failed == ClientInjectAction::write_failed) { @@ -729,10 +738,9 @@ class coro_http_client : public std::enable_shared_from_this { bool is_keep_alive = true; data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx), http_method::POST); - if (auto errc = co_await wait_future(std::move(future)); errc) { - ec = errc; + if (socket_->is_timeout_) { + ec = std::make_error_code(std::errc::timed_out); } - handle_result(data, ec, is_keep_alive); co_return data; } @@ -878,18 +886,20 @@ class coro_http_client : public std::enable_shared_from_this { size_t size = 0; if (socket_->has_closed_) { - auto future = start_timer(conn_timeout_duration_, "connect timer"); - - data = co_await connect(u); - if (ec = co_await wait_future(std::move(future)); ec) { - co_return resp_data{ec, 404}; + { + auto guard = timer_guard(this, conn_timeout_duration_, "connect timer"); + data = co_await connect(u); + } + if (socket_->is_timeout_) { + co_return resp_data{std::make_error_code(std::errc::timed_out), 404}; } if (data.net_err) { co_return data; } } - auto future = start_timer(req_timeout_duration_, "upload timer"); + auto time_guard = + timer_guard(this, conn_timeout_duration_, "request timer"); std::tie(ec, size) = co_await async_write(asio::buffer(header_str)); if (ec) { co_return resp_data{ec, 404}; @@ -943,7 +953,6 @@ class coro_http_client : public std::enable_shared_from_this { } } } - if (ec && ec == asio::error::operation_aborted) { ec = std::make_error_code(std::errc::timed_out); co_return resp_data{ec, 404}; @@ -952,8 +961,8 @@ class coro_http_client : public std::enable_shared_from_this { bool is_keep_alive = true; data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx), http_method::POST); - if (auto errc = co_await wait_future(std::move(future)); errc) { - ec = errc; + if (socket_->is_timeout_) { + ec = std::make_error_code(std::errc::timed_out); } handle_result(data, ec, is_keep_alive); @@ -1018,9 +1027,10 @@ class coro_http_client : public std::enable_shared_from_this { u.path = uri; } if (socket_->has_closed_) { - auto conn_future = start_timer(conn_timeout_duration_, "connect timer"); host_ = proxy_host_.empty() ? u.get_host() : proxy_host_; port_ = proxy_port_.empty() ? u.get_port() : proxy_port_; + auto guard = + timer_guard(this, conn_timeout_duration_, "connect timer"); if (ec = co_await coro_io::async_connect(&executor_wrapper_, socket_->impl_, host_, port_); ec) { @@ -1057,9 +1067,6 @@ class coro_http_client : public std::enable_shared_from_this { } } socket_->has_closed_ = false; - if (ec = co_await wait_future(std::move(conn_future)); ec) { - break; - } } std::vector vec; @@ -1078,7 +1085,7 @@ class coro_http_client : public std::enable_shared_from_this { #ifdef CORO_HTTP_PRINT_REQ_HEAD CINATRA_LOG_DEBUG << req_head_str; #endif - auto future = start_timer(req_timeout_duration_, "request timer"); + auto guard = timer_guard(this, req_timeout_duration_, "request timer"); if (has_body) { std::tie(ec, size) = co_await async_write(vec); } @@ -1088,14 +1095,12 @@ class coro_http_client : public std::enable_shared_from_this { if (ec) { break; } - data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx), method); - if (auto errc = co_await wait_future(std::move(future)); errc) { - ec = errc; - } } while (0); - + if (socket_->is_timeout_) { + ec = std::make_error_code(std::errc::timed_out); + } handle_result(data, ec, is_keep_alive); co_return data; } @@ -1177,6 +1182,7 @@ class coro_http_client : public std::enable_shared_from_this { struct socket_t { asio::ip::tcp::socket impl_; std::atomic has_closed_ = true; + bool is_timeout_ = false; asio::streambuf head_buf_; asio::streambuf chunked_buf_; #ifdef CINATRA_ENABLE_SSL @@ -1958,17 +1964,19 @@ class coro_http_client : public std::enable_shared_from_this { } async_simple::coro::Lazy timeout( - auto &timer, auto promise, std::chrono::steady_clock::duration duration, + auto &timer, std::chrono::steady_clock::duration duration, std::string msg) { + auto watcher=std::weak_ptr(socket_); timer.expires_after(duration); - is_timeout_ = co_await timer.async_await(); - if (!is_timeout_) { - promise.setValue(async_simple::Unit()); + auto is_timeout = co_await timer.async_await(); + if (!is_timeout) { co_return false; } - CINATRA_LOG_WARNING << msg << " timeout"; - close_socket(*socket_); - promise.setValue(async_simple::Unit()); + if (auto socket=watcher.lock();socket) { + socket_->is_timeout_ = true; + CINATRA_LOG_WARNING << msg << " timeout"; + close_socket(*socket_); + } co_return true; } @@ -2023,8 +2031,6 @@ class coro_http_client : public std::enable_shared_from_this { #endif std::string redirect_uri_; bool enable_follow_redirect_ = false; - - bool is_timeout_ = false; bool enable_timeout_ = false; std::chrono::steady_clock::duration conn_timeout_duration_ = std::chrono::seconds(8);