Skip to content

Commit

Permalink
fix and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Nov 25, 2024
1 parent 762923d commit f461471
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 305 deletions.
34 changes: 34 additions & 0 deletions include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,40 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#endif
}

#ifdef INJECT_FOR_HTTP_CLIENT_TEST
async_simple::coro::Lazy<std::error_code> async_write_raw(
std::string_view data) {
auto [ec, _] = co_await async_write(asio::buffer(data));
co_return ec;
}

async_simple::coro::Lazy<resp_data> async_read_raw(
http_method method, bool clear_buffer = false) {
if (clear_buffer) {
body_.clear();
}

char buf[1024];
std::error_code ec{};
size_t size{};
#ifdef CINATRA_ENABLE_SSL
if (has_init_ssl_) {
std::tie(ec, size) = co_await coro_io::async_read_some(
*socket_->ssl_stream_, asio::buffer(buf, 1024));
}
else {
#endif
std::tie(ec, size) = co_await coro_io::async_read_some(
socket_->impl_, asio::buffer(buf, 1024));
#ifdef CINATRA_ENABLE_SSL
}
#endif
body_.append(buf, size);

co_return resp_data{ec, {}, {}, body_};
}
#endif

inline void set_proxy(const std::string &host, const std::string &port) {
proxy_host_ = host;
proxy_port_ = port;
Expand Down
95 changes: 21 additions & 74 deletions include/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@
#include "sha1.hpp"
#include "string_resize.hpp"
#include "websocket.hpp"
#include "ylt/metric/counter.hpp"
#include "ylt/metric/gauge.hpp"
#include "ylt/metric/histogram.hpp"
#include "ylt/metric/metric.hpp"
#ifdef CINATRA_ENABLE_GZIP
#include "gzip.hpp"
#endif
#include "metric_conf.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"

Expand All @@ -52,14 +47,9 @@ class coro_http_connection
request_(parser_, this),
response_(this) {
buffers_.reserve(3);

cinatra_metric_conf::server_total_fd_inc();
}

~coro_http_connection() {
cinatra_metric_conf::server_total_fd_dec();
close();
}
~coro_http_connection() { close(); }

#ifdef CINATRA_ENABLE_SSL
bool init_ssl(const std::string &cert_file, const std::string &key_file,
Expand Down Expand Up @@ -126,21 +116,17 @@ class coro_http_connection
CINATRA_LOG_WARNING << "read http header error: " << ec.message();
}

cinatra_metric_conf::server_failed_req_inc();
close();
break;
}

if (cinatra_metric_conf::enable_metric) {
start = std::chrono::system_clock::now();
cinatra_metric_conf::server_total_req_inc();
}

const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
int head_len = parser_.parse_request(data_ptr, size, 0);
if (head_len <= 0) {
cinatra_metric_conf::server_failed_req_inc();
CINATRA_LOG_ERROR << "parse http header error";
response_.set_status_and_content(status_type::bad_request,
"invalid http protocol");
co_await reply();
close();
break;
}
Expand All @@ -153,9 +139,6 @@ class coro_http_connection
if (type != content_type::chunked && type != content_type::multipart) {
size_t body_len = parser_.body_len();
if (body_len == 0) {
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_recv_bytes_inc(head_len);
}
if (parser_.method() == "GET"sv) {
if (request_.is_upgrade()) {
#ifdef CINATRA_ENABLE_GZIP
Expand All @@ -175,16 +158,6 @@ class coro_http_connection
}
response_.set_delay(true);
}
else {
if (cinatra_metric_conf::enable_metric) {
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid -
start)
.count();
cinatra_metric_conf::server_read_latency_observe(count);
}
}
}
}
else if (body_len <= head_buf_.size()) {
Expand All @@ -194,7 +167,6 @@ class coro_http_connection
memcpy(body_.data(), data_ptr, body_len);
head_buf_.consume(head_buf_.size());
}
cinatra_metric_conf::server_total_recv_bytes_inc(head_len + body_len);
}
else {
size_t part_size = head_buf_.size();
Expand All @@ -209,22 +181,9 @@ class coro_http_connection
size_to_read);
if (ec) {
CINATRA_LOG_ERROR << "async_read error: " << ec.message();
cinatra_metric_conf::server_failed_req_inc();
close();
break;
}
else {
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_recv_bytes_inc(head_len +
body_len);
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid -
start)
.count();
cinatra_metric_conf::server_read_latency_observe(count);
}
}
}
}

