diff --git a/src/main/AppConnector.cpp b/src/main/AppConnector.cpp index a5953fc6ea..6c8d0abbde 100644 --- a/src/main/AppConnector.cpp +++ b/src/main/AppConnector.cpp @@ -7,6 +7,7 @@ #include "overlay/BanManager.h" #include "overlay/OverlayManager.h" #include "overlay/OverlayMetrics.h" +#include "overlay/Peer.h" #include "util/Timer.h" namespace stellar @@ -129,4 +130,11 @@ AppConnector::getOverlayMetrics() return mApp.getOverlayManager().getOverlayMetrics(); } +bool +AppConnector::checkScheduledAndCache( + std::shared_ptr msgTracker) +{ + return mApp.getOverlayManager().checkScheduledAndCache(msgTracker); +} + } \ No newline at end of file diff --git a/src/main/AppConnector.h b/src/main/AppConnector.h index 6e6d19ce52..9cf1236fdc 100644 --- a/src/main/AppConnector.h +++ b/src/main/AppConnector.h @@ -14,6 +14,7 @@ struct OverlayMetrics; class SorobanNetworkConfig; class SorobanMetrics; struct LedgerTxnDelta; +class CapacityTrackedMessage; // Helper class to isolate access to Application; all function helpers must // either be called from main or be thread-sade @@ -51,5 +52,8 @@ class AppConnector Config const& getConfig() const; bool overlayShuttingDown() const; OverlayMetrics& getOverlayMetrics(); + // This method is always exclusively called from one thread + bool + checkScheduledAndCache(std::shared_ptr msgTracker); }; } \ No newline at end of file diff --git a/src/main/Config.cpp b/src/main/Config.cpp index 3abb600a2b..9923da9020 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -152,7 +152,7 @@ Config::Config() : NODE_SEED(SecretKey::random()) LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT = 18; OVERLAY_PROTOCOL_MIN_VERSION = 33; - OVERLAY_PROTOCOL_VERSION = 35; + OVERLAY_PROTOCOL_VERSION = 36; VERSION_STR = STELLAR_CORE_VERSION; diff --git a/src/overlay/Floodgate.cpp b/src/overlay/Floodgate.cpp index ffe1cb8052..749fa5c507 100644 --- a/src/overlay/Floodgate.cpp +++ b/src/overlay/Floodgate.cpp @@ -56,10 +56,10 @@ Floodgate::clearBelow(uint32_t maxLedger) } bool -Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, Hash& index) +Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, + Hash const& index) { ZoneScoped; - index = xdrBlake2(msg); if (mShuttingDown) { return false; @@ -145,21 +145,29 @@ Floodgate::broadcast(std::shared_ptr msg, else { mSendFromBroadcast.Mark(); - std::weak_ptr weak( - std::static_pointer_cast(peer.second)); - // This is an async operation, and peer might get dropped by the - // time we actually try to send the message. This is fine, as - // sendMessage will just be a no-op in that case - mApp.postOnMainThread( - [msg, weak, log = !broadcasted]() { - auto strong = weak.lock(); - if (strong) - { - strong->sendMessage(msg, log); - } - }, - fmt::format(FMT_STRING("broadcast to {}"), - peer.second->toString())); + + if (msg->type() == SCP_MESSAGE) + { + peer.second->sendMessage(msg, !broadcasted); + } + else + { + // This is an async operation, and peer might get dropped by + // the time we actually try to send the message. This is + // fine, as sendMessage will just be a no-op in that case + std::weak_ptr weak( + std::static_pointer_cast(peer.second)); + mApp.postOnMainThread( + [msg, weak, log = !broadcasted]() { + auto strong = weak.lock(); + if (strong) + { + strong->sendMessage(msg, log); + } + }, + fmt::format(FMT_STRING("broadcast to {}"), + peer.second->toString())); + } } broadcasted = true; } diff --git a/src/overlay/Floodgate.h b/src/overlay/Floodgate.h index 17763b3cb2..bff9f82d8d 100644 --- a/src/overlay/Floodgate.h +++ b/src/overlay/Floodgate.h @@ -57,7 +57,7 @@ class Floodgate // returns true if this is a new record // fills msgID with msg's hash bool addRecord(StellarMessage const& msg, Peer::pointer fromPeer, - Hash& msgID); + Hash const& msgID); // returns true if msg was sent to at least one peer // The hash required for transactions diff --git a/src/overlay/FlowControl.cpp b/src/overlay/FlowControl.cpp index e5f8e7c4fd..3d70c0c0af 100644 --- a/src/overlay/FlowControl.cpp +++ b/src/overlay/FlowControl.cpp @@ -35,7 +35,7 @@ FlowControl::FlowControl(AppConnector& connector, bool useBackgroundThread) : mFlowControlCapacity(connector.getConfig(), mNodeID) , mFlowControlBytesCapacity( connector.getConfig(), mNodeID, - connector.getOverlayManager().getFlowControlBytesConfig().mTotal) + connector.getOverlayManager().getFlowControlBytesTotal()) , mOverlayMetrics(connector.getOverlayManager().getOverlayMetrics()) , mAppConnector(connector) , mUseBackgroundThread(useBackgroundThread) @@ -240,7 +240,6 @@ SendMoreCapacity FlowControl::endMessageProcessing(StellarMessage const& msg) { ZoneScoped; - releaseAssert(threadIsMain()); std::lock_guard guard(mFlowControlMutex); mFloodDataProcessed += mFlowControlCapacity.releaseLocalCapacity(msg); @@ -252,9 +251,8 @@ FlowControl::endMessageProcessing(StellarMessage const& msg) bool shouldSendMore = mFloodDataProcessed == mAppConnector.getConfig().FLOW_CONTROL_SEND_MORE_BATCH_SIZE; - auto const byteBatchSize = mAppConnector.getOverlayManager() - .getFlowControlBytesConfig() - .mBatchSize; + auto const byteBatchSize = + OverlayManager::getFlowControlBytesBatch(mAppConnector.getConfig()); shouldSendMore = shouldSendMore || mFloodDataProcessedBytes >= byteBatchSize; @@ -560,7 +558,6 @@ bool FlowControl::stopThrottling() { std::lock_guard guard(mFlowControlMutex); - releaseAssert(threadIsMain()); if (mLastThrottle) { CLOG_DEBUG(Overlay, "Stop throttling reading from peer {}", diff --git a/src/overlay/OverlayManager.h b/src/overlay/OverlayManager.h index cc6844f9bb..8b5b6bf912 100644 --- a/src/overlay/OverlayManager.h +++ b/src/overlay/OverlayManager.h @@ -4,6 +4,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#include "crypto/BLAKE2.h" #include "overlay/Peer.h" /** @@ -22,7 +23,7 @@ * The `StellarMessage` union contains 3 logically distinct kinds of message: * * - Messages directed to or from a specific peer, with or without a response: - * HELLO, GET_PEERS, PEERS, DONT_HAVE, ERROR_MSG + * HELLO, PEERS, DONT_HAVE, ERROR_MSG * * - One-way broadcast messages informing other peers of an event: * TRANSACTION and SCP_MESSAGE @@ -54,12 +55,6 @@ struct StellarMessage; class OverlayManager { public: - struct AdjustedFlowControlConfig - { - uint32_t mTotal; - uint32_t mBatchSize; - }; - static int constexpr MIN_INBOUND_FACTOR = 3; static std::unique_ptr create(Application& app); @@ -67,6 +62,7 @@ class OverlayManager // Drop all PeerRecords from the Database static void dropAll(Database& db); static bool isFloodMessage(StellarMessage const& msg); + static uint32_t getFlowControlBytesBatch(Config const& cfg); // Flush all FloodGate and ItemFetcher state for ledgers older than // `ledgerSeq`. @@ -90,18 +86,17 @@ class OverlayManager // Returns true if this is a new message // fills msgID with msg's hash virtual bool recvFloodedMsgID(StellarMessage const& msg, Peer::pointer peer, - Hash& msgID) = 0; + Hash const& msgID) = 0; bool recvFloodedMsg(StellarMessage const& msg, Peer::pointer peer) { - Hash msgID; - return recvFloodedMsgID(msg, peer, msgID); + return recvFloodedMsgID(msg, peer, xdrBlake2(msg)); } // Process incoming transaction, pass it down to the transaction queue - virtual void recvTransaction(StellarMessage const& msg, - Peer::pointer peer) = 0; + virtual void recvTransaction(StellarMessage const& msg, Peer::pointer peer, + Hash const& index) = 0; // removes msgID from the floodgate's internal state // as it's not tracked anymore, calling "broadcast" with a (now forgotten) @@ -207,9 +202,16 @@ class OverlayManager virtual void recordMessageMetric(StellarMessage const& stellarMsg, Peer::pointer peer) = 0; - virtual AdjustedFlowControlConfig getFlowControlBytesConfig() const = 0; + virtual uint32_t getFlowControlBytesTotal() const = 0; + virtual ~OverlayManager() { } + + // Is message already referenced by the scheduler + // This method is always called from one thread, therefore no cache + // synchorization is needed + virtual bool + checkScheduledAndCache(std::shared_ptr tracker) = 0; }; } diff --git a/src/overlay/OverlayManagerImpl.cpp b/src/overlay/OverlayManagerImpl.cpp index a777cee998..7cfbdd5d90 100644 --- a/src/overlay/OverlayManagerImpl.cpp +++ b/src/overlay/OverlayManagerImpl.cpp @@ -321,6 +321,7 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app) mApp.getConfig().TARGET_PEER_CONNECTIONS, mSurveyManager) , mResolvingPeersWithBackoff(true) , mResolvingPeersRetryCount(0) + , mScheduledMessages(100000, true) { mPeerSources[PeerType::INBOUND] = std::make_unique( mPeerManager, RandomPeerSource::nextAttemptCutoff(PeerType::INBOUND)); @@ -356,9 +357,10 @@ OverlayManagerImpl::start() mTxDemandsManager.start(); } -OverlayManager::AdjustedFlowControlConfig -OverlayManagerImpl::getFlowControlBytesConfig() const +uint32_t +OverlayManagerImpl::getFlowControlBytesTotal() const { + releaseAssert(threadIsMain()); auto const maxTxSize = mApp.getHerder().getMaxTxSize(); releaseAssert(maxTxSize > 0); auto const& cfg = mApp.getConfig(); @@ -373,17 +375,26 @@ OverlayManagerImpl::getFlowControlBytesConfig() const INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES >= maxTxSize)) { - return {static_cast(maxTxSize) + - INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES, - INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES}; + return maxTxSize + INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES; } - return {INITIAL_PEER_FLOOD_READING_CAPACITY_BYTES, - INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES}; + return INITIAL_PEER_FLOOD_READING_CAPACITY_BYTES; } // If flow control parameters were provided, return them - return {cfg.PEER_FLOOD_READING_CAPACITY_BYTES, - cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES}; + return cfg.PEER_FLOOD_READING_CAPACITY_BYTES; +} + +uint32_t +OverlayManager::getFlowControlBytesBatch(Config const& cfg) +{ + if (cfg.PEER_FLOOD_READING_CAPACITY_BYTES == 0 && + cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES == 0) + { + return INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES; + } + + // If flow control parameters were provided, return them + return cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES; } void @@ -1140,15 +1151,40 @@ OverlayManagerImpl::shufflePeerList(std::vector& peerList) bool OverlayManagerImpl::recvFloodedMsgID(StellarMessage const& msg, - Peer::pointer peer, Hash& msgID) + Peer::pointer peer, Hash const& msgID) { ZoneScoped; return mFloodGate.addRecord(msg, peer, msgID); } +bool +OverlayManagerImpl::checkScheduledAndCache( + std::shared_ptr tracker) +{ +#ifndef BUILD_TESTS + releaseAssert(!threadIsMain() || + !mApp.getConfig().BACKGROUND_OVERLAY_PROCESSING); +#endif + if (!tracker->maybeGetHash()) + { + return false; + } + auto index = tracker->maybeGetHash().value(); + if (mScheduledMessages.exists(index)) + { + if (mScheduledMessages.get(index).lock()) + { + return true; + } + } + mScheduledMessages.put(index, + std::weak_ptr(tracker)); + return false; +} + void OverlayManagerImpl::recvTransaction(StellarMessage const& msg, - Peer::pointer peer) + Peer::pointer peer, Hash const& index) { ZoneScoped; auto transaction = TransactionFrameBase::makeTransactionFromWire( @@ -1157,8 +1193,7 @@ OverlayManagerImpl::recvTransaction(StellarMessage const& msg, { // record that this peer sent us this transaction // add it to the floodmap so that this peer gets credit for it - Hash msgID; - recvFloodedMsgID(msg, peer, msgID); + recvFloodedMsgID(msg, peer, index); mTxDemandsManager.recordTxPullLatency(transaction->getFullHash(), peer); @@ -1171,7 +1206,7 @@ OverlayManagerImpl::recvTransaction(StellarMessage const& msg, addResult.code == TransactionQueue::AddResultCode::ADD_STATUS_DUPLICATE)) { - forgetFloodedMsg(msgID); + forgetFloodedMsg(index); CLOG_DEBUG(Overlay, "Peer::recvTransaction Discarded transaction {} from {}", hexAbbrev(transaction->getFullHash()), peer->toString()); diff --git a/src/overlay/OverlayManagerImpl.h b/src/overlay/OverlayManagerImpl.h index 421574922a..1cf0e14963 100644 --- a/src/overlay/OverlayManagerImpl.h +++ b/src/overlay/OverlayManagerImpl.h @@ -113,9 +113,9 @@ class OverlayManagerImpl : public OverlayManager void clearLedgersBelow(uint32_t ledgerSeq, uint32_t lclSeq) override; bool recvFloodedMsgID(StellarMessage const& msg, Peer::pointer peer, - Hash& msgID) override; - void recvTransaction(StellarMessage const& msg, - Peer::pointer peer) override; + Hash const& msgID) override; + void recvTransaction(StellarMessage const& msg, Peer::pointer peer, + Hash const& index) override; void forgetFloodedMsg(Hash const& msgID) override; void recvTxDemand(FloodDemand const& dmd, Peer::pointer peer) override; bool broadcastMessage(std::shared_ptr msg, @@ -181,6 +181,8 @@ class OverlayManagerImpl : public OverlayManager std::future mResolvedPeers; bool mResolvingPeersWithBackoff; int mResolvingPeersRetryCount; + RandomEvictionCache> + mScheduledMessages; void triggerPeerResolution(); std::pair, bool> @@ -209,10 +211,13 @@ class OverlayManagerImpl : public OverlayManager void extractPeersFromMap(std::map const& peerMap, std::vector& result); void shufflePeerList(std::vector& peerList); - AdjustedFlowControlConfig getFlowControlBytesConfig() const override; + uint32_t getFlowControlBytesTotal() const override; // Returns `true` iff the overlay can accept the outbound peer at `address`. // Logs whenever a peer cannot be accepted. bool canAcceptOutboundPeer(PeerBareAddress const& address) const; + + bool checkScheduledAndCache( + std::shared_ptr tracker) override; }; } diff --git a/src/overlay/OverlayMetrics.cpp b/src/overlay/OverlayMetrics.cpp index 6125acae76..ea2c8db944 100644 --- a/src/overlay/OverlayMetrics.cpp +++ b/src/overlay/OverlayMetrics.cpp @@ -43,8 +43,6 @@ OverlayMetrics::OverlayMetrics(Application& app) , mRecvAuthTimer(app.getMetrics().NewTimer({"overlay", "recv", "auth"})) , mRecvDontHaveTimer( app.getMetrics().NewTimer({"overlay", "recv", "dont-have"})) - , mRecvGetPeersTimer( - app.getMetrics().NewTimer({"overlay", "recv", "get-peers"})) , mRecvPeersTimer(app.getMetrics().NewTimer({"overlay", "recv", "peers"})) , mRecvGetTxSetTimer( app.getMetrics().NewTimer({"overlay", "recv", "get-txset"})) @@ -114,8 +112,6 @@ OverlayMetrics::OverlayMetrics(Application& app) app.getMetrics().NewMeter({"overlay", "send", "auth"}, "message")) , mSendDontHaveMeter(app.getMetrics().NewMeter( {"overlay", "send", "dont-have"}, "message")) - , mSendGetPeersMeter(app.getMetrics().NewMeter( - {"overlay", "send", "get-peers"}, "message")) , mSendPeersMeter( app.getMetrics().NewMeter({"overlay", "send", "peers"}, "message")) , mSendGetTxSetMeter(app.getMetrics().NewMeter( diff --git a/src/overlay/OverlayMetrics.h b/src/overlay/OverlayMetrics.h index 64cb9dadc8..691a3d0b54 100644 --- a/src/overlay/OverlayMetrics.h +++ b/src/overlay/OverlayMetrics.h @@ -44,7 +44,6 @@ struct OverlayMetrics medida::Timer& mRecvHelloTimer; medida::Timer& mRecvAuthTimer; medida::Timer& mRecvDontHaveTimer; - medida::Timer& mRecvGetPeersTimer; medida::Timer& mRecvPeersTimer; medida::Timer& mRecvGetTxSetTimer; medida::Timer& mRecvTxSetTimer; @@ -84,7 +83,6 @@ struct OverlayMetrics medida::Meter& mSendHelloMeter; medida::Meter& mSendAuthMeter; medida::Meter& mSendDontHaveMeter; - medida::Meter& mSendGetPeersMeter; medida::Meter& mSendPeersMeter; medida::Meter& mSendGetTxSetMeter; medida::Meter& mSendTransactionMeter; diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index cb54af4ddd..c331c8d170 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -5,6 +5,7 @@ #include "overlay/Peer.h" #include "BanManager.h" +#include "crypto/BLAKE2.h" #include "crypto/CryptoError.h" #include "crypto/Hex.h" #include "crypto/KeyUtils.h" @@ -50,6 +51,7 @@ using namespace soci; static constexpr VirtualClock::time_point PING_NOT_SENT = VirtualClock::time_point::min(); +static constexpr uint32_t QUERY_RESPONSE_MULTIPLIER = 5; Peer::Peer(Application& app, PeerRole role) : mAppConnector(app.getAppConnector()) @@ -77,8 +79,8 @@ Peer::Peer(Application& app, PeerRole role) std::copy(bytes.begin(), bytes.end(), mSendNonce.begin()); } -Peer::MsgCapacityTracker::MsgCapacityTracker(std::weak_ptr peer, - StellarMessage const& msg) +CapacityTrackedMessage::CapacityTrackedMessage(std::weak_ptr peer, + StellarMessage const& msg) : mWeakPeer(peer), mMsg(msg) { auto self = mWeakPeer.lock(); @@ -87,9 +89,19 @@ Peer::MsgCapacityTracker::MsgCapacityTracker(std::weak_ptr peer, throw std::runtime_error("Invalid peer"); } self->beginMessageProcessing(mMsg); + if (mMsg.type() == SCP_MESSAGE || mMsg.type() == TRANSACTION) + { + mMaybeHash = xdrBlake2(msg); + } +} + +std::optional +CapacityTrackedMessage::maybeGetHash() const +{ + return mMaybeHash; } -Peer::MsgCapacityTracker::~MsgCapacityTracker() +CapacityTrackedMessage::~CapacityTrackedMessage() { auto self = mWeakPeer.lock(); if (self) @@ -99,7 +111,7 @@ Peer::MsgCapacityTracker::~MsgCapacityTracker() } StellarMessage const& -Peer::MsgCapacityTracker::getMessage() +CapacityTrackedMessage::getMessage() const { return mMsg; } @@ -143,7 +155,6 @@ Peer::beginMessageProcessing(StellarMessage const& msg) void Peer::endMessageProcessing(StellarMessage const& msg) { - releaseAssert(threadIsMain()); RECURSIVE_LOCK_GUARD(mStateMutex, guard); if (shouldAbort(guard)) @@ -513,17 +524,6 @@ Peer::sendGetQuorumSet(uint256 const& setID) sendMessage(msgPtr); } -void -Peer::sendGetPeers() -{ - ZoneScoped; - releaseAssert(threadIsMain()); - StellarMessage newMsg; - newMsg.type(GET_PEERS); - auto msgPtr = std::make_shared(newMsg); - sendMessage(msgPtr); -} - void Peer::sendGetScpState(uint32 ledgerSeq) { @@ -589,7 +589,6 @@ void Peer::sendSendMore(uint32_t numMessages, uint32_t numBytes) { ZoneScoped; - releaseAssert(threadIsMain()); auto m = std::make_shared(); m->type(SEND_MORE_EXTENDED); @@ -614,8 +613,6 @@ Peer::msgSummary(StellarMessage const& msg) case DONT_HAVE: return fmt::format(FMT_STRING("DONTHAVE {}:{}"), msg.dontHave().type, hexAbbrev(msg.dontHave().reqHash)); - case GET_PEERS: - return "GETPEERS"; case PEERS: return fmt::format(FMT_STRING("PEERS {:d}"), msg.peers().size()); @@ -685,7 +682,6 @@ void Peer::sendMessage(std::shared_ptr msg, bool log) { ZoneScoped; - releaseAssert(threadIsMain()); CLOG_TRACE(Overlay, "send: {} to : {}", msgSummary(*msg), mAppConnector.getConfig().toShortString(mPeerID)); @@ -704,9 +700,6 @@ Peer::sendMessage(std::shared_ptr msg, bool log) case DONT_HAVE: mOverlayMetrics.mSendDontHaveMeter.Mark(); break; - case GET_PEERS: - mOverlayMetrics.mSendGetPeersMeter.Mark(); - break; case PEERS: mOverlayMetrics.mSendPeersMeter.Mark(); break; @@ -761,6 +754,7 @@ Peer::sendMessage(std::shared_ptr msg, bool log) releaseAssert(mFlowControl); if (OverlayManager::isFloodMessage(*msg)) { + releaseAssert(threadIsMain()); mFlowControl->addMsgAndMaybeTrimQueue(msg); maybeExecuteInBackground( "Peer::sendMessage maybeSendNextBatch", @@ -899,22 +893,13 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg) } } - // Verify SCP signatures when in the background - if (useBackgroundThread() && msg.v0().message.type() == SCP_MESSAGE) - { - auto& envelope = msg.v0().message.envelope(); - PubKeyUtils::verifySig(envelope.statement.nodeID, envelope.signature, - xdr::xdr_to_opaque(mNetworkID, ENVELOPE_TYPE_SCP, - envelope.statement)); - } - // NOTE: Additionally, we may use state snapshots to verify TRANSACTION type // messages in the background. // Start tracking capacity here, so read throttling is applied // appropriately. Flow control might not be started at that time - auto msgTracker = std::make_shared(shared_from_this(), - msg.v0().message); + auto msgTracker = std::make_shared( + shared_from_this(), msg.v0().message); std::string cat; Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION; @@ -926,7 +911,6 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg) cat = AUTH_ACTION_QUEUE; break; // control messages - case GET_PEERS: case PEERS: case ERROR_MSG: case SEND_MORE: @@ -969,6 +953,22 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg) // scheduler queue auto queueName = isAuthenticated(guard) ? cat : AUTH_ACTION_QUEUE; type = isAuthenticated(guard) ? type : Scheduler::ActionType::NORMAL_ACTION; + + // If a message is already scheduled, drop + if (mAppConnector.checkScheduledAndCache(msgTracker)) + { + return true; + } + + // Verify SCP signatures when in the background + if (useBackgroundThread() && msg.v0().message.type() == SCP_MESSAGE) + { + auto& envelope = msg.v0().message.envelope(); + PubKeyUtils::verifySig(envelope.statement.nodeID, envelope.signature, + xdr::xdr_to_opaque(mNetworkID, ENVELOPE_TYPE_SCP, + envelope.statement)); + } + // Subtle: move `msgTracker` shared_ptr into the lambda, to ensure // its destructor is invoked from main thread only. Note that we can't use // unique_ptr here, because std::function requires its callable @@ -986,7 +986,7 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg) } void -Peer::recvMessage(std::shared_ptr msgTracker) +Peer::recvMessage(std::shared_ptr msgTracker) { ZoneScoped; releaseAssert(threadIsMain()); @@ -1017,7 +1017,7 @@ Peer::recvMessage(std::shared_ptr msgTracker) try { - recvRawMessage(stellarMsg); + recvRawMessage(msgTracker); } catch (CryptoError const& e) { @@ -1047,11 +1047,12 @@ Peer::recvSendMore(StellarMessage const& msg) } void -Peer::recvRawMessage(StellarMessage const& stellarMsg) +Peer::recvRawMessage(std::shared_ptr msgTracker) { ZoneScoped; releaseAssert(threadIsMain()); + auto const& stellarMsg = msgTracker->getMessage(); auto peerStr = toString(); ZoneText(peerStr.c_str(), peerStr.size()); @@ -1075,6 +1076,13 @@ Peer::recvRawMessage(StellarMessage const& stellarMsg) return; } + if (stellarMsg.type() == PEERS && getRole() == REMOTE_CALLED_US) + { + drop(fmt::format("received {}", stellarMsg.type()), + Peer::DropDirection::WE_DROPPED_REMOTE); + return; + } + releaseAssert(isAuthenticated(guard) || stellarMsg.type() == HELLO || stellarMsg.type() == AUTH || stellarMsg.type() == ERROR_MSG); @@ -1112,13 +1120,6 @@ Peer::recvRawMessage(StellarMessage const& stellarMsg) } break; - case GET_PEERS: - { - auto t = mOverlayMetrics.mRecvGetPeersTimer.TimeScope(); - recvGetPeers(stellarMsg); - } - break; - case PEERS: { auto t = mOverlayMetrics.mRecvPeersTimer.TimeScope(); @@ -1180,7 +1181,7 @@ Peer::recvRawMessage(StellarMessage const& stellarMsg) case TRANSACTION: { auto t = mOverlayMetrics.mRecvTransactionTimer.TimeScope(); - recvTransaction(stellarMsg); + recvTransaction(*msgTracker); } break; @@ -1201,7 +1202,7 @@ Peer::recvRawMessage(StellarMessage const& stellarMsg) case SCP_MESSAGE: { auto t = mOverlayMetrics.mRecvSCPMessageTimer.TimeScope(); - recvSCPMessage(stellarMsg); + recvSCPMessage(*msgTracker); } break; @@ -1252,11 +1253,32 @@ Peer::recvDontHave(StellarMessage const& msg) msg.dontHave().type, msg.dontHave().reqHash, shared_from_this()); } +bool +Peer::process(QueryInfo& queryInfo) +{ + auto const& cfg = mAppConnector.getConfig(); + std::chrono::seconds const QUERY_WINDOW = + cfg.getExpectedLedgerCloseTime() * cfg.MAX_SLOTS_TO_REMEMBER; + uint32_t const QUERIES_PER_WINDOW = + QUERY_WINDOW.count() * QUERY_RESPONSE_MULTIPLIER; + if (mAppConnector.now() - queryInfo.mLastTimeStamp >= QUERY_WINDOW) + { + queryInfo.mLastTimeStamp = mAppConnector.now(); + queryInfo.mNumQueries = 0; + } + return queryInfo.mNumQueries < QUERIES_PER_WINDOW; +} + void Peer::recvGetTxSet(StellarMessage const& msg) { ZoneScoped; releaseAssert(threadIsMain()); + if (!process(mTxSetQueryInfo)) + { + return; + } + auto self = shared_from_this(); if (auto txSet = mAppConnector.getHerder().getTxSet(msg.txSetHash())) { @@ -1289,6 +1311,8 @@ Peer::recvGetTxSet(StellarMessage const& msg) : GENERALIZED_TX_SET; sendDontHave(messageType, msg.txSetHash()); } + + mTxSetQueryInfo.mNumQueries++; } void @@ -1310,11 +1334,13 @@ Peer::recvGeneralizedTxSet(StellarMessage const& msg) } void -Peer::recvTransaction(StellarMessage const& msg) +Peer::recvTransaction(CapacityTrackedMessage const& msg) { ZoneScoped; releaseAssert(threadIsMain()); - mAppConnector.getOverlayManager().recvTransaction(msg, shared_from_this()); + releaseAssert(msg.maybeGetHash()); + mAppConnector.getOverlayManager().recvTransaction( + msg.getMessage(), shared_from_this(), msg.maybeGetHash().value()); } Hash @@ -1384,6 +1410,10 @@ Peer::recvGetSCPQuorumSet(StellarMessage const& msg) { ZoneScoped; releaseAssert(threadIsMain()); + if (!process(mQSetQueryInfo)) + { + return; + } SCPQuorumSetPtr qset = mAppConnector.getHerder().getQSet(msg.qSetHash()); @@ -1397,6 +1427,7 @@ Peer::recvGetSCPQuorumSet(StellarMessage const& msg) sendDontHave(SCP_QUORUMSET, msg.qSetHash()); // do we want to ask other people for it? } + mQSetQueryInfo.mNumQueries++; } void Peer::recvSCPQuorumSet(StellarMessage const& msg) @@ -1409,13 +1440,13 @@ Peer::recvSCPQuorumSet(StellarMessage const& msg) } void -Peer::recvSCPMessage(StellarMessage const& msg) +Peer::recvSCPMessage(CapacityTrackedMessage const& msg) { ZoneScoped; releaseAssert(threadIsMain()); - SCPEnvelope const& envelope = msg.envelope(); + SCPEnvelope const& envelope = msg.getMessage().envelope(); - auto type = msg.envelope().statement.pledges.type(); + auto type = msg.getMessage().envelope().statement.pledges.type(); auto t = (type == SCP_ST_PREPARE ? mOverlayMetrics.mRecvSCPPrepareTimer.TimeScope() : (type == SCP_ST_CONFIRM @@ -1445,15 +1476,16 @@ Peer::recvSCPMessage(StellarMessage const& msg) ZoneText(codeStr.c_str(), codeStr.size()); // add it to the floodmap so that this peer gets credit for it - Hash msgID; - mAppConnector.getOverlayManager().recvFloodedMsgID(msg, shared_from_this(), - msgID); + releaseAssert(msg.maybeGetHash()); + mAppConnector.getOverlayManager().recvFloodedMsgID( + msg.getMessage(), shared_from_this(), msg.maybeGetHash().value()); auto res = mAppConnector.getHerder().recvSCPEnvelope(envelope); if (res == Herder::ENVELOPE_STATUS_DISCARDED) { // the message was discarded, remove it from the floodmap as well - mAppConnector.getOverlayManager().forgetFloodedMsg(msgID); + mAppConnector.getOverlayManager().forgetFloodedMsg( + msg.maybeGetHash().value()); } } @@ -1734,7 +1766,7 @@ Peer::recvAuth(StellarMessage const& msg) } uint32_t fcBytes = - mAppConnector.getOverlayManager().getFlowControlBytesConfig().mTotal; + mAppConnector.getOverlayManager().getFlowControlBytesTotal(); // Subtle: after successful auth, must send sendMore message first to // tell the other peer about the local node's reading capacity. @@ -1755,21 +1787,20 @@ Peer::recvAuth(StellarMessage const& msg) sendGetScpState(low); } -void -Peer::recvGetPeers(StellarMessage const& msg) -{ - ZoneScoped; - releaseAssert(threadIsMain()); - - sendPeers(); -} - void Peer::recvPeers(StellarMessage const& msg) { ZoneScoped; releaseAssert(threadIsMain()); + if (mPeersReceived) + { + drop(fmt::format("too many msgs {}", msg.type()), + Peer::DropDirection::WE_DROPPED_REMOTE); + return; + } + mPeersReceived = true; + for (auto const& peer : msg.peers()) { if (peer.port == 0 || peer.port > UINT16_MAX) diff --git a/src/overlay/Peer.h b/src/overlay/Peer.h index 0f0ee1c33c..53426d53ac 100644 --- a/src/overlay/Peer.h +++ b/src/overlay/Peer.h @@ -38,6 +38,7 @@ class LoopbackPeer; struct OverlayMetrics; class FlowControl; class TxAdverts; +class CapacityTrackedMessage; // Peer class represents a connected peer (either inbound or outbound) // @@ -71,7 +72,6 @@ class Peer : public std::enable_shared_from_this, std::chrono::milliseconds(1); static constexpr std::chrono::nanoseconds PEER_METRICS_RATE_UNIT = std::chrono::seconds(1); - static constexpr uint32_t FIRST_VERSION_REQUIRED_FOR_PROTOCOL_20 = 32; // The reporting will be based on the previous // PEER_METRICS_WINDOW_SIZE-second time window. @@ -89,6 +89,12 @@ class Peer : public std::enable_shared_from_this, CLOSING = 4 }; + struct QueryInfo + { + VirtualClock::time_point mLastTimeStamp; + uint32_t mNumQueries{0}; + }; + static inline int format_as(PeerState const& s) { @@ -168,17 +174,6 @@ class Peer : public std::enable_shared_from_this, // the protected state. Peer state lacking synchronization should be moved // to the private section below. protected: - class MsgCapacityTracker : private NonMovableOrCopyable - { - std::weak_ptr const mWeakPeer; - StellarMessage const mMsg; - - public: - MsgCapacityTracker(std::weak_ptr peer, StellarMessage const& msg); - StellarMessage const& getMessage(); - ~MsgCapacityTracker(); - }; - AppConnector& mAppConnector; Hash const mNetworkID; @@ -264,6 +259,9 @@ class Peer : public std::enable_shared_from_this, VirtualTimer mDelayedExecutionTimer; std::shared_ptr mTxAdverts; + QueryInfo mQSetQueryInfo; + QueryInfo mTxSetQueryInfo; + bool mPeersReceived{false}; static Hash pingIDfromTimePoint(VirtualClock::time_point const& tp); void pingPeer(); @@ -271,14 +269,13 @@ class Peer : public std::enable_shared_from_this, VirtualClock::time_point mPingSentTime; std::chrono::milliseconds mLastPing; - void recvRawMessage(StellarMessage const& msg); + void recvRawMessage(std::shared_ptr msgTracker); virtual void recvError(StellarMessage const& msg); void updatePeerRecordAfterEcho(); void updatePeerRecordAfterAuthentication(); void recvAuth(StellarMessage const& msg); void recvDontHave(StellarMessage const& msg); - void recvGetPeers(StellarMessage const& msg); void recvHello(Hello const& elo); void recvPeers(StellarMessage const& msg); void recvSurveyRequestMessage(StellarMessage const& msg); @@ -290,10 +287,10 @@ class Peer : public std::enable_shared_from_this, void recvGetTxSet(StellarMessage const& msg); void recvTxSet(StellarMessage const& msg); void recvGeneralizedTxSet(StellarMessage const& msg); - void recvTransaction(StellarMessage const& msg); + void recvTransaction(CapacityTrackedMessage const& msgTracker); void recvGetSCPQuorumSet(StellarMessage const& msg); void recvSCPQuorumSet(StellarMessage const& msg); - void recvSCPMessage(StellarMessage const& msg); + void recvSCPMessage(CapacityTrackedMessage const& msgTracker); void recvGetSCPState(StellarMessage const& msg); void recvFloodAdvert(StellarMessage const& msg); void recvFloodDemand(StellarMessage const& msg); @@ -304,8 +301,9 @@ class Peer : public std::enable_shared_from_this, void sendDontHave(MessageType type, uint256 const& itemID); void sendPeers(); void sendError(ErrorCode error, std::string const& message); + bool process(QueryInfo& queryInfo); - void recvMessage(std::shared_ptr msgTracker); + void recvMessage(std::shared_ptr msgTracker); // NB: This is a move-argument because the write-buffer has to travel // with the write-request through the async IO system, and we might have @@ -344,7 +342,6 @@ class Peer : public std::enable_shared_from_this, std::string msgSummary(StellarMessage const& stellarMsg); void sendGetTxSet(uint256 const& setID); void sendGetQuorumSet(uint256 const& setID); - void sendGetPeers(); void sendGetScpState(uint32 ledgerSeq); void sendErrorAndDrop(ErrorCode error, std::string const& message); void sendTxDemand(TxDemandVector&& demands); @@ -439,6 +436,7 @@ class Peer : public std::enable_shared_from_this, friend class LoopbackPeer; friend class PeerStub; + friend class CapacityTrackedMessage; #ifdef BUILD_TESTS std::shared_ptr @@ -499,4 +497,24 @@ class Peer : public std::enable_shared_from_this, } } }; + +// CapacityTrackedMessage is a helper class to track when the message is done +// being processed by core using RAII. On destruction, it will automatically +// signal completion to Peer. This allows Peer to track available capacity, and +// request more traffic. CapacityTrackedMessage also optionally stores a BLAKE2 +// hash of the message, so overlay can decide if a duplicate message can be +// dropped as early as possible. Note: this class has side effects; +// specifically, it may trigger a send of SEND_MORE message on destruction +class CapacityTrackedMessage : private NonMovableOrCopyable +{ + std::weak_ptr const mWeakPeer; + StellarMessage const mMsg; + std::optional mMaybeHash; + + public: + CapacityTrackedMessage(std::weak_ptr peer, StellarMessage const& msg); + StellarMessage const& getMessage() const; + ~CapacityTrackedMessage(); + std::optional maybeGetHash() const; +}; } diff --git a/src/overlay/test/ItemFetcherTests.cpp b/src/overlay/test/ItemFetcherTests.cpp index 34b2dff0bf..e2d138a488 100644 --- a/src/overlay/test/ItemFetcherTests.cpp +++ b/src/overlay/test/ItemFetcherTests.cpp @@ -465,8 +465,8 @@ TEST_CASE("next peer strategy", "[overlay][ItemFetcher]") { StellarMessage msg(SCP_MESSAGE); msg.envelope() = hundredEnvelope1; - auto index = sha256(xdr::xdr_to_opaque(msg)); - app->getOverlayManager().recvFloodedMsgID(msg, peer1, index); + app->getOverlayManager().recvFloodedMsgID(msg, peer1, + xdrBlake2(msg)); tracker->tryNextPeer(); REQUIRE(askCount == 2); auto trPeer1b = tracker->getLastAskedPeer(); diff --git a/src/overlay/test/LoopbackPeer.cpp b/src/overlay/test/LoopbackPeer.cpp index cb5f0db21c..c5a2594378 100644 --- a/src/overlay/test/LoopbackPeer.cpp +++ b/src/overlay/test/LoopbackPeer.cpp @@ -278,7 +278,7 @@ LoopbackPeer::recvMessage(xdr::msg_ptr const& msg) } void -LoopbackPeer::recvMessage(std::shared_ptr msgTracker) +LoopbackPeer::recvMessage(std::shared_ptr msgTracker) { mAppConnector.postOnMainThread( [self = shared_from_this(), msgTracker]() { @@ -565,8 +565,7 @@ LoopbackPeer::checkCapacity(std::shared_ptr otherPeer) const return otherPeer->getConfig().PEER_FLOOD_READING_CAPACITY == getFlowControl()->getCapacity().getOutboundCapacity() && otherPeer->mAppConnector.getOverlayManager() - .getFlowControlBytesConfig() - .mTotal == + .getFlowControlBytesTotal() == getFlowControl()->getCapacityBytes().getOutboundCapacity(); } } diff --git a/src/overlay/test/LoopbackPeer.h b/src/overlay/test/LoopbackPeer.h index 97490a9da3..0f2c40551b 100644 --- a/src/overlay/test/LoopbackPeer.h +++ b/src/overlay/test/LoopbackPeer.h @@ -65,7 +65,7 @@ class LoopbackPeer : public Peer } LoopbackPeer(Application& app, PeerRole role); - void recvMessage(std::shared_ptr msgTracker); + void recvMessage(std::shared_ptr msgTracker); static std::pair, std::shared_ptr> @@ -152,11 +152,11 @@ class LoopbackPeer : public Peer std::string getIP() const; - using Peer::MsgCapacityTracker; using Peer::recvMessage; using Peer::sendAuth; using Peer::sendAuthenticatedMessage; using Peer::sendMessage; + using Peer::sendPeers; friend class LoopbackPeerConnection; }; diff --git a/src/overlay/test/OverlayTests.cpp b/src/overlay/test/OverlayTests.cpp index 7d29ae7c66..9c04a429a1 100644 --- a/src/overlay/test/OverlayTests.cpp +++ b/src/overlay/test/OverlayTests.cpp @@ -188,7 +188,8 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") REQUIRE(conn.getAcceptor()->checkCapacity(conn.getInitiator())); uint64_t expectedCapacity{0}; - expectedCapacity = cfg2.PEER_FLOOD_READING_CAPACITY_BYTES - txSize; + expectedCapacity = + app2->getOverlayManager().getFlowControlBytesTotal() - txSize; SECTION("basic capacity accounting") { @@ -202,9 +203,8 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") ->getCapacityBytes() .getOutboundCapacity() == expectedCapacity); REQUIRE(conn.getInitiator()->getTxQueueByteCount() == 0); - auto msgTracker = - std::make_shared( - conn.getAcceptor(), tx1); + auto msgTracker = std::make_shared( + conn.getAcceptor(), tx1); conn.getAcceptor()->recvMessage(msgTracker); REQUIRE(conn.getAcceptor() ->getFlowControl() @@ -238,7 +238,7 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") ->getCapacityBytes() .getCapacity() .mFloodCapacity == - cfg2.PEER_FLOOD_READING_CAPACITY_BYTES); + app2->getOverlayManager().getFlowControlBytesTotal()); if (shouldRequestMore) { REQUIRE(conn.getInitiator()->checkCapacity(conn.getAcceptor())); @@ -249,7 +249,8 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") ->getFlowControl() ->getCapacityBytes() .getOutboundCapacity() == - (cfg2.PEER_FLOOD_READING_CAPACITY_BYTES - txSize)); + (app2->getOverlayManager().getFlowControlBytesTotal() - + txSize)); } REQUIRE(conn.getAcceptor()->checkCapacity(conn.getInitiator())); } @@ -279,6 +280,12 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") cfg2.FLOW_CONTROL_SEND_MORE_BATCH_SIZE = 1; test(true); } + SECTION("automatic calculation of byte configs") + { + cfg2.PEER_FLOOD_READING_CAPACITY_BYTES = 0; + cfg2.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = 0; + test(false); + } SECTION("mixed versions") { cfg1.OVERLAY_PROTOCOL_VERSION = cfg1.OVERLAY_PROTOCOL_MIN_VERSION; @@ -763,6 +770,49 @@ TEST_CASE("failed auth", "[overlay][connections]") testutil::shutdownWorkScheduler(*app1); } +TEST_CASE("peers during auth", "[overlay][connections]") +{ + VirtualClock clock; + Config const& cfg1 = getTestConfig(0); + Config const& cfg2 = getTestConfig(1); + auto app1 = createTestApplication(clock, cfg1); + auto app2 = createTestApplication(clock, cfg2); + // Put a peer into Acceptor's DB to trigger sending of peers during auth + app2->getOverlayManager().getPeerManager().ensureExists( + PeerBareAddress{"1.1.1.1", 11625}); + + LoopbackPeerConnection conn(*app1, *app2); + testutil::crankSome(clock); + + REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); + REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); + + StellarMessage newMsg; + newMsg.type(PEERS); + std::string dropReason; + SECTION("inbound") + { + dropReason = "received PEERS"; + conn.getInitiator()->sendMessage( + std::make_shared(newMsg)); + } + SECTION("outbound") + { + dropReason = "too many msgs PEERS"; + conn.getAcceptor()->sendMessage( + std::make_shared(newMsg)); + } + + testutil::crankFor(clock, std::chrono::seconds(1)); + + REQUIRE(!conn.getInitiator()->isConnectedForTesting()); + REQUIRE(!conn.getAcceptor()->isConnectedForTesting()); + REQUIRE(conn.getAcceptor()->getDropReason() == dropReason); + + testutil::shutdownWorkScheduler(*app2); + testutil::shutdownWorkScheduler(*app1); +} + TEST_CASE("outbound queue filtering", "[overlay][connections]") { auto networkID = sha256(getTestConfig().NETWORK_PASSPHRASE); @@ -1766,7 +1816,7 @@ TEST_CASE("drop peers who straggle", "[overlay][connections][straggler]") sendTimer.async_wait([straggler](asio::error_code const& error) { if (!error) { - straggler->sendGetPeers(); + straggler->sendGetTxSet(Hash()); } }); testutil::crankFor(clock, dur); diff --git a/src/overlay/test/TCPPeerTests.cpp b/src/overlay/test/TCPPeerTests.cpp index c8b35c62d7..bb38dfb4d9 100644 --- a/src/overlay/test/TCPPeerTests.cpp +++ b/src/overlay/test/TCPPeerTests.cpp @@ -139,21 +139,17 @@ TEST_CASE("TCPPeer can communicate", "[overlay]") s->stopOverlayTick(); // Now drop peer, ensure ERROR containing "drop reason" is properly flushed - auto& recvGetPeers = - n1->getOverlayManager().getOverlayMetrics().mRecvGetPeersTimer; - auto& recvError = - n1->getOverlayManager().getOverlayMetrics().mRecvErrorTimer; - auto prevPeers = recvGetPeers.count(); - auto prevError = recvError.count(); - p0->sendGetPeers(); + auto& msgWrite = n0->getOverlayManager().getOverlayMetrics().mMessageWrite; + auto prevMsgWrite = msgWrite.count(); + + p0->sendGetTxSet(Hash()); p0->sendErrorAndDrop(ERR_MISC, "test drop"); s->crankForAtLeast(std::chrono::seconds(1), false); REQUIRE(!p0->isConnectedForTesting()); REQUIRE(!p1->isConnectedForTesting()); - // p1 must have received getPeers, Error - REQUIRE(recvGetPeers.count() == prevPeers + 1); - REQUIRE(recvError.count() == prevError + 1); + // p0 actually sent GET_TX_SET and ERROR + REQUIRE(msgWrite.count() == prevMsgWrite + 2); s->stopAllNodes(); } diff --git a/src/protocol-curr/xdr b/src/protocol-curr/xdr index 529d5176f2..a41b2db15e 160000 --- a/src/protocol-curr/xdr +++ b/src/protocol-curr/xdr @@ -1 +1 @@ -Subproject commit 529d5176f24c73eeccfa5eba481d4e89c19b1181 +Subproject commit a41b2db15ea34a9f9da5326b996bb8a7ceb5740f diff --git a/src/protocol-next/xdr b/src/protocol-next/xdr index 9a174be684..734bcccdbb 160000 --- a/src/protocol-next/xdr +++ b/src/protocol-next/xdr @@ -1 +1 @@ -Subproject commit 9a174be684d999db0de3ba9cb29c4c3a7e1cb169 +Subproject commit 734bcccdbb6d1f7e794793ad3b8be51f3ba76f92 diff --git a/src/test/TestUtils.cpp b/src/test/TestUtils.cpp index cd5f655f77..b351112bd8 100644 --- a/src/test/TestUtils.cpp +++ b/src/test/TestUtils.cpp @@ -50,23 +50,6 @@ shutdownWorkScheduler(Application& app) } } -void -injectSendPeersAndReschedule(VirtualClock::time_point& end, VirtualClock& clock, - VirtualTimer& timer, - LoopbackPeerConnection& connection) -{ - connection.getInitiator()->sendGetPeers(); - if (clock.now() < end && connection.getInitiator()->isConnectedForTesting()) - { - timer.expires_from_now(std::chrono::milliseconds(10)); - timer.async_wait( - [&]() { - injectSendPeersAndReschedule(end, clock, timer, connection); - }, - &VirtualTimer::onFailureNoop); - } -} - std::vector getInvalidAssets(SecretKey const& issuer) { diff --git a/src/test/TestUtils.h b/src/test/TestUtils.h index 83f3e6d4f9..f134956125 100644 --- a/src/test/TestUtils.h +++ b/src/test/TestUtils.h @@ -22,9 +22,6 @@ namespace testutil { void crankSome(VirtualClock& clock); void crankFor(VirtualClock& clock, VirtualClock::duration duration); -void injectSendPeersAndReschedule(VirtualClock::time_point& end, - VirtualClock& clock, VirtualTimer& timer, - LoopbackPeerConnection& connection); void shutdownWorkScheduler(Application& app);