Skip to content

Commit

Permalink
client/http: More coro conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
Tectu committed Nov 25, 2024
1 parent 6b77e64 commit 08c2dde
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 63 deletions.
73 changes: 31 additions & 42 deletions lib/malloy/client/http/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
#include <future>
#include <optional>



#include <iostream>

namespace malloy::client::http
{

Expand Down Expand Up @@ -46,7 +42,7 @@ namespace malloy::client::http
m_parser.body_limit(body_limit);
}

// Start the asynchronous operation
// This is a shim to call the new coroutine style from non-coro code. This should eventually be nuked.
void
run(
char const* port,
Expand Down Expand Up @@ -75,75 +71,60 @@ namespace malloy::client::http
Filter&& filter
)
{
namespace http = boost::beast::http;

auto executor = co_await boost::asio::this_coro::executor;
auto resolver = boost::asio::ip::tcp::resolver{ executor };
//auto stream = boost::beast::tcp_stream{ executor };

// Look up the domain name
auto resolver = boost::asio::ip::tcp::resolver{ executor };
auto const results = co_await resolver.async_resolve(req.base()[malloy::http::field::host], port);

// Set the timeout.
derived().stream().expires_after(std::chrono::seconds(30));

// Make the connection on the IP address we get from a lookup
co_await derived().stream().async_connect(results);
set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code
co_await boost::beast::get_lowest_layer(derived().stream()).async_connect(results);

// Call "connected" hook
co_await derived().hook_connected();

// Set the timeout.
derived().stream().expires_after(std::chrono::seconds(30));

// Send the HTTP request to the remote host
co_await http::async_write(derived().stream(), req);

// This buffer is used for reading and must be persisted
//boost::beast::flat_buffer buffer;

// Declare a container to hold the response
//http::response<http::dynamic_body> res;
set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code
co_await boost::beast::http::async_write(derived().stream(), req);

// Receive the HTTP response
//co_await http::async_read(derived().stream(), buffer, res);

#if 0
(void)cb;
(void)filter;

// Write the message to standard out
std::cout << res << std::endl;
#else
// ToDo
m_req_filter = std::move(filter);
//m_req = std::move(req);
m_cb.emplace(std::move(cb));

// Pick a body and parse it from the stream
// ToDo: Have a look at using boost::beast::http::response<boost::beast::http::dynamic_body> instead!
// This would probably involve something like:
//
// boost::beast::flat_buffer buffer;
// http::response<http::dynamic_body> res;
// co_await http::async_read(derived().stream(), buffer, res);
//
auto bodies = filter.body_for(m_parser.get().base());
co_await std::visit([this](auto&& body) -> boost::asio::awaitable<void> {
co_await std::visit(
[filter = std::move(filter), this](auto&& body) -> boost::asio::awaitable<void> {
using body_t = std::decay_t<decltype(body)>;

auto parser = std::make_shared<boost::beast::http::response_parser<body_t>>(std::move(m_parser));
m_req_filter.setup_body(parser->get().base(), parser->get().body());
filter.setup_body(parser->get().base(), parser->get().body());

boost::beast::flat_buffer buffer;

co_await boost::beast::http::async_read(
derived().stream(),
m_buffer,
buffer,
*parser,
boost::asio::use_awaitable);

(*m_cb)(malloy::http::response<body_t>{parser->release()});
},
std::move(bodies)
);
#endif

// Gracefully close the socket
boost::beast::error_code ec;
derived().stream().socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code
// ToDo: This should be co_await too!
boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);

// not_connected happens sometimes
// so don't bother reporting it.
Expand All @@ -160,12 +141,20 @@ namespace malloy::client::http
protected:
std::shared_ptr<spdlog::logger> m_logger;

template<typename Rep, typename Period>
void
set_stream_timeout(const std::chrono::duration<Rep, Period> duration)
{
// ToDo: Which one is correct?!?!

//derived().stream().expires_after(duration);
boost::beast::get_lowest_layer(derived().stream()).expires_after(duration);
}

private:
boost::asio::io_context& m_io_ctx;
boost::beast::http::response_parser<boost::beast::http::empty_body> m_parser;
std::optional<callback_t> m_cb; // ToDo: Get rid of this, no longer required
Filter m_req_filter; // ToDo: Get rid of this, no longer required
boost::beast::flat_buffer m_buffer; // ToDo: Get rid of this, no longer required

[[nodiscard]]
constexpr
Expand Down
25 changes: 4 additions & 21 deletions lib/malloy/client/http/connection_tls.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,16 @@ namespace malloy::client::http
return m_stream;
}

void
// ToDo: Return error code!
boost::asio::awaitable<void>
hook_connected()
{
// Perform the TLS handshake
m_stream.async_handshake(
boost::asio::ssl::stream_base::client,
boost::beast::bind_front_handler(
&connection_tls::on_handshake,
this->shared_from_this()
)
);
parent_t::set_stream_timeout(std::chrono::seconds(3)); // ToDo: Do not hard-code!
co_await m_stream.async_handshake(boost::asio::ssl::stream_base::client);
}

private:
boost::beast::ssl_stream<malloy::tcp::stream<>> m_stream;

void
on_handshake(const boost::beast::error_code ec)
{
if (ec)
return parent_t::m_logger->error("on_handshake(): {}", ec.message());

// Set a timeout on the operation
boost::beast::get_lowest_layer(m_stream).expires_after(std::chrono::seconds(30));

// Send the HTTP request to the remote host
parent_t::send_request();
}
};
}

0 comments on commit 08c2dde

Please sign in to comment.