Skip to content

Commit

Permalink
Cleanup peers
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Nov 21, 2024
1 parent a679e9c commit cda7a1b
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 69 deletions.
2 changes: 1 addition & 1 deletion src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* The `StellarMessage` union contains 3 logically distinct kinds of message:
*
* - Messages directed to or from a specific peer, with or without a response:
* HELLO, GET_PEERS, PEERS, DONT_HAVE, ERROR_MSG
* HELLO, PEERS, DONT_HAVE, ERROR_MSG
*
* - One-way broadcast messages informing other peers of an event:
* TRANSACTION and SCP_MESSAGE
Expand Down
2 changes: 2 additions & 0 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,8 +1161,10 @@ bool
OverlayManagerImpl::checkScheduledAndCache(
std::shared_ptr<CapacityTrackedMessage> tracker)
{
#ifndef BUILD_TESTS
releaseAssert(!threadIsMain() ||
!mApp.getConfig().BACKGROUND_OVERLAY_PROCESSING);
#endif
if (!tracker->maybeGetHash())
{
return false;
Expand Down
4 changes: 0 additions & 4 deletions src/overlay/OverlayMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ OverlayMetrics::OverlayMetrics(Application& app)
, mRecvAuthTimer(app.getMetrics().NewTimer({"overlay", "recv", "auth"}))
, mRecvDontHaveTimer(
app.getMetrics().NewTimer({"overlay", "recv", "dont-have"}))
, mRecvGetPeersTimer(
app.getMetrics().NewTimer({"overlay", "recv", "get-peers"}))
, mRecvPeersTimer(app.getMetrics().NewTimer({"overlay", "recv", "peers"}))
, mRecvGetTxSetTimer(
app.getMetrics().NewTimer({"overlay", "recv", "get-txset"}))
Expand Down Expand Up @@ -114,8 +112,6 @@ OverlayMetrics::OverlayMetrics(Application& app)
app.getMetrics().NewMeter({"overlay", "send", "auth"}, "message"))
, mSendDontHaveMeter(app.getMetrics().NewMeter(
{"overlay", "send", "dont-have"}, "message"))
, mSendGetPeersMeter(app.getMetrics().NewMeter(
{"overlay", "send", "get-peers"}, "message"))
, mSendPeersMeter(
app.getMetrics().NewMeter({"overlay", "send", "peers"}, "message"))
, mSendGetTxSetMeter(app.getMetrics().NewMeter(
Expand Down
2 changes: 0 additions & 2 deletions src/overlay/OverlayMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ struct OverlayMetrics
medida::Timer& mRecvHelloTimer;
medida::Timer& mRecvAuthTimer;
medida::Timer& mRecvDontHaveTimer;
medida::Timer& mRecvGetPeersTimer;
medida::Timer& mRecvPeersTimer;
medida::Timer& mRecvGetTxSetTimer;
medida::Timer& mRecvTxSetTimer;
Expand Down Expand Up @@ -84,7 +83,6 @@ struct OverlayMetrics
medida::Meter& mSendHelloMeter;
medida::Meter& mSendAuthMeter;
medida::Meter& mSendDontHaveMeter;
medida::Meter& mSendGetPeersMeter;
medida::Meter& mSendPeersMeter;
medida::Meter& mSendGetTxSetMeter;
medida::Meter& mSendTransactionMeter;
Expand Down
48 changes: 15 additions & 33 deletions src/overlay/Peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,17 +524,6 @@ Peer::sendGetQuorumSet(uint256 const& setID)
sendMessage(msgPtr);
}

void
Peer::sendGetPeers()
{
ZoneScoped;
releaseAssert(threadIsMain());
StellarMessage newMsg;
newMsg.type(GET_PEERS);
auto msgPtr = std::make_shared<StellarMessage const>(newMsg);
sendMessage(msgPtr);
}

void
Peer::sendGetScpState(uint32 ledgerSeq)
{
Expand Down Expand Up @@ -624,8 +613,6 @@ Peer::msgSummary(StellarMessage const& msg)
case DONT_HAVE:
return fmt::format(FMT_STRING("DONTHAVE {}:{}"), msg.dontHave().type,
hexAbbrev(msg.dontHave().reqHash));
case GET_PEERS:
return "GETPEERS";
case PEERS:
return fmt::format(FMT_STRING("PEERS {:d}"), msg.peers().size());

Expand Down Expand Up @@ -713,9 +700,6 @@ Peer::sendMessage(std::shared_ptr<StellarMessage const> msg, bool log)
case DONT_HAVE:
mOverlayMetrics.mSendDontHaveMeter.Mark();
break;
case GET_PEERS:
mOverlayMetrics.mSendGetPeersMeter.Mark();
break;
case PEERS:
mOverlayMetrics.mSendPeersMeter.Mark();
break;
Expand Down Expand Up @@ -927,7 +911,6 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg)
cat = AUTH_ACTION_QUEUE;
break;
// control messages
case GET_PEERS:
case PEERS:
case ERROR_MSG:
case SEND_MORE:
Expand Down Expand Up @@ -1093,6 +1076,13 @@ Peer::recvRawMessage(std::shared_ptr<CapacityTrackedMessage> msgTracker)
return;
}

if (stellarMsg.type() == PEERS && getRole() == REMOTE_CALLED_US)
{
drop(fmt::format("received {}", stellarMsg.type()),
Peer::DropDirection::WE_DROPPED_REMOTE);
return;
}

releaseAssert(isAuthenticated(guard) || stellarMsg.type() == HELLO ||
stellarMsg.type() == AUTH ||
stellarMsg.type() == ERROR_MSG);
Expand Down Expand Up @@ -1130,13 +1120,6 @@ Peer::recvRawMessage(std::shared_ptr<CapacityTrackedMessage> msgTracker)
}
break;

