Skip to content

Commit

Permalink
Adds CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING config option to skip app…
Browse files Browse the repository at this point in the history
…lication of failed transactions and signature verification for all transactions during catchup.
  • Loading branch information
ThomasBrady committed Nov 27, 2024
1 parent 21ce88b commit 3780d5b
Show file tree
Hide file tree
Showing 19 changed files with 476 additions and 78 deletions.
83 changes: 82 additions & 1 deletion src/catchup/ApplyCheckpointWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,42 @@ ApplyCheckpointWork::openInputFiles()
mTxIn.open(ti.localPath_nogz());
mTxHistoryEntry = TransactionHistoryEntry();
mHeaderHistoryEntry = LedgerHeaderHistoryEntry();
#ifdef BUILD_TESTS
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING)
{
mTxResultIn = std::make_optional<XDRInputFileStream>();
FileTransferInfo tri(mDownloadDir, FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpoint);
if (!tri.localPath_nogz().empty() &&
std::filesystem::exists(tri.localPath_nogz()))
{
CLOG_DEBUG(History, "Replaying transaction results from {}",
tri.localPath_nogz());

try
{
mTxResultIn->open(tri.localPath_nogz());
}
catch (std::exception const& e)
{
CLOG_DEBUG(History,
"Failed to open transaction results file: {}. All "
"transactions will be applied.",
e.what());
}
mTxHistoryResultEntry =
std::make_optional<TransactionHistoryResultEntry>();
}
else
{
CLOG_DEBUG(History,
"Results file {} not found for checkpoint {} . All "
"transactions will be applied for this checkpoint.",
tri.localPath_nogz(), mCheckpoint);
mTxHistoryResultEntry = std::nullopt;
}
}
#endif
mFilesOpen = true;
}

Expand Down Expand Up @@ -138,6 +174,43 @@ ApplyCheckpointWork::getCurrentTxSet()
return TxSetXDRFrame::makeEmpty(lm.getLastClosedLedgerHeader());
}

#ifdef BUILD_TESTS
std::optional<TransactionResultSet>
ApplyCheckpointWork::getCurrentTxResultSet()
{
ZoneScoped;
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;
// Check mTxResultSet prior to loading next result set.
// This order is important because it accounts for ledger "gaps"
// in the history archives (which are caused by ledgers with empty tx
// sets, as those are not uploaded).
while (mTxResultIn && mTxResultIn->readOne(*mTxHistoryResultEntry))
{
if (mTxHistoryResultEntry)
{
if (mTxHistoryResultEntry->ledgerSeq < seq)
{
CLOG_DEBUG(History, "Advancing past txresultset for ledger {}",
mTxHistoryResultEntry->ledgerSeq);
}
else if (mTxHistoryResultEntry->ledgerSeq > seq)
{
break;
}
else
{
releaseAssert(mTxHistoryResultEntry->ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry->txResultSet);
}
}
}
CLOG_DEBUG(History, "No txresultset for ledger {}", seq);
return std::nullopt;
}
#endif // BUILD_TESTS

std::shared_ptr<LedgerCloseData>
ApplyCheckpointWork::getNextLedgerCloseData()
{
Expand Down Expand Up @@ -216,6 +289,14 @@ ApplyCheckpointWork::getNextLedgerCloseData()
CLOG_DEBUG(History, "Ledger {} has {} transactions", header.ledgerSeq,
txset->sizeTxTotal());

std::optional<TransactionResultSet> txres = std::nullopt;
#ifdef BUILD_TESTS
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING)
{
txres = getCurrentTxResultSet();
}
#endif

// We've verified the ledgerHeader (in the "trusted part of history"
// sense) in CATCHUP_VERIFY phase; we now need to check that the
// txhash we're about to apply is the one denoted by that ledger
Expand Down Expand Up @@ -246,7 +327,7 @@ ApplyCheckpointWork::getNextLedgerCloseData()

return std::make_shared<LedgerCloseData>(
header.ledgerSeq, txset, header.scpValue,
std::make_optional<Hash>(mHeaderHistoryEntry.hash));
std::make_optional<Hash>(mHeaderHistoryEntry.hash), txres);
}

BasicWork::State
Expand Down
31 changes: 21 additions & 10 deletions src/catchup/ApplyCheckpointWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ class TmpDir;
struct LedgerHeaderHistoryEntry;

