Skip to content

Commit

Permalink
client/http: More coro conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
Tectu committed Nov 25, 2024
1 parent 6b77e64 commit bdb3b2f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 96 deletions.
14 changes: 8 additions & 6 deletions lib/malloy/client/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,14 @@ namespace malloy::client
req.set(malloy::http::field::user_agent, m_cfg.user_agent);

// Run
conn->run(
std::to_string(req.port()).c_str(),
req,
std::move(prom),
std::forward<Callback>(cb),
std::forward<Filter>(filter));
boost::asio::co_spawn(
*m_ioc,
conn->run(std::move(req), std::move(prom), std::forward<Callback>(cb), std::forward<Filter>(filter)),
[conn](std::exception_ptr e) { // ToDo: Do we need to capture conn to keep it alive here?!
if (e)
std::rethrow_exception(e);
}
);
});

return err_channel;
Expand Down
105 changes: 36 additions & 69 deletions lib/malloy/client/http/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
#include <future>
#include <optional>



#include <iostream>

namespace malloy::client::http
{

Expand All @@ -31,9 +27,6 @@ namespace malloy::client::http
class connection
{
public:
using resp_t = typename Filter::response_type;
using callback_t = Callback;

connection(std::shared_ptr<spdlog::logger> logger, boost::asio::io_context& io_ctx, const std::uint64_t body_limit) :
m_logger(std::move(logger)),
m_io_ctx(io_ctx)
Expand All @@ -46,104 +39,70 @@ namespace malloy::client::http
m_parser.body_limit(body_limit);
}

// Start the asynchronous operation
void
run(
char const* port,
malloy::http::request<ReqBody> req,
std::promise<malloy::error_code> err_channel,
callback_t&& cb,
Filter&& filter
)
{
boost::asio::co_spawn(
m_io_ctx,
run_coro(std::string(port), std::move(req), std::move(err_channel), std::move(cb), std::move(filter)),
[me = derived().shared_from_this()](std::exception_ptr e) {
if (e)
std::rethrow_exception(e);
}
);
}

// ToDo: Remove port parameter, we can get that from the req
// ToDo: Return something like std::expected<http::response, error_code> instead
boost::asio::awaitable<void>
run_coro(
std::string port, // ToDo
run(
malloy::http::request<ReqBody> req,
std::promise<malloy::error_code> err_channel,
callback_t&& cb,
Callback&& cb,
Filter&& filter
)
{
namespace http = boost::beast::http;

auto executor = co_await boost::asio::this_coro::executor;
auto resolver = boost::asio::ip::tcp::resolver{ executor };
//auto stream = boost::beast::tcp_stream{ executor };

// Look up the domain name
auto const results = co_await resolver.async_resolve(req.base()[malloy::http::field::host], port);

// Set the timeout.
derived().stream().expires_after(std::chrono::seconds(30));
auto resolver = boost::asio::ip::tcp::resolver{ executor };
auto const results = co_await resolver.async_resolve(req.base()[malloy::http::field::host], std::to_string(req.port()));

// Make the connection on the IP address we get from a lookup
co_await derived().stream().async_connect(results);
set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code
co_await boost::beast::get_lowest_layer(derived().stream()).async_connect(results);

// Call "connected" hook
co_await derived().hook_connected();

// Set the timeout.
derived().stream().expires_after(std::chrono::seconds(30));

// Send the HTTP request to the remote host
co_await http::async_write(derived().stream(), req);

// This buffer is used for reading and must be persisted
//boost::beast::flat_buffer buffer;

// Declare a container to hold the response
//http::response<http::dynamic_body> res;

// Receive the HTTP response
//co_await http::async_read(derived().stream(), buffer, res);

#if 0
(void)cb;
(void)filter;
set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code
co_await boost::beast::http::async_write(derived().stream(), req);

// Write the message to standard out
std::cout << res << std::endl;
#else
// ToDo
m_req_filter = std::move(filter);
//m_req = std::move(req);
m_cb.emplace(std::move(cb));

// Pick a body and parse it from the stream
// ToDo: Have a look at using boost::beast::http::response<boost::beast::http::dynamic_body> instead!
// This would probably involve something like:
//
// boost::beast::flat_buffer buffer;
// http::response<http::dynamic_body> res;
// co_await http::async_read(derived().stream(), buffer, res);
//
auto bodies = filter.body_for(m_parser.get().base());
co_await std::visit([this](auto&& body) -> boost::asio::awaitable<void> {
co_await std::visit(
[filter = std::move(filter), this](auto&& body) -> boost::asio::awaitable<void> {
using body_t = std::decay_t<decltype(body)>;

auto parser = std::make_shared<boost::beast::http::response_parser<body_t>>(std::move(m_parser));
m_req_filter.setup_body(parser->get().base(), parser->get().body());
filter.setup_body(parser->get().base(), parser->get().body());

boost::beast::flat_buffer buffer;

co_await boost::beast::http::async_read(
derived().stream(),
m_buffer,
buffer,
*parser,
boost::asio::use_awaitable);

(*m_cb)(malloy::http::response<body_t>{parser->release()});
},
std::move(bodies)
);
#endif

// Gracefully close the socket
boost::beast::error_code ec;
derived().stream().socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code
// ToDo: This should be co_await too!
boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);

// not_connected happens sometimes
// so don't bother reporting it.
Expand All @@ -160,12 +119,20 @@ namespace malloy::client::http
protected:
std::shared_ptr<spdlog::logger> m_logger;

template<typename Rep, typename Period>
void
set_stream_timeout(const std::chrono::duration<Rep, Period> duration)
{
// ToDo: Which one is correct?!?!

//derived().stream().expires_after(duration);
boost::beast::get_lowest_layer(derived().stream()).expires_after(duration);
}

private:
boost::asio::io_context& m_io_ctx;
boost::beast::http::response_parser<boost::beast::http::empty_body> m_parser;
std::optional<callback_t> m_cb; // ToDo: Get rid of this, no longer required
Filter m_req_filter; // ToDo: Get rid of this, no longer required
boost::beast::flat_buffer m_buffer; // ToDo: Get rid of this, no longer required
std::optional<Callback> m_cb; // ToDo: Get rid of this, no longer required

[[nodiscard]]
constexpr
Expand Down
25 changes: 4 additions & 21 deletions lib/malloy/client/http/connection_tls.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,16 @@ namespace malloy::client::http
return m_stream;
}

void
// ToDo: Return error code!
boost::asio::awaitable<void>
hook_connected()
{
// Perform the TLS handshake
m_stream.async_handshake(
boost::asio::ssl::stream_base::client,
boost::beast::bind_front_handler(
&connection_tls::on_handshake,
this->shared_from_this()
)
);
parent_t::set_stream_timeout(std::chrono::seconds(3)); // ToDo: Do not hard-code!
co_await m_stream.async_handshake(boost::asio::ssl::stream_base::client);
}

private:
boost::beast::ssl_stream<malloy::tcp::stream<>> m_stream;

void
on_handshake(const boost::beast::error_code ec)
{
if (ec)
return parent_t::m_logger->error("on_handshake(): {}", ec.message());

// Set a timeout on the operation
boost::beast::get_lowest_layer(m_stream).expires_after(std::chrono::seconds(30));

// Send the HTTP request to the remote host
parent_t::send_request();
}
};
}

0 comments on commit bdb3b2f

Please sign in to comment.