case GET_PEERS:
{
auto t = mOverlayMetrics.mRecvGetPeersTimer.TimeScope();
recvGetPeers(stellarMsg);
}
break;

case PEERS:
{
auto t = mOverlayMetrics.mRecvPeersTimer.TimeScope();
Expand Down Expand Up @@ -1804,21 +1787,20 @@ Peer::recvAuth(StellarMessage const& msg)
sendGetScpState(low);
}

void
Peer::recvGetPeers(StellarMessage const& msg)
{
ZoneScoped;
releaseAssert(threadIsMain());

sendPeers();
}

void
Peer::recvPeers(StellarMessage const& msg)
{
ZoneScoped;
releaseAssert(threadIsMain());

if (mPeersReceived)
{q
drop(fmt::format("too many msgs {}", msg.type()),
Peer::DropDirection::WE_DROPPED_REMOTE);
return;
}
mPeersReceived = true;

for (auto const& peer : msg.peers())
{
if (peer.port == 0 || peer.port > UINT16_MAX)
Expand Down
4 changes: 1 addition & 3 deletions src/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class Peer : public std::enable_shared_from_this<Peer>,
std::chrono::milliseconds(1);
static constexpr std::chrono::nanoseconds PEER_METRICS_RATE_UNIT =
std::chrono::seconds(1);
static constexpr uint32_t FIRST_VERSION_REQUIRED_FOR_PROTOCOL_20 = 32;

// The reporting will be based on the previous
// PEER_METRICS_WINDOW_SIZE-second time window.
Expand Down Expand Up @@ -262,6 +261,7 @@ class Peer : public std::enable_shared_from_this<Peer>,
std::shared_ptr<TxAdverts> mTxAdverts;
QueryInfo mQSetQueryInfo;
QueryInfo mTxSetQueryInfo;
bool mPeersReceived{false};

static Hash pingIDfromTimePoint(VirtualClock::time_point const& tp);
void pingPeer();
Expand All @@ -276,7 +276,6 @@ class Peer : public std::enable_shared_from_this<Peer>,
void updatePeerRecordAfterAuthentication();
void recvAuth(StellarMessage const& msg);
void recvDontHave(StellarMessage const& msg);
void recvGetPeers(StellarMessage const& msg);
void recvHello(Hello const& elo);
void recvPeers(StellarMessage const& msg);
void recvSurveyRequestMessage(StellarMessage const& msg);
Expand Down Expand Up @@ -343,7 +342,6 @@ class Peer : public std::enable_shared_from_this<Peer>,
std::string msgSummary(StellarMessage const& stellarMsg);
void sendGetTxSet(uint256 const& setID);
void sendGetQuorumSet(uint256 const& setID);
void sendGetPeers();
void sendGetScpState(uint32 ledgerSeq);
void sendErrorAndDrop(ErrorCode error, std::string const& message);
void sendTxDemand(TxDemandVector&& demands);
Expand Down
1 change: 1 addition & 0 deletions src/overlay/test/LoopbackPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class LoopbackPeer : public Peer
using Peer::sendAuth;
using Peer::sendAuthenticatedMessage;
using Peer::sendMessage;
using Peer::sendPeers;

friend class LoopbackPeerConnection;
};
Expand Down
45 changes: 44 additions & 1 deletion src/overlay/test/OverlayTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,49 @@ TEST_CASE("failed auth", "[overlay][connections]")
testutil::shutdownWorkScheduler(*app1);
}

