Skip to content

Commit

Permalink
Nominator: Nominate TX set hashes
Browse files Browse the repository at this point in the history
This will help scaling to higher TX counts easier. Envelopes wont
grow when nominated TX count increases.
  • Loading branch information
omerfirmak authored and mkykadir committed Mar 23, 2022
1 parent b8521a6 commit 944d453
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 40 deletions.
14 changes: 14 additions & 0 deletions source/agora/api/Validator.d
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ public interface API : agora.api.FullNode.API

public void postBlockSignature (ValidatorBlockSig block_sig);

/***************************************************************************
Set of TX hashes represented by the given hash
Params:
hash = hash of TX set
API:
GET /tx_set
***************************************************************************/

public Hash[] getTxSet (Hash hash);

/// Interfaces that are only enabled in debug builds
debug (AgoraDebugAPIs)
{
Expand Down
37 changes: 28 additions & 9 deletions source/agora/consensus/Ledger.d
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class NodeLedger : Ledger
NoUTXO = "Couldn't find UTXO for one or more Enrollment",
NotInPool = "Transaction is not in the pool",
MisMatchingCoinbase = "Missing matching Coinbase transaction",
UnknownTxSet = "Transaction set is unknown",
}

/// A delegate to be called when a block was externalized (unless `null`)
Expand Down Expand Up @@ -502,6 +503,9 @@ public class ValidatingLedger : NodeLedger
/// Hashes of Values we fully validated for a slot
private Set!Hash fully_validated_value;

/// Nominated TX sets
public Hash[][Hash] nominated_tx_sets;

/// See parent class
public this (immutable(ConsensusParams) params,
ManagedDatabase database, IBlockStorage storage,
Expand Down Expand Up @@ -530,14 +534,16 @@ public class ValidatingLedger : NodeLedger
auto utxo_finder = this.utxo_set.getUTXOFinder();
data.enrolls = this.getCandidateEnrollments(next_height, utxo_finder);
data.missing_validators = this.getCandidateMissingValidators(next_height, utxo_finder);
data.tx_set = this.getCandidateTransactions(next_height, utxo_finder);
auto tx_set = this.getCandidateTransactions(next_height, utxo_finder);
if (this.isCoinbaseBlock(next_height))
{
auto coinbase_tx = this.getCoinbaseTX(next_height);
auto coinbase_hash = coinbase_tx.hashFull();
log.info("prepareNominatingSet: Coinbase hash={}, tx={}", coinbase_hash, coinbase_tx.prettify);
data.tx_set ~= coinbase_hash;
tx_set ~= coinbase_hash;
}
data.tx_set = hashFull(tx_set);
this.nominated_tx_sets[data.tx_set] = tx_set;
}

/// Validate slashing data, including checking if the node is slef slashing
Expand Down Expand Up @@ -611,7 +617,10 @@ public class ValidatingLedger : NodeLedger
{
if (auto err = super.acceptBlock(block))
return err;
() @trusted { this.fully_validated_value.clear(); }();
() @trusted {
this.fully_validated_value.clear();
this.nominated_tx_sets.clear();
}();
return null;
}

Expand All @@ -638,7 +647,7 @@ public class ValidatingLedger : NodeLedger
auto coinbase_tx_hash = coinbase_tx.hashFull();
auto not_cb_filter = (Hash h) => coinbase_tx == Transaction.init || h != coinbase_tx_hash;

