From e544b7fd7a634d902912c7d62ef64d827b87cacc Mon Sep 17 00:00:00 2001 From: Pau Puerta Date: Mon, 25 Jul 2022 11:30:35 +0200 Subject: [PATCH 1/2] Add optional keep-alive mechanism to WS connections --- include/crow/routing.h | 13 +++++++++++-- include/crow/websocket.h | 31 ++++++++++++++++++++++++++++++- tests/unittest.cpp | 15 +++++++++++++++ 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/include/crow/routing.h b/include/crow/routing.h index c01b56705..542157912 100644 --- a/include/crow/routing.h +++ b/include/crow/routing.h @@ -451,12 +451,12 @@ namespace crow void handle_upgrade(const request& req, response&, SocketAdaptor&& adaptor) override { max_payload_ = max_payload_override_ ? max_payload_ : app_->websocket_max_payload(); - new crow::websocket::Connection(req, std::move(adaptor), app_, max_payload_, open_handler_, message_handler_, close_handler_, error_handler_, accept_handler_); + new crow::websocket::Connection(req, std::move(adaptor), app_, max_payload_, open_handler_, message_handler_, close_handler_, error_handler_, timeout_handler_, accept_handler_); } #ifdef CROW_ENABLE_SSL void handle_upgrade(const request& req, response&, SSLAdaptor&& adaptor) override { - new crow::websocket::Connection(req, std::move(adaptor), app_, max_payload_, open_handler_, message_handler_, close_handler_, error_handler_, accept_handler_); + new crow::websocket::Connection(req, std::move(adaptor), app_, max_payload_, open_handler_, message_handler_, close_handler_, error_handler_, timeout_handler_, accept_handler_); } #endif @@ -496,6 +496,14 @@ namespace crow return *this; } + template + self_t& ontimeout(Func f, uint64_t timeout_in_seconds = 5) + { + timeout_handler_.first = f; + timeout_handler_.second = timeout_in_seconds; + return *this; + } + template self_t& onaccept(Func f) { @@ -509,6 +517,7 @@ namespace crow std::function message_handler_; std::function close_handler_; std::function error_handler_; + std::pair, uint64_t> timeout_handler_; std::function accept_handler_; uint64_t max_payload_; bool max_payload_override_ = false; diff --git a/include/crow/websocket.h b/include/crow/websocket.h index f961055a4..63ceccf3a 100644 --- a/include/crow/websocket.h +++ b/include/crow/websocket.h @@ -76,6 +76,7 @@ namespace crow std::function message_handler, std::function close_handler, std::function error_handler, + std::pair, uint64_t> receiver_timeout_handler, std::function accept_handler): adaptor_(std::move(adaptor)), handler_(handler), @@ -84,7 +85,9 @@ namespace crow message_handler_(std::move(message_handler)), close_handler_(std::move(close_handler)), error_handler_(std::move(error_handler)), - accept_handler_(std::move(accept_handler)) + timeout_handler_(std::move(receiver_timeout_handler)), + accept_handler_(std::move(accept_handler)), + task_timer_(adaptor_.get_io_service()) { if (!utility::string_equals(req.get_header_value("upgrade"), "websocket")) { @@ -258,6 +261,26 @@ namespace crow do_read(); } + void start_deadline(/*int timeout = 5*/) + { + cancel_deadline_timer(); + + if (close_connection_ || !timeout_handler_.first) return; + + task_timer_.set_default_timeout(timeout_handler_.second); + task_id_ = task_timer_.schedule([this] { + timeout_handler_.first(*this, "timeout"); + }); + CROW_LOG_DEBUG << this << " websocket timer added: " << &task_timer_ << ' ' << task_id_; + } + + void cancel_deadline_timer() + { + if (!timeout_handler_.first) return; + CROW_LOG_DEBUG << this << " websocket timer cancelled: " << &task_timer_ << ' ' << task_id_; + task_timer_.cancel(task_id_); + } + /// Read a websocket message. /// @@ -276,6 +299,8 @@ namespace crow return; } + start_deadline(); + is_reading = true; switch (state_) { @@ -686,7 +711,11 @@ namespace crow std::function message_handler_; std::function close_handler_; std::function error_handler_; + std::pair,uint64_t> timeout_handler_; std::function accept_handler_; + + detail::task_timer task_timer_; + detail::task_timer::identifier_type task_id_; }; } // namespace websocket } // namespace crow diff --git a/tests/unittest.cpp b/tests/unittest.cpp index 65ceb503c..b63fe22b4 100644 --- a/tests/unittest.cpp +++ b/tests/unittest.cpp @@ -2496,6 +2496,10 @@ TEST_CASE("websocket") else if (isbin && message == "Hello bin") conn.send_binary("Hello back bin"); }) + .ontimeout([&](websocket::connection& conn, const std::string&) { + CROW_LOG_INFO << "Websocket Time Out"; + conn.send_text("TimeOut"); + }, 2 /* seconds */) .onclose([&](websocket::connection&, const std::string&) { CROW_LOG_INFO << "Closing websocket"; }); @@ -2619,6 +2623,17 @@ TEST_CASE("websocket") std::string checkstring(std::string(buf).substr(0, 12)); CHECK(checkstring == "\x81\x0AHello back"); } + + //----------TimeOut---------- + { + std::fill_n(buf, 2048, 0); + CROW_LOG_INFO << "Waiting Time Out"; + c.receive(asio::buffer(buf, 2048)); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + std::string checkstring(std::string(buf).substr(0, 10)); + CHECK(checkstring == "\x81\x07TimeOut"); + } + //----------Close---------- { std::fill_n(buf, 2048, 0); From 4b0874f7903ae32e2bfdbb95c658b2906ee848cf Mon Sep 17 00:00:00 2001 From: Pau Puerta Date: Mon, 25 Jul 2022 13:13:44 +0200 Subject: [PATCH 2/2] formatting --- include/crow/websocket.h | 4 ++-- tests/unittest.cpp | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/include/crow/websocket.h b/include/crow/websocket.h index 63ceccf3a..073203dfc 100644 --- a/include/crow/websocket.h +++ b/include/crow/websocket.h @@ -261,7 +261,7 @@ namespace crow do_read(); } - void start_deadline(/*int timeout = 5*/) + void start_deadline(/*int timeout = 5*/) { cancel_deadline_timer(); @@ -711,7 +711,7 @@ namespace crow std::function message_handler_; std::function close_handler_; std::function error_handler_; - std::pair,uint64_t> timeout_handler_; + std::pair, uint64_t> timeout_handler_; std::function accept_handler_; detail::task_timer task_timer_; diff --git a/tests/unittest.cpp b/tests/unittest.cpp index b63fe22b4..5d8620717 100644 --- a/tests/unittest.cpp +++ b/tests/unittest.cpp @@ -2499,7 +2499,8 @@ TEST_CASE("websocket") .ontimeout([&](websocket::connection& conn, const std::string&) { CROW_LOG_INFO << "Websocket Time Out"; conn.send_text("TimeOut"); - }, 2 /* seconds */) + }, + 2 /* seconds */) .onclose([&](websocket::connection&, const std::string&) { CROW_LOG_INFO << "Closing websocket"; });