Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Merge after VM PR) Use expiration policy #2447

Open
wants to merge 5 commits into
base: use-storage-read-with-offset
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer.
- [2327](https://github.com/FuelLabs/fuel-core/pull/2327): Add more services tests and more checks of the pool. Also add an high level documentation for users of the pool and contributors.
- [2416](https://github.com/FuelLabs/fuel-core/issues/2416): Define the `GasPriceServiceV1` task.
- [2447](https://github.com/FuelLabs/fuel-core/pull/2447): Use new `expiration` policy in the transaction pool. Add a mechanism to prune the transactions when they expired.


### Fixed
Expand Down
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,4 @@ tempfile = "3.4"
tikv-jemallocator = "0.5"

[patch.crates-io]
fuel-vm = { git = "https://github.com/fuelLabs/fuel-vm" }
fuel-vm = { git = "https://github.com/fuelLabs/fuel-vm", branch = "add_expiration_policy" }
9 changes: 9 additions & 0 deletions crates/services/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1149,11 +1149,20 @@ where
) -> ExecutorResult<CheckedTransaction> {
let block_height = *header.height();
let actual_version = header.consensus_parameters_version;
let expiration = tx.expiration();
let checked_tx = match tx {
MaybeCheckedTransaction::Transaction(tx) => tx
.into_checked_basic(block_height, &self.consensus_params)?
.into(),
MaybeCheckedTransaction::CheckedTransaction(checked_tx, checked_version) => {
// If you plan to add an other check of validity like this one on the checked_tx
// then probably the `CheckedTransaction` type isn't useful anymore.
if block_height > expiration {
return Err(ExecutorError::TransactionExpired(
expiration,
block_height,
));
}
if actual_version == checked_version {
checked_tx
} else {
Expand Down
51 changes: 50 additions & 1 deletion crates/services/executor/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ use fuel_core_types::{
},
fuel_tx::{
self,
field::Expiration,
Chargeable,
ConsensusParameters,
Transaction,
TxId,
UniqueIdentifier,
},
fuel_types::ChainId,
fuel_types::{
BlockHeight,
ChainId,
},
fuel_vm::checked_transaction::CheckedTransaction,
services::{
executor::{
Expand Down Expand Up @@ -65,6 +69,51 @@ impl MaybeCheckedTransaction {
MaybeCheckedTransaction::Transaction(tx) => tx.id(chain_id),
}
}

pub fn expiration(&self) -> BlockHeight {
match self {
MaybeCheckedTransaction::CheckedTransaction(
CheckedTransaction::Script(tx),
_,
) => tx.transaction().expiration(),
MaybeCheckedTransaction::CheckedTransaction(
CheckedTransaction::Create(tx),
_,
) => tx.transaction().expiration(),
MaybeCheckedTransaction::CheckedTransaction(
CheckedTransaction::Mint(_),
_,
) => u32::MAX.into(),
MaybeCheckedTransaction::CheckedTransaction(
CheckedTransaction::Upgrade(tx),
_,
) => tx.transaction().expiration(),
MaybeCheckedTransaction::CheckedTransaction(
CheckedTransaction::Upload(tx),
_,
) => tx.transaction().expiration(),
MaybeCheckedTransaction::CheckedTransaction(
CheckedTransaction::Blob(tx),
_,
) => tx.transaction().expiration(),
MaybeCheckedTransaction::Transaction(Transaction::Script(tx)) => {
tx.expiration()
}
MaybeCheckedTransaction::Transaction(Transaction::Create(tx)) => {
tx.expiration()
}
MaybeCheckedTransaction::Transaction(Transaction::Mint(_)) => u32::MAX.into(),
MaybeCheckedTransaction::Transaction(Transaction::Upgrade(tx)) => {
tx.expiration()
}
MaybeCheckedTransaction::Transaction(Transaction::Upload(tx)) => {
tx.expiration()
}
MaybeCheckedTransaction::Transaction(Transaction::Blob(tx)) => {
tx.expiration()
}
}
}
}

pub trait TransactionExt {
Expand Down
33 changes: 33 additions & 0 deletions crates/services/txpool_v2/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use futures::StreamExt;
use parking_lot::RwLock;
use std::{
collections::{
BTreeMap,
HashSet,
VecDeque,
},
Expand Down Expand Up @@ -322,6 +323,29 @@ where
let mut block_height = self.current_height.write();
*block_height = new_height;
}

// Remove expired transactions
let mut removed_txs = vec![];
{
let mut height_expiration_txs = self.pruner.height_expiration_txs.write();
let range_to_remove = height_expiration_txs
.range(..=new_height)
.map(|(k, _)| *k)
.collect::<Vec<_>>();
for height in range_to_remove {
let expired_txs = height_expiration_txs.remove(&height);
if let Some(expired_txs) = expired_txs {
let mut tx_pool = self.pool.write();
removed_txs
.extend(tx_pool.remove_transaction_and_dependents(expired_txs));
}
}
}
for tx in removed_txs {
self.shared_state
.tx_status_sender
.send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl));
}
}

fn borrow_txpool(&self, request: BorrowTxPoolRequest) {
Expand Down Expand Up @@ -391,6 +415,7 @@ where
let shared_state = self.shared_state.clone();
let current_height = self.current_height.clone();
let time_txs_submitted = self.pruner.time_txs_submitted.clone();
let height_expiration_txs = self.pruner.height_expiration_txs.clone();
let tx_id = transaction.id(&self.chain_id);
let utxo_validation = self.utxo_validation;

Expand Down Expand Up @@ -430,6 +455,7 @@ where
};

let tx = Arc::new(checked_tx);
let expiration = tx.expiration();

let result = {
let mut pool = pool.write();
Expand All @@ -448,6 +474,12 @@ where
.write()
.push_front((submitted_time, tx_id));

if expiration < u32::MAX.into() {
let mut lock = height_expiration_txs.write();
let block_height_expiration = lock.entry(expiration).or_default();
block_height_expiration.push(tx_id);
}

let duration = submitted_time
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time can't be less than UNIX EPOCH");
Expand Down Expand Up @@ -763,6 +795,7 @@ where
let pruner = TransactionPruner {
txs_ttl: config.max_txs_ttl,
time_txs_submitted: Arc::new(RwLock::new(VecDeque::new())),
height_expiration_txs: Arc::new(RwLock::new(BTreeMap::new())),
ttl_timer,
};

Expand Down
11 changes: 9 additions & 2 deletions crates/services/txpool_v2/src/service/pruner.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use crate::service::Shared;
use fuel_core_types::fuel_tx::TxId;
use fuel_core_types::{
fuel_tx::TxId,
fuel_types::BlockHeight,
};
use std::{
collections::VecDeque,
collections::{
BTreeMap,
VecDeque,
},
time::SystemTime,
};

pub(super) struct TransactionPruner {
pub time_txs_submitted: Shared<VecDeque<(SystemTime, TxId)>>,
pub height_expiration_txs: Shared<BTreeMap<BlockHeight, Vec<TxId>>>,
pub ttl_timer: tokio::time::Interval,
pub txs_ttl: tokio::time::Duration,
}
10 changes: 10 additions & 0 deletions crates/services/txpool_v2/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ use std::{
Mutex,
},
};
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Default)]
pub struct Data {
Expand Down Expand Up @@ -335,4 +337,12 @@ impl MockImporter {
});
importer
}

pub fn with_block_provider(block_provider: Receiver<SharedImportResult>) -> Self {
let mut importer = MockImporter::default();
importer
.expect_block_events()
.return_once(move || Box::pin(ReceiverStream::new(block_provider)));
importer
}
}
Loading
Loading