foreach (const ref tx_hash; data.tx_set)
foreach (const ref tx_hash; this.nominated_tx_sets[data.tx_set])
{
if (!not_cb_filter(tx_hash))
continue;
Expand All @@ -655,18 +664,22 @@ public class ValidatingLedger : NodeLedger
/// Ditto
public string isValidTXSet (in ConsensusData data) @safe nothrow
{
auto tx_set = data.tx_set in this.nominated_tx_sets;
if (tx_set is null)
return InvalidConsensusDataReason.UnknownTxSet;

auto coinbase_tx = this.getCoinbaseTX(this.height() + 1);
Hash coinbase_tx_hash = coinbase_tx.hashFull();
if (coinbase_tx != Transaction.init)
{
log.trace("isValidTxSet: Coinbase hash={}, tx={}", coinbase_tx_hash, coinbase_tx.prettify);
if (!data.tx_set.canFind(coinbase_tx_hash))
if (!(*tx_set).canFind(coinbase_tx_hash))
return InvalidConsensusDataReason.MisMatchingCoinbase;
}

auto enrolled_utxos = Set!Hash.from(data.enrolls.map!(enroll => enroll.utxo_key));
auto not_cb_filter = (Hash h) => coinbase_tx == Transaction.init || h != coinbase_tx_hash;
if (auto reason = this.pool.isValidTxSet(data.tx_set.filter!(not_cb_filter), enrolled_utxos))
if (auto reason = this.pool.isValidTxSet((*tx_set).filter!(not_cb_filter), enrolled_utxos))
return reason;

return null;
Expand Down Expand Up @@ -1522,10 +1535,14 @@ unittest
if (height >= 2 * testPayoutPeriod && height % testPayoutPeriod == 0)
{
// Remove the coinbase TX
data.tx_set = data.tx_set[0 .. $ - 1];
auto tx_set = ledger.nominated_tx_sets[data.tx_set][0 .. $ - 1];
ledger.nominated_tx_sets[tx_set.hashFull] = tx_set;
data.tx_set = tx_set.hashFull;
assert(ledger.validateConsensusData(data, skip_indexes) == "Missing matching Coinbase transaction");
// Add different hash to tx_set
data.tx_set ~= "Not Coinbase tx".hashFull();
tx_set ~= "Not Coinbase tx".hashFull();
ledger.nominated_tx_sets[tx_set.hashFull] = tx_set;
data.tx_set = tx_set.hashFull;
assert(ledger.validateConsensusData(data, skip_indexes) == "Missing matching Coinbase transaction");
}

Expand Down Expand Up @@ -1678,7 +1695,7 @@ unittest

ConsensusData data;
ledger.prepareNominatingSet(data);
assert(data.tx_set.canFind(melting_tx.hashFull()));
assert(ledger.nominated_tx_sets[data.tx_set].canFind(melting_tx.hashFull()));
assert(ledger.validateConsensusData(data, []) is null);

// can't enroll and spend the stake at the same height
Expand Down Expand Up @@ -1716,6 +1733,7 @@ unittest
ledger.simulatePreimages(Height(params.ValidatorCycle));

auto expected_height = 0;
ledger.nominated_tx_sets[Hash.init] = [];
// if less than 4 out of 6 validators sign it should not externalize
iota(4).each!((signed)
{
Expand All @@ -1727,6 +1745,7 @@ unittest
// if at least 4 out of 6 validators sign it should externalize
iota(4, 7).each!((signed)
{
ledger.nominated_tx_sets[Hash.init] = [];
assert(!ledger.externalize(ConsensusData.init, signed));
assert(ledger.height == ++expected_height);
});
Expand Down
4 changes: 2 additions & 2 deletions source/agora/consensus/protocol/Data.d
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import agora.crypto.Hash;
public struct ConsensusData
{
/// The transaction set that is being nominated / voted on
public Hash[] tx_set;
public Hash tx_set;

/// The enrollments that are being nominated / voted on
public Enrollment[] enrolls;
Expand Down Expand Up @@ -60,7 +60,7 @@ unittest

const(ConsensusData) data =
{
tx_set: GenesisBlock.txs.map!(tx => tx.hashFull()).array,
tx_set: GenesisBlock.txs.map!(tx => tx.hashFull()).array.hashFull,
enrolls: [ record, record, ],
missing_validators : [ 1, 3, 5 ]
};
Expand Down
41 changes: 29 additions & 12 deletions source/agora/consensus/protocol/Nominator.d
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ extern(D):
}
}

Set!Hash missing_sets;
Set!Hash missing_txs;
auto values = Slot.getStatementValues(envelope.statement);
foreach (value; values)
Expand All @@ -768,12 +769,16 @@ extern(D):
catch (Exception ex)
return;

auto missing = this.ledger.getUnknownTXsFromSet(Set!Hash.from(data.tx_set));
missing.each!(hash => missing_txs.put(hash));
if (auto tx_set = data.tx_set in this.ledger.nominated_tx_sets)
this.ledger.getUnknownTXsFromSet(Set!Hash.from(*tx_set))
.each!(hash => missing_txs.put(hash));
else
missing_sets.put(data.tx_set);
}

auto shared_env = this.wrapEnvelope(envelope);
if (missing_txs.length > 0 && this.handleMissingTxEnvelope(shared_env, missing_txs, utxo))
if ((missing_sets.length > 0 || missing_txs.length > 0)
&& this.handleMissingTxEnvelope(shared_env, missing_sets, missing_txs, utxo))
return;

if (this.scp.receiveEnvelope(shared_env) != SCP.EnvelopeState.VALID)
Expand All @@ -788,6 +793,7 @@ extern(D):
Params:
shared_env = the SCP envelope
missing_sets = Set of unknown TX sets
missing_txs = Set of unknown TX hashes
peer_utxo = stake of the peer that the envelope belongs to
Expand All @@ -796,19 +802,35 @@ extern(D):
***************************************************************************/

private bool handleMissingTxEnvelope (SCPEnvelopeWrapperPtr shared_env, Set!Hash missing_txs, Hash peer_utxo) @trusted
private bool handleMissingTxEnvelope (SCPEnvelopeWrapperPtr shared_env,
Set!Hash missing_sets, Set!Hash missing_txs, Hash peer_utxo) @trusted
{
auto peer = this.network.getPeerByStake(peer_utxo);
if (!peer)
return false;

log.trace("Missing {} TXs while handling envelope from {} for slot {}", missing_txs.length,
shared_env.getEnvelope().statement.nodeID, shared_env.getEnvelope().statement.slotIndex);
log.trace("Missing {} sets, {} TXs while handling envelope from {} for slot {}", missing_sets.length,
missing_txs.length, shared_env.getEnvelope().statement.nodeID, shared_env.getEnvelope().statement.slotIndex);

auto heap_env = new SCPEnvelopeWrapperPtr(shared_env);
this.taskman.runTask({
try
{
foreach (set_hash; missing_sets)
{
try
{
auto tx_set = peer.getTxSet(set_hash);
if (tx_set.hashFull() != set_hash)
continue;
this.ledger.nominated_tx_sets[set_hash] = tx_set;
this.ledger.getUnknownTXsFromSet(Set!Hash.from(tx_set))
.each!(hash => missing_txs.put(hash));
}
catch (Exception e)
continue;
}

Transaction[] txs;
do
{
Expand Down Expand Up @@ -1427,11 +1449,6 @@ extern(D):
else if (this.total_rate < other.total_rate)
return 1;

if (this.consensus_data.tx_set.length > other.consensus_data.tx_set.length)
return -1;
else if (this.consensus_data.tx_set.length < other.consensus_data.tx_set.length)
return 1;

if (this.hash > other.hash)
return -1;
else if (this.hash < other.hash)
Expand Down Expand Up @@ -1505,7 +1522,7 @@ extern(D):
log.trace("Consensus data: {}", data.prettify);

Amount total_rate;
foreach (const ref tx_hash; data.tx_set)
foreach (const ref tx_hash; this.ledger.nominated_tx_sets[data.tx_set])
{
Amount rate;
auto errormsg = this.ledger.getTxFeeRate(tx_hash, rate);
Expand Down
17 changes: 17 additions & 0 deletions source/agora/network/Client.d
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,23 @@ public class NetworkClient
return this.attemptRequest!(API.getPreimages, Throw.No)(enroll_keys);
}

/***************************************************************************
Set of TX hashes represented by the given hash
Params:
hash = hash of TX set
API:
GET /tx_set
***************************************************************************/

public Hash[] getTxSet (Hash hash) @trusted nothrow
{
return this.attemptRequest!(API.getTxSet, Throw.No)(hash);
}

/***************************************************************************
Attempt a request up to 'this.max_retries' attempts, and make the task
Expand Down
19 changes: 19 additions & 0 deletions source/agora/node/Validator.d
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,25 @@ public class Validator : FullNode, API
}
}

/***************************************************************************
Set of TX hashes represented by the given hash
Params:
hash = hash of TX set
API:
GET /tx_set
***************************************************************************/

public override Hash[] getTxSet (Hash hash) @safe
{
if (auto tx_set = hash in (cast(ValidatingLedger) this.ledger).nominated_tx_sets)
return *tx_set;
throw new Exception("Unknown TX Set");
}

debug (AgoraDebugAPIs)
{
/***************************************************************************
Expand Down
6 changes: 6 additions & 0 deletions source/agora/test/Base.d
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,12 @@ public class TestFullNode : FullNode, TestAPI
{
assert(0);
}

/// ditto
public override Hash[] getTxSet (Hash hash) @safe
{
assert(0);
}
}

/// A Validator which also implements test routines in TestAPI
Expand Down
12 changes: 10 additions & 2 deletions source/agora/test/FutureEnrollment.d
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ version (unittest):
import agora.test.Base;

import core.atomic : atomicLoad;
import scpd.types.Stellar_SCP;

/// Situation: A delayed validator is a block behind the latest height(19)
/// where other nodes have a block that contains a frozen UTXO for
Expand All @@ -44,6 +45,13 @@ unittest
super(args);
}

///
public override void postEnvelope (SCPEnvelope envelope) @safe
{
if (atomicLoad(*this.enable_catchup))
this.nominator.receiveEnvelope(envelope);
}

///
protected override void catchupTask () nothrow
{
Expand All @@ -58,7 +66,7 @@ unittest
{
mixin ForwardCtor!();

public static shared bool enable_catchup = false;
public static shared bool enable_catchup = true;

/// set base class
public override void createNewNode (Config conf, string file, int line)
Expand Down Expand Up @@ -95,7 +103,7 @@ unittest
network.postAndEnsureTxInPool(network.freezeUTXO(only(GenesisValidators)));

// the delayed validator becomes unresponsive
network.clients[delayed_node].filter!(API.postTransaction);
CustomAPIManager.enable_catchup = false;

// Block 19 we add the frozen utxo for the outsider validator
network.generateBlocks(iota(1, GenesisValidators), Height(GenesisValidatorCycle - 1));
Expand Down
6 changes: 3 additions & 3 deletions source/agora/utils/PrettyPrinter.d
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ private struct ConsensusDataFmt
try
{
formattedWrite(sink, "{ tx_set: %s, enrolls: %s, missing_validators: %s }",
this.data.tx_set.map!(tx => HashFmt(tx)),
HashFmt(this.data.tx_set),
this.data.enrolls.map!(enroll => EnrollmentFmt(enroll)),
this.data.missing_validators);
}
Expand Down Expand Up @@ -699,12 +699,12 @@ unittest

const(ConsensusData) cd =
{
tx_set: [hashFull(37), hashFull(44)],
tx_set: hashFull(37),
enrolls: [ record, record, ],
missing_validators: [0, 2, 4],
};

static immutable Res1 = `{ tx_set: [0x992e...6694, 0x053f...89c8], enrolls: [{ utxo: 0x0000...e26f, seed: 0x4a5e...a33b, sig: 0x0000...be78 }, { utxo: 0x0000...e26f, seed: 0x4a5e...a33b, sig: 0x0000...be78 }], missing_validators: [0, 2, 4] }`;
static immutable Res1 = `{ tx_set: 0x992e...6694, enrolls: [{ utxo: 0x0000...e26f, seed: 0x4a5e...a33b, sig: 0x0000...be78 }, { utxo: 0x0000...e26f, seed: 0x4a5e...a33b, sig: 0x0000...be78 }], missing_validators: [0, 2, 4] }`;

assert(Res1 == format("%s", prettify(cd)),
format("%s", prettify(cd)));
Expand Down
Loading

0 comments on commit 944d453

Please sign in to comment.