Expand Down Expand Up @@ -358,37 +317,44 @@ class coro_http_connection

while (true) {
size_t left_size = head_buf_.size();
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
std::string_view left_content{data_ptr, left_size};
auto next_data_ptr =
asio::buffer_cast<const char *>(head_buf_.data());
std::string_view left_content{next_data_ptr, left_size};
size_t pos = left_content.find(TWO_CRCF);
if (pos == std::string_view::npos) {
break;
}
http_parser parser;
int head_len = parser.parse_request(data_ptr, size, 0);
int head_len = parser.parse_request(next_data_ptr, left_size, 0);
if (head_len <= 0) {
CINATRA_LOG_ERROR << "parse http header error";
response_.set_status_and_content(status_type::bad_request,
"invalid http protocol");
co_await reply();
close();
break;
}

head_buf_.consume(pos + TWO_CRCF.length());

std::string_view key = {
parser_.method().data(),
parser_.method().length() + 1 + parser_.url().length()};
std::string_view next_key = {
parser.method().data(),
parser.method().length() + 1 + parser.url().length()};

coro_http_request req(parser, this);
coro_http_response resp(this);
resp.need_date_head(response_.need_date());
if (auto handler = router_.get_handler(key); handler) {
if (auto handler = router_.get_handler(next_key); handler) {
router_.route(handler, req, resp, key);
}
else {
if (auto coro_handler = router_.get_coro_handler(key);
if (auto coro_handler = router_.get_coro_handler(next_key);
coro_handler) {
co_await router_.route_coro(coro_handler, req, resp, key);
}
else {
resp.set_status(status_type::not_found);
}
}

resp.build_resp_str(resp_str_);
Expand All @@ -409,14 +375,6 @@ class coro_http_connection
}
}

if (cinatra_metric_conf::enable_metric) {
mid = std::chrono::system_clock::now();
double count =
std::chrono::duration_cast<std::chrono::microseconds>(mid - start)
.count();
cinatra_metric_conf::server_req_latency_observe(count);
}

response_.clear();
request_.clear();
buffers_.clear();
Expand All @@ -430,10 +388,6 @@ class coro_http_connection
}

async_simple::coro::Lazy<bool> reply(bool need_to_bufffer = true) {
if (response_.status() >= status_type::bad_request) {
if (cinatra_metric_conf::enable_metric)
cinatra_metric_conf::server_failed_req_inc();
}
std::error_code ec;
size_t size;
if (multi_buf_) {
Expand All @@ -444,18 +398,12 @@ class coro_http_connection
for (auto &buf : buffers_) {
send_size += buf.size();
}
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_send_bytes_inc(send_size);
}
std::tie(ec, size) = co_await async_write(buffers_);
}
else {
if (need_to_bufffer) {
response_.build_resp_str(resp_str_);
}
if (cinatra_metric_conf::enable_metric) {
cinatra_metric_conf::server_total_send_bytes_inc(resp_str_.size());
}
std::tie(ec, size) = co_await async_write(asio::buffer(resp_str_));
}

Expand Down Expand Up @@ -513,8 +461,6 @@ class coro_http_connection
co_return true;
}

bool sync_reply() { return async_simple::coro::syncAwait(reply()); }