TEST_CASE("peers during auth", "[overlay][connections]")
{
VirtualClock clock;
Config const& cfg1 = getTestConfig(0);
Config const& cfg2 = getTestConfig(1);
auto app1 = createTestApplication(clock, cfg1);
auto app2 = createTestApplication(clock, cfg2);
// Put a peer into Acceptor's DB to trigger sending of peers during auth
app2->getOverlayManager().getPeerManager().ensureExists(
PeerBareAddress{"1.1.1.1", 11625});

LoopbackPeerConnection conn(*app1, *app2);
testutil::crankSome(clock);

REQUIRE(conn.getInitiator()->isAuthenticatedForTesting());
REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting());

StellarMessage newMsg;
newMsg.type(PEERS);
std::string dropReason;
SECTION("inbound")
{
dropReason = "received PEERS";
conn.getInitiator()->sendMessage(
std::make_shared<StellarMessage>(newMsg));
}
SECTION("outbound")
{
dropReason = "too many msgs PEERS";
conn.getAcceptor()->sendMessage(
std::make_shared<StellarMessage>(newMsg));
}

testutil::crankFor(clock, std::chrono::seconds(1));

REQUIRE(!conn.getInitiator()->isConnectedForTesting());
REQUIRE(!conn.getAcceptor()->isConnectedForTesting());
REQUIRE(conn.getAcceptor()->getDropReason() == dropReason);

testutil::shutdownWorkScheduler(*app2);
testutil::shutdownWorkScheduler(*app1);
}

TEST_CASE("outbound queue filtering", "[overlay][connections]")
{
auto networkID = sha256(getTestConfig().NETWORK_PASSPHRASE);
Expand Down Expand Up @@ -1773,7 +1816,7 @@ TEST_CASE("drop peers who straggle", "[overlay][connections][straggler]")
sendTimer.async_wait([straggler](asio::error_code const& error) {
if (!error)
{
straggler->sendGetPeers();
straggler->sendGetTxSet(Hash());
}
});
testutil::crankFor(clock, dur);
Expand Down
10 changes: 5 additions & 5 deletions src/overlay/test/TCPPeerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,20 @@ TEST_CASE("TCPPeer can communicate", "[overlay]")
s->stopOverlayTick();

// Now drop peer, ensure ERROR containing "drop reason" is properly flushed
auto& recvGetPeers =
n1->getOverlayManager().getOverlayMetrics().mRecvGetPeersTimer;
auto& recvGetTxSet =
n1->getOverlayManager().getOverlayMetrics().mRecvGetTxSetTimer;
auto& recvError =
n1->getOverlayManager().getOverlayMetrics().mRecvErrorTimer;
auto prevPeers = recvGetPeers.count();
auto prevTxSet = recvGetTxSet.count();
auto prevError = recvError.count();
p0->sendGetPeers();
p0->sendGetTxSet(Hash());
p0->sendErrorAndDrop(ERR_MISC, "test drop");
s->crankForAtLeast(std::chrono::seconds(1), false);
REQUIRE(!p0->isConnectedForTesting());
REQUIRE(!p1->isConnectedForTesting());

// p1 must have received getPeers, Error
REQUIRE(recvGetPeers.count() == prevPeers + 1);
REQUIRE(recvGetTxSet.count() == prevTxSet + 1);
REQUIRE(recvError.count() == prevError + 1);
s->stopAllNodes();
}
Expand Down
17 changes: 0 additions & 17 deletions src/test/TestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,6 @@ shutdownWorkScheduler(Application& app)
}
}

void
injectSendPeersAndReschedule(VirtualClock::time_point& end, VirtualClock& clock,
VirtualTimer& timer,
LoopbackPeerConnection& connection)
{
connection.getInitiator()->sendGetPeers();
if (clock.now() < end && connection.getInitiator()->isConnectedForTesting())
{
timer.expires_from_now(std::chrono::milliseconds(10));
timer.async_wait(
[&]() {
injectSendPeersAndReschedule(end, clock, timer, connection);
},
&VirtualTimer::onFailureNoop);
}
}

std::vector<Asset>
getInvalidAssets(SecretKey const& issuer)
{
Expand Down
3 changes: 0 additions & 3 deletions src/test/TestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ namespace testutil
{
void crankSome(VirtualClock& clock);
void crankFor(VirtualClock& clock, VirtualClock::duration duration);
void injectSendPeersAndReschedule(VirtualClock::time_point& end,
VirtualClock& clock, VirtualTimer& timer,
LoopbackPeerConnection& connection);

void shutdownWorkScheduler(Application& app);

Expand Down

0 comments on commit cda7a1b

Please sign in to comment.