Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce peer message traffic for ledger data #5126

Open
wants to merge 22 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f0cf1fd
Log the caller / reason for server state changes:
ximinez Sep 2, 2024
756cad9
Class "CanProcess" to keep track of processing of distinct items
ximinez Aug 29, 2024
8a17f16
Drop duplicate outgoing TMGetLedger messages per peer:
ximinez Aug 13, 2024
226cb56
Drop duplicate incoming TMGetLedger messages per peer:
ximinez Aug 14, 2024
d5ec2d3
Drop duplicate incoming TMLedgerData messages:
vlntb Aug 14, 2024
ecfa396
Collapse multiple outgoing TMLedgerData messages with cookies into one
ximinez Aug 21, 2024
e490e57
Improve logging related to ledger acquisition
ximinez Sep 5, 2024
30eee9b
Review feedback from @Bronek:
ximinez Sep 25, 2024
978fecd
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Sep 30, 2024
b8b7b31
[FOLD] Review feedback from @vlntb:
ximinez Oct 11, 2024
43b6e3e
[FOLD] Fix typo in unit test:
ximinez Oct 14, 2024
813ef05
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Oct 15, 2024
89f5a67
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Oct 18, 2024
d7e2d70
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Oct 31, 2024
29de22e
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 4, 2024
e250086
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 5, 2024
da4a30c
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 8, 2024
eee8184
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 13, 2024
85dcf70
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 13, 2024
e23be8e
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 27, 2024
75fcca5
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Dec 3, 2024
20ad383
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Builds/levelization/results/loops.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Loop: xrpld.app xrpld.net
xrpld.app > xrpld.net

Loop: xrpld.app xrpld.overlay
xrpld.overlay == xrpld.app
xrpld.overlay ~= xrpld.app

Loop: xrpld.app xrpld.peerfinder
xrpld.app > xrpld.peerfinder
Expand Down
106 changes: 106 additions & 0 deletions include/xrpl/basics/CanProcess.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs #include <mutex> for std::unique_lock

Suggested change
#include <mutex>

/** RAII class to check if an Item is already being processed on another thread,
* as indicated by it's presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check.canProcess() ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
template <class Mutex, class Collection, class Item>
class CanProcess
{
public:
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: mtx_(mtx), collection_(collection), item_(item), canProcess_(insert())
{
}

~CanProcess()
{
if (canProcess_)
{
std::unique_lock<Mutex> lock_(mtx_);
collection_.erase(item_);
Copy link
Collaborator

@Bronek Bronek Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about replacing the (likely) O(log N) element lookup with O(1) erase of an iterator ? The iterator which is returned from insert to be specific, and currently ignored. In this case the Item wouldn't have to be stored inside CanProcess object, so that's also one less template parameter and probably also smaller object size (hashes are larger than iterators I guess).

Could even go one step further and replace all data members with std::function<void()> cleanup_ which would capture all that it needs if insert succeeded, or is empty if insert failed. In this case no template parameters would be needed at all.

}
}

bool
canProcess() const

Check warning on line 81 in include/xrpl/basics/CanProcess.h

View check run for this annotation

Codecov / codecov/patch

include/xrpl/basics/CanProcess.h#L81

Added line #L81 was not covered by tests
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that it is not covered by tests, perhaps we do not need this function ?

{
return canProcess_;

Check warning on line 83 in include/xrpl/basics/CanProcess.h

View check run for this annotation

Codecov / codecov/patch

include/xrpl/basics/CanProcess.h#L83

Added line #L83 was not covered by tests
}

operator bool() const
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use explicit here; it would be consistent with std::optional and most other operator bool inside the project.

{
return canProcess_;
}

private:
bool
insert()
{
std::unique_lock<Mutex> lock_(mtx_);
auto const [_, inserted] = collection_.insert(item_);
return inserted;
}

Mutex& mtx_;
Collection& collection_;
Item const item_;
bool const canProcess_;
};

#endif
7 changes: 7 additions & 0 deletions include/xrpl/basics/base_uint.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,13 @@ to_string(base_uint<Bits, Tag> const& a)
return strHex(a.cbegin(), a.cend());
}

template <std::size_t Bits, class Tag>
inline std::string
to_short_string(base_uint<Bits, Tag> const& a)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit]: Adding checks for the to_short_string in the base_unit_test next to existing to_string cases would be good.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't call that a nit. Missing test coverage is pretty significant. Thanks for catching it. Fixed.

{
return strHex(a.cbegin(), a.cend()).substr(0, 8) + "...";
}

template <std::size_t Bits, class Tag>
inline std::ostream&
operator<<(std::ostream& out, base_uint<Bits, Tag> const& u)
Expand Down
10 changes: 10 additions & 0 deletions include/xrpl/proto/ripple.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,18 @@ message TMLedgerData
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}

message TMPing
Expand Down
2 changes: 2 additions & 0 deletions include/xrpl/protocol/LedgerHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct LedgerHeader

// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;
Expand Down
28 changes: 28 additions & 0 deletions src/test/app/HashRouter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,33 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
}

void
testProcessPeer()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, 5s);
uint256 const key(1);
HashRouter::PeerShortID peer1 = 1;
HashRouter::PeerShortID peer2 = 2;
auto const timeout = 2s;

BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
++stopwatch;
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
}

public:
void
run() override
Expand All @@ -252,6 +279,7 @@ class HashRouter_test : public beast::unit_test::suite
testSetFlags();
testRelay();
testProcess();
testProcessPeer();
}
};

Expand Down
5 changes: 5 additions & 0 deletions src/test/app/LedgerReplay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ class TestPeer : public Peer
{
return false;
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}

bool ledgerReplayEnabled_;
PublicKey nodePublicKey_;
Expand Down
5 changes: 5 additions & 0 deletions src/test/basics/base_uint_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ struct base_uint_test : beast::unit_test::suite
uset.insert(u);
BEAST_EXPECT(raw.size() == u.size());
BEAST_EXPECT(to_string(u) == "0102030405060708090A0B0C");
BEAST_EXPECT(to_short_string(u) == "01020304...");
BEAST_EXPECT(*u.data() == 1);
BEAST_EXPECT(u.signum() == 1);
BEAST_EXPECT(!!u);
Expand All @@ -173,6 +174,7 @@ struct base_uint_test : beast::unit_test::suite
test96 v{~u};
uset.insert(v);
BEAST_EXPECT(to_string(v) == "FEFDFCFBFAF9F8F7F6F5F4F3");
BEAST_EXPECT(to_short_string(v) == "FEFDFCFB...");
BEAST_EXPECT(*v.data() == 0xfe);
BEAST_EXPECT(v.signum() == 1);
BEAST_EXPECT(!!v);
Expand All @@ -193,6 +195,7 @@ struct base_uint_test : beast::unit_test::suite
test96 z{beast::zero};
uset.insert(z);
BEAST_EXPECT(to_string(z) == "000000000000000000000000");
BEAST_EXPECT(to_short_string(z) == "00000000...");
BEAST_EXPECT(*z.data() == 0);
BEAST_EXPECT(*z.begin() == 0);
BEAST_EXPECT(*std::prev(z.end(), 1) == 0);
Expand All @@ -213,6 +216,7 @@ struct base_uint_test : beast::unit_test::suite
BEAST_EXPECT(n == z);
n--;
BEAST_EXPECT(to_string(n) == "FFFFFFFFFFFFFFFFFFFFFFFF");
BEAST_EXPECT(to_short_string(n) == "FFFFFFFF...");
n = beast::zero;
BEAST_EXPECT(n == z);

Expand All @@ -223,6 +227,7 @@ struct base_uint_test : beast::unit_test::suite
test96 x{zm1 ^ zp1};
uset.insert(x);
BEAST_EXPECTS(to_string(x) == "FFFFFFFFFFFFFFFFFFFFFFFE", to_string(x));
BEAST_EXPECTS(to_short_string(x) == "FFFFFFFF...", to_short_string(x));

BEAST_EXPECT(uset.size() == 4);

Expand Down
4 changes: 2 additions & 2 deletions src/test/overlay/ProtocolVersion_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ class ProtocolVersion_test : public beast::unit_test::suite
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion(
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
make_protocol(2, 2));
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
make_protocol(2, 3));
BEAST_EXPECT(
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
std::nullopt);
Expand Down
5 changes: 5 additions & 0 deletions src/test/overlay/reduce_relay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ class PeerPartial : public Peer
removeTxQueue(const uint256&) override
{
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
};

/** Manually advanced clock. */
Expand Down
3 changes: 2 additions & 1 deletion src/xrpld/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,8 @@
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if (!positions && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
app_.getOPs().setMode(

Check warning on line 1077 in src/xrpld/app/consensus/RCLConsensus.cpp

View check run for this annotation

Codecov / codecov/patch

src/xrpld/app/consensus/RCLConsensus.cpp#L1077

Added line #L1077 was not covered by tests
OperatingMode::CONNECTED, "updateOperatingMode: no positions");
}

