From 9fe9e7c9d2ae04fd1cfcdea152798dc63c685cb8 Mon Sep 17 00:00:00 2001 From: Sergey Kuznetsov Date: Thu, 5 Sep 2024 14:58:06 +0100 Subject: [PATCH] fix: Subscription source bugs fix (#1626) For #1620. - Add timeouts for websocket operations for connections to rippled. Without these timeouts if connection hangs for some reason, clio wouldn't know the connection is hanging. - Fix potential data race in choosing new subscription source which will forward messages to users. - Optimise switching between subscription sources. --- src/etl/LoadBalancer.cpp | 14 ++- src/etl/LoadBalancer.hpp | 7 +- src/etl/Source.hpp | 2 +- src/etl/impl/SubscriptionSource.cpp | 35 +++--- src/etl/impl/SubscriptionSource.hpp | 14 ++- src/util/requests/impl/WsConnectionImpl.hpp | 11 +- tests/unit/etl/LoadBalancerTests.cpp | 34 ++---- tests/unit/etl/SubscriptionSourceTests.cpp | 114 ++++++++++++++------ 8 files changed, 147 insertions(+), 84 deletions(-) diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index 4af0ef821..d3c287cb6 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -109,10 +109,13 @@ LoadBalancer::LoadBalancer( validatedLedgers, forwardingTimeout, [this]() { - if (not hasForwardingSource_) + if (not hasForwardingSource_.lock().get()) + chooseForwardingSource(); + }, + [this](bool wasForwarding) { + if (wasForwarding) chooseForwardingSource(); }, - [this]() { chooseForwardingSource(); }, [this]() { if (forwardingCache_.has_value()) forwardingCache_->invalidate(); @@ -315,11 +318,12 @@ void LoadBalancer::chooseForwardingSource() { LOG(log_.info()) << "Choosing a new source to forward subscriptions"; - hasForwardingSource_ = false; + auto hasForwardingSourceLock = hasForwardingSource_.lock(); + hasForwardingSourceLock.get() = false; for (auto& source : sources_) { - if (not hasForwardingSource_ and source->isConnected()) { + if (not hasForwardingSourceLock.get() and source->isConnected()) { source->setForwarding(true); - hasForwardingSource_ = true; + hasForwardingSourceLock.get() = true; } else { source->setForwarding(false); } diff --git a/src/etl/LoadBalancer.hpp b/src/etl/LoadBalancer.hpp index 084f55a91..27f5ecdc7 100644 --- a/src/etl/LoadBalancer.hpp +++ b/src/etl/LoadBalancer.hpp @@ -25,6 +25,7 @@ #include "etl/Source.hpp" #include "etl/impl/ForwardingCache.hpp" #include "feed/SubscriptionManagerInterface.hpp" +#include "util/Mutex.hpp" #include "util/config/Config.hpp" #include "util/log/Logger.hpp" @@ -38,7 +39,6 @@ #include #include -#include #include #include #include @@ -74,7 +74,10 @@ class LoadBalancer { std::optional etlState_; std::uint32_t downloadRanges_ = DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */ - std::atomic_bool hasForwardingSource_{false}; + + // Using mutext instead of atomic_bool because choosing a new source to + // forward messages should be done with a mutual exclusion otherwise there will be a race condition + util::Mutex hasForwardingSource_{false}; public: /** diff --git a/src/etl/Source.hpp b/src/etl/Source.hpp index 70c637b1c..6171ff543 100644 --- a/src/etl/Source.hpp +++ b/src/etl/Source.hpp @@ -51,7 +51,7 @@ namespace etl { class SourceBase { public: using OnConnectHook = std::function; - using OnDisconnectHook = std::function; + using OnDisconnectHook = std::function; using OnLedgerClosedHook = std::function; virtual ~SourceBase() = default; diff --git a/src/etl/impl/SubscriptionSource.cpp b/src/etl/impl/SubscriptionSource.cpp index d0e76f875..c633cbed5 100644 --- a/src/etl/impl/SubscriptionSource.cpp +++ b/src/etl/impl/SubscriptionSource.cpp @@ -24,6 +24,8 @@ #include "rpc/JS.hpp" #include "util/Retry.hpp" #include "util/log/Logger.hpp" +#include "util/prometheus/Label.hpp" +#include "util/prometheus/Prometheus.hpp" #include "util/requests/Types.hpp" #include @@ -66,7 +68,7 @@ SubscriptionSource::SubscriptionSource( OnConnectHook onConnect, OnDisconnectHook onDisconnect, OnLedgerClosedHook onLedgerClosed, - std::chrono::steady_clock::duration const connectionTimeout, + std::chrono::steady_clock::duration const wsTimeout, std::chrono::steady_clock::duration const retryDelay ) : log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort)) @@ -74,14 +76,20 @@ SubscriptionSource::SubscriptionSource( , validatedLedgers_(std::move(validatedLedgers)) , subscriptions_(std::move(subscriptions)) , strand_(boost::asio::make_strand(ioContext)) + , wsTimeout_(wsTimeout) , retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_)) , onConnect_(std::move(onConnect)) , onDisconnect_(std::move(onDisconnect)) , onLedgerClosed_(std::move(onLedgerClosed)) + , lastMessageTimeSecondsSinceEpoch_(PrometheusService::gaugeInt( + "subscription_source_last_message_time", + util::prometheus::Labels({{"source", fmt::format("{}:{}", ip, wsPort)}}), + "Seconds since epoch of the last message received from rippled subscription streams" + )) { wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"}) .addHeader({"X-User", "clio-client"}) - .setConnectionTimeout(connectionTimeout); + .setConnectionTimeout(wsTimeout_); } SubscriptionSource::~SubscriptionSource() @@ -167,21 +175,22 @@ SubscriptionSource::subscribe() } wsConnection_ = std::move(connection).value(); - isConnected_ = true; - onConnect_(); - LOG(log_.info()) << "Connected"; auto const& subscribeCommand = getSubscribeCommandJson(); - auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield); + auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_); if (writeErrorOpt) { handleError(writeErrorOpt.value(), yield); return; } + isConnected_ = true; + LOG(log_.info()) << "Connected"; + onConnect_(); + retry_.reset(); while (!stop_) { - auto const message = wsConnection_->read(yield); + auto const message = wsConnection_->read(yield, wsTimeout_); if (not message) { handleError(message.error(), yield); return; @@ -256,8 +265,6 @@ SubscriptionSource::handleMessage(std::string const& message) } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) { LOG(log_.debug()) << "Forwarding manifest: " << object; subscriptions_->forwardManifest(object); - } else { - LOG(log_.error()) << "Unknown message: " << object; } } } @@ -278,10 +285,10 @@ void SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield) { isConnected_ = false; - isForwarding_ = false; + bool const wasForwarding = isForwarding_.exchange(false); if (not stop_) { - onDisconnect_(); LOG(log_.info()) << "Disconnected"; + onDisconnect_(wasForwarding); } if (wsConnection_ != nullptr) { @@ -312,7 +319,11 @@ SubscriptionSource::logError(util::requests::RequestError const& error) const void SubscriptionSource::setLastMessageTime() { - lastMessageTime_.lock().get() = std::chrono::steady_clock::now(); + lastMessageTimeSecondsSinceEpoch_.get().set( + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count() + ); + auto lock = lastMessageTime_.lock(); + lock.get() = std::chrono::steady_clock::now(); } void diff --git a/src/etl/impl/SubscriptionSource.hpp b/src/etl/impl/SubscriptionSource.hpp index ce9799e3e..cd7f5dbe7 100644 --- a/src/etl/impl/SubscriptionSource.hpp +++ b/src/etl/impl/SubscriptionSource.hpp @@ -19,12 +19,13 @@ #pragma once -#include "etl/ETLHelpers.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/Source.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "util/Mutex.hpp" #include "util/Retry.hpp" #include "util/log/Logger.hpp" +#include "util/prometheus/Gauge.hpp" #include "util/requests/Types.hpp" #include "util/requests/WsConnection.hpp" @@ -37,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -71,6 +73,8 @@ class SubscriptionSource { boost::asio::strand strand_; + std::chrono::steady_clock::duration wsTimeout_; + util::Retry retry_; OnConnectHook onConnect_; @@ -83,9 +87,11 @@ class SubscriptionSource { util::Mutex lastMessageTime_; + std::reference_wrapper lastMessageTimeSecondsSinceEpoch_; + std::future runFuture_; - static constexpr std::chrono::seconds CONNECTION_TIMEOUT{30}; + static constexpr std::chrono::seconds WS_TIMEOUT{30}; static constexpr std::chrono::seconds RETRY_MAX_DELAY{30}; static constexpr std::chrono::seconds RETRY_DELAY{1}; @@ -103,7 +109,7 @@ class SubscriptionSource { * @param onNewLedger The onNewLedger hook. Called when a new ledger is received * @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is * forwarding - * @param connectionTimeout The connection timeout. Defaults to 30 seconds + * @param wsTimeout A timeout for websocket operations. Defaults to 30 seconds * @param retryDelay The retry delay. Defaults to 1 second */ SubscriptionSource( @@ -115,7 +121,7 @@ class SubscriptionSource { OnConnectHook onConnect, OnDisconnectHook onDisconnect, OnLedgerClosedHook onLedgerClosed, - std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT, + std::chrono::steady_clock::duration const wsTimeout = WS_TIMEOUT, std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY ); diff --git a/src/util/requests/impl/WsConnectionImpl.hpp b/src/util/requests/impl/WsConnectionImpl.hpp index df66bcaf3..04eb95e01 100644 --- a/src/util/requests/impl/WsConnectionImpl.hpp +++ b/src/util/requests/impl/WsConnectionImpl.hpp @@ -39,8 +39,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -123,15 +125,20 @@ class WsConnectionImpl : public WsConnection { static void withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout) { + auto isCompleted = std::make_shared(false); boost::asio::cancellation_signal cancellationSignal; auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield); boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout}; - timer.async_wait([&cancellationSignal](boost::system::error_code errorCode) { - if (!errorCode) + + // The timer below can be called with no error code even if the operation is completed before the timeout, so we + // need an additional flag here + timer.async_wait([&cancellationSignal, isCompleted](boost::system::error_code errorCode) { + if (!errorCode and not *isCompleted) cancellationSignal.emit(boost::asio::cancellation_type::terminal); }); operation(cyield); + *isCompleted = true; } static boost::system::error_code diff --git a/tests/unit/etl/LoadBalancerTests.cpp b/tests/unit/etl/LoadBalancerTests.cpp index 50ad004db..16b1cd967 100644 --- a/tests/unit/etl/LoadBalancerTests.cpp +++ b/tests/unit/etl/LoadBalancerTests.cpp @@ -271,15 +271,12 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0Disconnects) EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)); EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true)); EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true)); - sourceFactory_.callbacksAt(0).onDisconnect(); + sourceFactory_.callbacksAt(0).onDisconnect(true); } TEST_F(LoadBalancerOnDisconnectHookTests, source1Disconnects) { - EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true)); - EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true)); - EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false)); - sourceFactory_.callbacksAt(1).onDisconnect(); + sourceFactory_.callbacksAt(1).onDisconnect(false); } TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack) @@ -288,29 +285,25 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack) EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)); EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true)); EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true)); - sourceFactory_.callbacksAt(0).onDisconnect(); + sourceFactory_.callbacksAt(0).onDisconnect(true); sourceFactory_.callbacksAt(0).onConnect(); } TEST_F(LoadBalancerOnDisconnectHookTests, source1DisconnectsAndConnectsBack) { - EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true)); - EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true)); - EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false)); - sourceFactory_.callbacksAt(1).onDisconnect(); - + sourceFactory_.callbacksAt(1).onDisconnect(false); sourceFactory_.callbacksAt(1).onConnect(); } TEST_F(LoadBalancerOnConnectHookTests, bothSourcesDisconnectAndConnectBack) { - EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).Times(2).WillRepeatedly(Return(false)); - EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)).Times(2); - EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).Times(2).WillRepeatedly(Return(false)); - EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false)).Times(2); - sourceFactory_.callbacksAt(0).onDisconnect(); - sourceFactory_.callbacksAt(1).onDisconnect(); + EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false)); + EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)); + EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(false)); + EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false)); + sourceFactory_.callbacksAt(0).onDisconnect(true); + sourceFactory_.callbacksAt(1).onDisconnect(false); EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true)); EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true)); @@ -353,12 +346,7 @@ TEST_F(LoadBalancer3SourcesTests, forwardingUpdate) sourceFactory_.callbacksAt(1).onConnect(); // Source 0 got disconnected - EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false)); - EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)); - EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true)); - EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true)); - EXPECT_CALL(sourceFactory_.sourceAt(2), setForwarding(false)); // only source 1 must be forwarding - sourceFactory_.callbacksAt(0).onDisconnect(); + sourceFactory_.callbacksAt(0).onDisconnect(false); } struct LoadBalancerLoadInitialLedgerTests : LoadBalancerOnConnectHookTests { diff --git a/tests/unit/etl/SubscriptionSourceTests.cpp b/tests/unit/etl/SubscriptionSourceTests.cpp index 1752da7f3..9667086cd 100644 --- a/tests/unit/etl/SubscriptionSourceTests.cpp +++ b/tests/unit/etl/SubscriptionSourceTests.cpp @@ -18,31 +18,35 @@ //============================================================================== #include "etl/impl/SubscriptionSource.hpp" -#include "util/AssignRandomPort.hpp" #include "util/Fixtures.hpp" #include "util/MockNetworkValidatedLedgers.hpp" +#include "util/MockPrometheus.hpp" #include "util/MockSubscriptionManager.hpp" #include "util/TestWsServer.hpp" +#include "util/prometheus/Gauge.hpp" #include #include #include #include +#include #include #include #include #include +#include #include #include +#include #include using namespace etl::impl; using testing::MockFunction; using testing::StrictMock; -struct SubscriptionSourceConnectionTests : public NoLoggerFixture { - SubscriptionSourceConnectionTests() +struct SubscriptionSourceConnectionTestsBase : public NoLoggerFixture { + SubscriptionSourceConnectionTestsBase() { subscriptionSource_.run(); } @@ -54,7 +58,7 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture { StrictMockSubscriptionManagerSharedPtr subscriptionManager_; StrictMock> onConnectHook_; - StrictMock> onDisconnectHook_; + StrictMock> onDisconnectHook_; StrictMock> onLedgerClosedHook_; SubscriptionSource subscriptionSource_{ @@ -66,8 +70,8 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture { onConnectHook_.AsStdFunction(), onDisconnectHook_.AsStdFunction(), onLedgerClosedHook_.AsStdFunction(), - std::chrono::milliseconds(1), - std::chrono::milliseconds(1) + std::chrono::milliseconds(5), + std::chrono::milliseconds(5) }; [[maybe_unused]] TestWsConnection @@ -92,15 +96,17 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture { } }; +struct SubscriptionSourceConnectionTests : util::prometheus::WithPrometheus, SubscriptionSourceConnectionTestsBase {}; + TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed) { - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed_Retry_ConnectionFailed) { - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -112,7 +118,19 @@ TEST_F(SubscriptionSourceConnectionTests, ReadError) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); + ioContext_.run(); +} + +TEST_F(SubscriptionSourceConnectionTests, ReadTimeout) +{ + boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) { + auto connection = serverConnection(yield); + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + }); + + EXPECT_CALL(onConnectHook_, Call()); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -126,7 +144,7 @@ TEST_F(SubscriptionSourceConnectionTests, ReadError_Reconnect) }); EXPECT_CALL(onConnectHook_, Call()).Times(2); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -139,14 +157,14 @@ TEST_F(SubscriptionSourceConnectionTests, IsConnected) }); EXPECT_CALL(onConnectHook_, Call()).WillOnce([this]() { EXPECT_TRUE(subscriptionSource_.isConnected()); }); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { EXPECT_FALSE(subscriptionSource_.isConnected()); subscriptionSource_.stop(); }); ioContext_.run(); } -struct SubscriptionSourceReadTests : public SubscriptionSourceConnectionTests { +struct SubscriptionSourceReadTestsBase : public SubscriptionSourceConnectionTestsBase { [[maybe_unused]] TestWsConnection connectAndSendMessage(std::string const message, boost::asio::yield_context yield) { @@ -157,6 +175,8 @@ struct SubscriptionSourceReadTests : public SubscriptionSourceConnectionTests { } }; +struct SubscriptionSourceReadTests : util::prometheus::WithPrometheus, SubscriptionSourceReadTestsBase {}; + TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect) { boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) { @@ -167,7 +187,7 @@ TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect) }); EXPECT_CALL(onConnectHook_, Call()).Times(2); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -179,7 +199,7 @@ TEST_F(SubscriptionSourceReadTests, GotResult) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -191,7 +211,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndex) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(*networkValidatedLedgers_, push(123)); ioContext_.run(); } @@ -206,7 +226,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAsString_Reconnect) }); EXPECT_CALL(onConnectHook_, Call()).Times(2); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -220,7 +240,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersAsNumber_Reconn }); EXPECT_CALL(onConnectHook_, Call()).Times(2); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -242,7 +262,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgers) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); EXPECT_TRUE(subscriptionSource_.hasLedger(123)); @@ -268,7 +288,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersWrongValue_Reco }); EXPECT_CALL(onConnectHook_, Call()).Times(2); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -286,7 +306,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(*networkValidatedLedgers_, push(123)); ioContext_.run(); @@ -306,7 +326,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosed) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -321,7 +341,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedForwardingIsSet) EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onLedgerClosedHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { + EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { EXPECT_FALSE(subscriptionSource_.isForwarding()); subscriptionSource_.stop(); }); @@ -336,7 +356,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndex) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(*networkValidatedLedgers_, push(123)); ioContext_.run(); } @@ -351,7 +371,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAsString_Recon }); EXPECT_CALL(onConnectHook_, Call()).Times(2); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -365,7 +385,7 @@ TEST_F(SubscriptionSourceReadTests, GorLedgerClosedWithValidatedLedgersAsNumber_ }); EXPECT_CALL(onConnectHook_, Call()).Times(2); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -382,7 +402,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithValidatedLedgers) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); EXPECT_FALSE(subscriptionSource_.hasLedger(0)); @@ -406,7 +426,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAndValidatedLe }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(*networkValidatedLedgers_, push(123)); ioContext_.run(); @@ -425,7 +445,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingFalse) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -440,7 +460,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingTrue) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)); ioContext_.run(); } @@ -456,7 +476,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionWithMetaIsForwardingFalse) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)).Times(0); ioContext_.run(); } @@ -469,7 +489,7 @@ TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingFalse) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -484,7 +504,7 @@ TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingTrue) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(*subscriptionManager_, forwardValidation(message)); ioContext_.run(); } @@ -497,7 +517,7 @@ TEST_F(SubscriptionSourceReadTests, GotManiefstReceivedIsForwardingFalse) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); } @@ -512,7 +532,7 @@ TEST_F(SubscriptionSourceReadTests, GotManifestReceivedIsForwardingTrue) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(*subscriptionManager_, forwardManifest(message)); ioContext_.run(); } @@ -525,7 +545,7 @@ TEST_F(SubscriptionSourceReadTests, LastMessageTime) }); EXPECT_CALL(onConnectHook_, Call()); - EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); ioContext_.run(); auto const actualLastTimeMessage = subscriptionSource_.lastMessageTime(); @@ -533,3 +553,27 @@ TEST_F(SubscriptionSourceReadTests, LastMessageTime) auto const diff = std::chrono::duration_cast(now - actualLastTimeMessage); EXPECT_LT(diff, std::chrono::milliseconds(100)); } + +struct SubscriptionSourcePrometheusCounterTests : util::prometheus::WithMockPrometheus, + SubscriptionSourceReadTestsBase {}; + +TEST_F(SubscriptionSourcePrometheusCounterTests, LastMessageTime) +{ + auto& lastMessageTimeMock = makeMock( + "subscription_source_last_message_time", fmt::format("{{source=\"127.0.0.1:{}\"}}", wsServer_.port()) + ); + boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) { + auto connection = connectAndSendMessage("some_message", yield); + connection.close(yield); + }); + + EXPECT_CALL(onConnectHook_, Call()); + EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); }); + EXPECT_CALL(lastMessageTimeMock, set).WillOnce([](int64_t value) { + auto const now = + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) + .count(); + EXPECT_LE(now - value, 1); + }); + ioContext_.run(); +}