/**
* This class is responsible for applying transactions stored in files on
* temporary directory (downloadDir) to local ledger. It requires two sets of
* files - ledgers and transactions - int .xdr format. Transaction files are
* used to read transactions that will be used and ledger files are used to
* This class is responsible for applying transactions stored in files in the
* temporary directory (downloadDir) to local the ledger. It requires two sets
* of files - ledgers and transactions - in .xdr format. Transaction files are
* used to read transactions that will be applied and ledger files are used to
* check if ledger hashes are matching.
*
* It may also require a third set of files - transaction results - to use in
* accelerated replay, where failed transactions are not applied and successful
* transactions are applied without verifying their signatures.
*
* In each run it skips or applies transactions from one ledger. Skipping occurs
* when ledger to be applied is older than LCL from local ledger. At LCL
* boundary checks are made to confirm that ledgers from files knit up with
* LCL. If everything is OK, an apply ledger operation is performed. Then
* another check is made - if new local ledger matches corresponding ledger from
* file.
* when the ledger to be applied is older than the LCL of the local ledger. At
* LCL, boundary checks are made to confirm that the ledgers from the files knit
* up with LCL. If everything is OK, an apply ledger operation is performed.
* Then another check is made - if the new local ledger matches corresponding
* the ledger from file.
*
* Constructor of this class takes some important parameters:
* The constructor of this class takes some important parameters:
* * downloadDir - directory containing ledger and transaction files
* * range - LedgerRange to apply, must be checkpoint-aligned,
* and cover at most one checkpoint.
Expand All @@ -49,6 +53,10 @@ class ApplyCheckpointWork : public BasicWork
XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
TransactionHistoryEntry mTxHistoryEntry;
#ifdef BUILD_TESTS
std::optional<XDRInputFileStream> mTxResultIn;
std::optional<TransactionHistoryResultEntry> mTxHistoryResultEntry;
#endif // BUILD_TESTS
LedgerHeaderHistoryEntry mHeaderHistoryEntry;
OnFailureCallback mOnFailure;

Expand All @@ -57,6 +65,9 @@ class ApplyCheckpointWork : public BasicWork
std::shared_ptr<ConditionalWork> mConditionalWork;

TxSetXDRFrameConstPtr getCurrentTxSet();
#ifdef BUILD_TESTS
std::optional<TransactionResultSet> getCurrentTxResultSet();
#endif // BUILD_TESTS
void openInputFiles();

std::shared_ptr<LedgerCloseData> getNextLedgerCloseData();
Expand Down
15 changes: 8 additions & 7 deletions src/catchup/CatchupConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace stellar
{

// Each catchup can be configured by two parameters destination ledger
// Each catchup can be configured by two parameters: destination ledger
// (and its hash, if known) and count of ledgers to apply.
// Value of count can be adjusted in different ways during catchup. If applying
// count ledgers would mean going before the last closed ledger - it is
Expand All @@ -31,12 +31,13 @@ namespace stellar
// and catchup to that instead of destination ledger. This is useful when
// doing offline commandline catchups with stellar-core catchup command.
//
// Catchup can be done in two modes - ONLINE nad OFFLINE. In ONLINE mode node
// is connected to the network. If receives ledgers during catchup and applies
// them after history is applied. Also additional closing ledger is required
// to mark catchup as complete and node as synced. In OFFLINE mode node is not
// connected to network, so new ledgers are not being externalized. Only
// buckets and transactions from history archives are applied.
// Catchup can be done in two modes - ONLINE and OFFLINE. In ONLINE mode, the
// node is connected to the network. It receives ledgers during catchup and
// applies them after history is applied. Also, an additional closing ledger is
// required to mark catchup as complete and the node as synced. In OFFLINE mode,
// the node is not connected to network, so new ledgers are not being
// externalized. Only buckets and transactions from history archives are
// applied.
class CatchupConfiguration
{
public:
Expand Down
14 changes: 7 additions & 7 deletions src/catchup/CatchupWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ using WorkSeqPtr = std::shared_ptr<WorkSequence>;

// CatchupWork does all the necessary work to perform any type of catchup.
// It accepts CatchupConfiguration structure to know from which ledger to which
// one do the catchup and if it involves only applying ledgers or ledgers and
// one to do the catchup and if it involves only applying ledgers or ledgers and
// buckets.
//
// First thing it does is to get a history state which allows to calculate
// proper destination ledger (in case CatchupConfiguration::CURRENT) was used
// and to get list of buckets that should be in database on that ledger.
// First, it gets a history state, which allows it to calculate a
// proper destination ledger (in case CatchupConfiguration::CURRENT)
// and get a list of buckets that should be in the database on that ledger.
//
// Next step is downloading and verifying ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS it can also verify against ledgers currently
// Next, it downloads and verifies ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS, it can also verify against ledgers currently
// buffered in LedgerManager).
//
// Then, depending on configuration, it can download, verify and apply buckets
// (as in MINIMAL and RECENT catchups), and then download and apply
// transactions (as in COMPLETE and RECENT catchups).
//
// After that, catchup is done and node can replay buffered ledgers and take
// After that, catchup is done and the node can replay buffered ledgers and take
// part in consensus protocol.

class CatchupWork : public Work
Expand Down
90 changes: 71 additions & 19 deletions src/catchup/DownloadApplyTxsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ DownloadApplyTxsWork::yieldMoreWork()
{
throw std::runtime_error("Work has no more children to iterate over!");
}

CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_TRANSACTIONS),
Expand Down Expand Up @@ -80,6 +79,53 @@ DownloadApplyTxsWork::yieldMoreWork()
mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb);

std::vector<std::shared_ptr<BasicWork>> seq{getAndUnzip};
std::vector<FileTransferInfo> filesToTransfer{ft};
std::vector<std::shared_ptr<BasicWork>> optionalDownloads;
#ifdef BUILD_TESTS
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING)
{
CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_RESULTS),
mCheckpointToQueue);

FileTransferInfo resultsFile(mDownloadDir,
FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpointToQueue);
auto getResultsWork = std::make_shared<GetAndUnzipRemoteFileWork>(
mApp, resultsFile, mArchive, /*logErrorOnFailure=*/false);
std::weak_ptr<GetAndUnzipRemoteFileWork> getResultsWorkWeak =
getResultsWork;
seq.emplace_back(getResultsWork);
seq.emplace_back(std::make_shared<WorkWithCallback>(
mApp, "get-results-" + std::to_string(mCheckpointToQueue),
[apply, getResultsWorkWeak, checkpoint, &dir](Application& app) {
auto getResults = getResultsWorkWeak.lock();
if (getResults && getResults->getState() != State::WORK_SUCCESS)
{
auto archive = getResults->getArchive();
if (archive)
{
FileTransferInfo ti(dir,
FileType::HISTORY_FILE_TYPE_RESULTS,
checkpoint);
CLOG_WARNING(
History,
"Archive {} maybe contains corrupt results file "
"{}. "
"This is not fatal as long as the archive contains "
"valid transaction history. Catchup will proceed "
"but"
"the node will not be able to skip known results.",
archive->getName(), ti.remoteName());
}
}
return true;
}));