async_simple::coro::Lazy<bool> begin_chunked() {
response_.set_delay(true);
response_.set_status(status_type::ok);
Expand Down Expand Up @@ -709,6 +655,7 @@ class coro_http_connection

switch (type) {
case cinatra::ws_frame_type::WS_ERROR_FRAME:
close();
result.ec = std::make_error_code(std::errc::protocol_error);
break;
case cinatra::ws_frame_type::WS_OPENING_FRAME:
Expand Down Expand Up @@ -790,7 +737,7 @@ class coro_http_connection
inflate_str_.clear();
if (!cinatra::gzip_codec::inflate({payload.data(), payload.size()},
inflate_str_)) {
CINATRA_LOG_ERROR << "uncompuress data error";
CINATRA_LOG_ERROR << "compress data error";
result.ec = std::make_error_code(std::errc::protocol_error);
return false;
}
Expand Down
68 changes: 11 additions & 57 deletions include/cinatra/coro_http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "ylt/coro_io/coro_io.hpp"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_io/load_blancer.hpp"
#include "ylt/metric/system_metric.hpp"

namespace cinatra {
enum class file_resp_format_type {
Expand Down Expand Up @@ -182,29 +181,6 @@ class coro_http_server {
}
}

void use_metrics(bool enable_json = false,
std::string url_path = "/metrics") {
init_metrics();
using root = ylt::metric::metric_collector_t<
ylt::metric::default_static_metric_manager,
ylt::metric::system_metric_manager>;
set_http_handler<http_method::GET>(
url_path,
[enable_json](coro_http_request &req, coro_http_response &res) {
std::string str;
#ifdef CINATRA_ENABLE_METRIC_JSON
if (enable_json) {
str = root::serialize_to_json();
res.set_content_type<resp_content_type::json>();
}
else
#endif
str = root::serialize();

res.set_status_and_content(status_type::ok, std::move(str));
});
}

template <http_method... method, typename... Aspects>
void set_http_proxy_handler(std::string url_path,
std::vector<std::string_view> hosts,
Expand Down Expand Up @@ -277,25 +253,29 @@ class coro_http_server {
break;
}

co_await load_blancer->send_request(
[&req, result](
coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
auto ret = co_await load_blancer->send_request(
[&req, result](coro_http_client &client, std::string_view host)
-> async_simple::coro::Lazy<std::error_code> {
auto r =
co_await client.write_websocket(std::string(result.data));
if (r.net_err) {
co_return;
co_return r.net_err;
}
auto data = co_await client.read_websocket();
if (data.net_err) {
co_return;
co_return data.net_err;
}
auto ec = co_await req.get_conn()->write_websocket(
std::string(result.data));
if (ec) {
co_return;
co_return ec;
}
co_return std::error_code{};
});
if (!ret.has_value()) {
req.get_conn()->close();
break;
}
}
},
std::forward<Aspects>(aspects)...);
Expand Down Expand Up @@ -932,32 +912,6 @@ class coro_http_server {
address_ = std::move(address);
}

private:
void init_metrics() {
using namespace ylt::metric;

cinatra_metric_conf::enable_metric = true;
default_static_metric_manager::instance().create_metric_static<counter_t>(
cinatra_metric_conf::server_total_req, "");
default_static_metric_manager::instance().create_metric_static<counter_t>(
cinatra_metric_conf::server_failed_req, "");
default_static_metric_manager::instance().create_metric_static<counter_t>(
cinatra_metric_conf::server_total_recv_bytes, "");
default_static_metric_manager::instance().create_metric_static<counter_t>(
cinatra_metric_conf::server_total_send_bytes, "");
default_static_metric_manager::instance().create_metric_static<gauge_t>(
cinatra_metric_conf::server_total_fd, "");
default_static_metric_manager::instance().create_metric_static<histogram_t>(
cinatra_metric_conf::server_req_latency, "",
std::vector<double>{30, 40, 50, 60, 70, 80, 90, 100, 150});
default_static_metric_manager::instance().create_metric_static<histogram_t>(
cinatra_metric_conf::server_read_latency, "",
std::vector<double>{3, 5, 7, 9, 13, 18, 23, 35, 50});
#if defined(__GNUC__)
ylt::metric::start_system_metric();
#endif
}

private:
std::unique_ptr<coro_io::io_context_pool> pool_;
asio::io_context *out_ctx_ = nullptr;
Expand Down
Loading

0 comments on commit f461471

Please sign in to comment.