void
Expand Down
18 changes: 18 additions & 0 deletions src/xrpld/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,24 @@
std::unique_ptr<PeerSet> mPeerSet;
};

inline std::string
to_string(InboundLedger::Reason reason)
{
using enum InboundLedger::Reason;
switch (reason)
{
case HISTORY:

Check warning on line 205 in src/xrpld/app/ledger/InboundLedger.h

View check run for this annotation

Codecov / codecov/patch

src/xrpld/app/ledger/InboundLedger.h#L205

Added line #L205 was not covered by tests
return "HISTORY";
case GENERIC:
return "GENERIC";
case CONSENSUS:
return "CONSENSUS";
default:
assert(false);

Check warning on line 212 in src/xrpld/app/ledger/InboundLedger.h

View check run for this annotation

Codecov / codecov/patch

src/xrpld/app/ledger/InboundLedger.h#L211-L212

Added lines #L211 - L212 were not covered by tests
return "unknown";
}
}

} // namespace ripple

#endif
21 changes: 15 additions & 6 deletions src/xrpld/app/ledger/detail/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,14 @@

if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
assert(isDone());
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;

Check warning on line 401 in src/xrpld/app/ledger/detail/InboundLedger.cpp

View check run for this annotation

Codecov / codecov/patch

src/xrpld/app/ledger/detail/InboundLedger.cpp#L401

Added line #L401 was not covered by tests
}

mByHash = true;

Expand Down Expand Up @@ -502,15 +509,17 @@

if (auto stream = journal_.debug())
{
stream << "Trigger acquiring ledger " << hash_;
std::stringstream ss;
ss << "Trigger acquiring ledger " << hash_;
if (peer)
stream << " from " << peer;
ss << " from " << peer;

if (complete_ || failed_)
stream << "complete=" << complete_ << " failed=" << failed_;
ss << " complete=" << complete_ << " failed=" << failed_;
else
stream << "header=" << mHaveHeader << " tx=" << mHaveTransactions
<< " as=" << mHaveState;
ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
<< " as=" << mHaveState;
stream << ss.str();
}

if (!mHaveHeader)
Expand Down
Loading
Loading