filesToTransfer.push_back(resultsFile);
}
#endif // BUILD_TESTS

auto maybeWaitForMerges = [](Application& app) {
if (app.getConfig().CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING)
Expand Down Expand Up @@ -139,28 +185,34 @@ DownloadApplyTxsWork::yieldMoreWork()
mApp, "wait-merges" + apply->getName(), maybeWaitForMerges, apply));
}

seq.push_back(std::make_shared<WorkWithCallback>(
mApp, "delete-transactions-" + std::to_string(mCheckpointToQueue),
[ft](Application& app) {
try
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted transactions {}",
for (auto const& ft : filesToTransfer)
{
auto deleteWorkName = "delete-" + ft.getTypeString() + "-" +
std::to_string(mCheckpointToQueue);
seq.push_back(std::make_shared<WorkWithCallback>(
mApp, deleteWorkName, [ft](Application& app) {
CLOG_DEBUG(History, "Deleting {} {}", ft.getTypeString(),
ft.localPath_nogz());
try
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted {} {}", ft.getTypeString(),
ft.localPath_nogz());
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete {} {}: {}",
ft.getTypeString(), ft.localPath_nogz(),
e.what());
return false;
}
return true;
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete transactions {}: {}",
ft.localPath_nogz(), e.what());
return false;
}
}));

}));
}
auto nextWork = std::make_shared<WorkSequence>(
mApp, "download-apply-" + std::to_string(mCheckpointToQueue), seq,
BasicWork::RETRY_NEVER);
BasicWork::RETRY_NEVER, true /*stop at first failure*/);
mCheckpointToQueue += mApp.getHistoryManager().getCheckpointFrequency();
mLastYieldedWork = nextWork;
return nextWork;
Expand Down
15 changes: 15 additions & 0 deletions src/herder/LedgerCloseData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ LedgerCloseData::LedgerCloseData(uint32_t ledgerSeq,
releaseAssert(txSet->getContentsHash() == mValue.txSetHash);
}

#ifdef BUILD_TESTS
LedgerCloseData::LedgerCloseData(
uint32_t ledgerSeq, TxSetXDRFrameConstPtr txSet, StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash,
std::optional<TransactionResultSet> const& expectedResults)
: mLedgerSeq(ledgerSeq)
, mTxSet(txSet)
, mValue(v)
, mExpectedLedgerHash(expectedLedgerHash)
, mExpectedResults(expectedResults)
{
releaseAssert(txSet->getContentsHash() == mValue.txSetHash);
}
#endif // BUILD_TESTS

std::string
stellarValueToString(Config const& c, StellarValue const& sv)
{
Expand Down
Loading

0 comments on commit 3780d5b

Please sign in to comment.