diff --git a/Cargo.lock b/Cargo.lock index ed5d6fae2..219324770 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13487,6 +13487,7 @@ dependencies = [ "alloy-signer", "alloy-signer-local", "alloy-sol-types", + "codspeed-criterion-compat", "futures", "itertools 0.14.0", "metrics", diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index 4cc520757..f76114ee8 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -75,3 +75,8 @@ alloy-signer.workspace = true alloy-signer-local.workspace = true tokio.workspace = true test-case.workspace = true +criterion.workspace = true + +[[bench]] +name = "aa_2d_pool" +harness = false diff --git a/crates/transaction-pool/benches/aa_2d_pool.rs b/crates/transaction-pool/benches/aa_2d_pool.rs new file mode 100644 index 000000000..771e0b943 --- /dev/null +++ b/crates/transaction-pool/benches/aa_2d_pool.rs @@ -0,0 +1,313 @@ +//! Benchmarks for the AA 2D nonce pool under saturation. +//! +//! Covers the hot paths observed in high-TPS runs: +//! * `add_transaction` while the pool is at capacity (every insert triggers eviction) +//! * `on_state_updates` when a block mines many 2D nonce and expiring nonce transactions + +use alloy_primitives::{Address, Signature, TxKind, U256, map::AddressMap}; +use criterion::{BatchSize, Criterion, Throughput, criterion_group, criterion_main}; +use reth_primitives_traits::Recovered; +use reth_transaction_pool::{SubPoolLimit, TransactionOrigin, ValidPoolTransaction}; +use revm::database::{AccountStatus, BundleAccount, states::StorageSlot}; +use std::{hint::black_box, sync::Arc, time::Instant}; +use tempo_chainspec::hardfork::TempoHardfork; +use tempo_precompiles::NONCE_PRECOMPILE_ADDRESS; +use tempo_primitives::{ + TempoTxEnvelope, + transaction::{ + TEMPO_EXPIRING_NONCE_KEY, TempoTransaction, + tempo_transaction::Call, + tt_signature::{PrimitiveSignature, TempoSignature}, + tt_signed::AASigned, + }, +}; +use tempo_transaction_pool::{AA2dPool, AA2dPoolConfig, transaction::TempoPooledTransaction}; + +const HARDFORK: TempoHardfork = TempoHardfork::T8; + +/// Builds a valid pool transaction for the given sender/nonce key/nonce. +/// +/// `tip` controls `max_priority_fee_per_gas`, which determines eviction priority. +fn build_tx( + sender: Address, + nonce_key: U256, + nonce: u64, + tip: u128, +) -> Arc> { + let tx = TempoTransaction { + chain_id: 42431, + max_priority_fee_per_gas: tip, + max_fee_per_gas: 20_000_000_000 + tip, + gas_limit: 100_000, + calls: vec![Call { + to: TxKind::Call(Address::with_last_byte(1)), + value: U256::ZERO, + input: Default::default(), + }], + nonce_key, + nonce, + fee_token: None, + fee_payer_signature: None, + valid_after: None, + valid_before: None, + access_list: Default::default(), + tempo_authorization_list: Vec::new(), + key_authorization: None, + }; + let signature = + TempoSignature::Primitive(PrimitiveSignature::Secp256k1(Signature::test_signature())); + let envelope: TempoTxEnvelope = AASigned::new_unhashed(tx, signature).into(); + let recovered = Recovered::new_unchecked(envelope, sender); + let transaction = TempoPooledTransaction::new(recovered); + let transaction_id = reth_transaction_pool::identifier::TransactionId::new(0u64.into(), nonce); + Arc::new(ValidPoolTransaction { + transaction, + transaction_id, + propagate: true, + timestamp: Instant::now(), + origin: TransactionOrigin::External, + authority_ids: None, + }) +} + +/// Deterministic sender address derived from an index. +fn sender(i: u64) -> Address { + Address::from_slice(&{ + let mut b = [0u8; 20]; + b[..8].copy_from_slice(&i.to_be_bytes()); + b[19] = 0x42; + b + }) +} + +/// Builds `n` expiring nonce transactions from unique senders with increasing tips. +fn build_expiring_txs( + n: u64, + tip_offset: u128, +) -> Vec>> { + (0..n) + .map(|i| { + build_tx( + sender(i), + TEMPO_EXPIRING_NONCE_KEY, + i, + 1_000_000 + tip_offset + u128::from(i), + ) + }) + .collect() +} + +/// Builds 2D nonce transactions: `keys` nonce keys starting at `key_offset`, with +/// `per_key` sequential nonces each. +fn build_2d_txs( + keys: u64, + per_key: u64, + key_offset: u64, + tip_offset: u128, +) -> Vec>> { + (key_offset..key_offset + keys) + .flat_map(|k| { + (0..per_key).map(move |n| { + build_tx( + sender(k), + U256::from(k + 1), + n, + 1_000_000 + tip_offset + u128::from(k), + ) + }) + }) + .collect() +} + +fn pool_config(max_txs: usize) -> AA2dPoolConfig { + AA2dPoolConfig { + pending_limit: SubPoolLimit { + max_txs, + max_size: usize::MAX, + }, + queued_limit: SubPoolLimit { + max_txs, + max_size: usize::MAX, + }, + max_txs_per_sender: usize::MAX, + ..Default::default() + } +} + +/// Builds a pool pre-filled with the given transactions. +fn fill_pool( + config: AA2dPoolConfig, + txs: &[Arc>], +) -> AA2dPool { + let mut pool = AA2dPool::new(config); + pool.set_base_fee(1_000_000_000); + for tx in txs { + pool.add_transaction(Arc::clone(tx), 0, HARDFORK).unwrap(); + } + pool +} + +/// Inserting expiring nonce transactions into a pool that is at capacity, so every +/// insert evicts the current lowest-priority transaction. +fn bench_add_at_capacity(c: &mut Criterion) { + const CAPACITY: usize = 10_000; + const ADDS: u64 = 2_000; + + let mut group = c.benchmark_group("aa_2d_pool/add_at_capacity"); + group.throughput(Throughput::Elements(ADDS)); + group.sample_size(10); + + let base = build_expiring_txs(CAPACITY as u64, 0); + // higher tips so each insert evicts an old transaction instead of itself + let incoming = build_expiring_txs(ADDS, 1_000_000_000); + + group.bench_function("expiring", |b| { + b.iter_batched_ref( + || fill_pool(pool_config(CAPACITY), &base), + |pool| { + for tx in &incoming { + let _ = black_box(pool.add_transaction(Arc::clone(tx), 0, HARDFORK)); + } + }, + BatchSize::PerIteration, + ) + }); + + let base_2d = build_2d_txs(CAPACITY as u64 / 4, 4, 0, 0); + // disjoint nonce keys so inserts evict instead of replacing + let incoming_2d = build_2d_txs(ADDS / 4, 4, CAPACITY as u64, 1_000_000_000); + + group.bench_function("2d", |b| { + b.iter_batched_ref( + || fill_pool(pool_config(CAPACITY), &base_2d), + |pool| { + for tx in &incoming_2d { + let _ = black_box(pool.add_transaction(Arc::clone(tx), 0, HARDFORK)); + } + }, + BatchSize::PerIteration, + ) + }); + + group.finish(); +} + +/// Filling an empty pool below capacity (no eviction pressure). +fn bench_add_fill(c: &mut Criterion) { + const N: u64 = 10_000; + + let mut group = c.benchmark_group("aa_2d_pool/add_fill"); + group.throughput(Throughput::Elements(N)); + group.sample_size(10); + + let expiring = build_expiring_txs(N, 0); + group.bench_function("expiring", |b| { + b.iter_batched_ref( + || { + let mut pool = AA2dPool::new(pool_config(N as usize * 2)); + pool.set_base_fee(1_000_000_000); + pool + }, + |pool| { + for tx in &expiring { + let _ = black_box(pool.add_transaction(Arc::clone(tx), 0, HARDFORK)); + } + }, + BatchSize::PerIteration, + ) + }); + + let txs_2d = build_2d_txs(N / 4, 4, 0, 0); + group.bench_function("2d", |b| { + b.iter_batched_ref( + || { + let mut pool = AA2dPool::new(pool_config(N as usize * 2)); + pool.set_base_fee(1_000_000_000); + pool + }, + |pool| { + for tx in &txs_2d { + let _ = black_box(pool.add_transaction(Arc::clone(tx), 0, HARDFORK)); + } + }, + BatchSize::PerIteration, + ) + }); + + group.finish(); +} + +/// State updates that mine a large number of transactions at once out of a saturated pool. +fn bench_on_state_updates(c: &mut Criterion) { + const CAPACITY: usize = 10_000; + const MINED: u64 = 5_000; + + let mut group = c.benchmark_group("aa_2d_pool/on_state_updates"); + group.throughput(Throughput::Elements(MINED)); + group.sample_size(10); + + // Expiring nonce transactions: mark MINED of them as seen on chain. + let expiring = build_expiring_txs(CAPACITY as u64, 0); + let mut storage = revm::primitives::HashMap::default(); + for tx in expiring.iter().take(MINED as usize) { + let slot = tx + .transaction + .expiring_nonce_slot() + .expect("expiring nonce tx has slot"); + storage.insert(slot, StorageSlot::new_changed(U256::ZERO, U256::from(1u64))); + } + let mut expiring_state = AddressMap::default(); + expiring_state.insert( + NONCE_PRECOMPILE_ADDRESS, + BundleAccount::new(None, None, storage, AccountStatus::Changed), + ); + + group.bench_function("expiring_mined", |b| { + b.iter_batched_ref( + || fill_pool(pool_config(CAPACITY * 2), &expiring), + |pool| black_box(pool.on_state_updates(&expiring_state)), + BatchSize::PerIteration, + ) + }); + + // 2D nonce transactions: advance the on-chain nonce of each key so that + // MINED transactions across all keys are pruned at once. + const PER_KEY: u64 = 4; + let keys = CAPACITY as u64 / PER_KEY; + let txs_2d = build_2d_txs(keys, PER_KEY, 0, 0); + let mined_per_key = 2u64; + let mut storage = revm::primitives::HashMap::default(); + for k in 0..(MINED / mined_per_key) { + let slot = txs_2d[(k * PER_KEY) as usize] + .transaction + .nonce_key_slot() + .expect("2d tx has nonce key slot"); + storage.insert( + slot, + StorageSlot::new_changed(U256::ZERO, U256::from(mined_per_key)), + ); + } + let mut state_2d = AddressMap::default(); + state_2d.insert( + NONCE_PRECOMPILE_ADDRESS, + BundleAccount::new(None, None, storage, AccountStatus::Changed), + ); + + group.bench_function("2d_mined", |b| { + b.iter_batched_ref( + || fill_pool(pool_config(CAPACITY * 2), &txs_2d), + |pool| black_box(pool.on_state_updates(&state_2d)), + BatchSize::PerIteration, + ) + }); + + group.finish(); +} + +criterion_group!( + benches, + bench_add_at_capacity, + bench_add_fill, + bench_on_state_updates +); +criterion_main!(benches); diff --git a/crates/transaction-pool/src/tt_2d_pool.rs b/crates/transaction-pool/src/tt_2d_pool.rs index 81552f917..93dd4ae92 100644 --- a/crates/transaction-pool/src/tt_2d_pool.rs +++ b/crates/transaction-pool/src/tt_2d_pool.rs @@ -70,7 +70,7 @@ pub struct AA2dPool { by_hash: B256Map>>, /// Expiring nonce transactions, keyed by expiring nonce hash (always pending/independent). /// These use expiring nonce replay protection instead of sequential nonces. - expiring_nonce_txs: B256Map, + expiring_nonce_txs: B256Map, /// Expiring nonce transactions in eviction order. /// /// Regular 2D transactions use `by_eviction_order`, which is keyed by @@ -106,7 +106,14 @@ pub struct AA2dPool { /// set checking `is_pending` to find queued or pending transactions. Keys /// own a priority snapshot so repricing does not mutate canonical /// transaction storage. + /// + /// Removals are tombstoned: removed transactions stay in the set with their + /// `live` flag cleared and are purged lazily, see [`Self::maybe_compact_eviction_order`]. by_eviction_order: BTreeSet, + /// Number of tombstoned (removed) entries in `by_eviction_order`. + eviction_order_stale: usize, + /// Number of tombstoned (removed) entries in `expiring_nonce_eviction_order`. + expiring_eviction_order_stale: usize, /// Base fee used for transaction insertion and eviction-order priorities. base_fee: u64, /// Tracks the number of transactions per sender for DoS protection. @@ -146,6 +153,8 @@ impl AA2dPool { config, metrics: AA2dPoolMetrics::default(), by_eviction_order: Default::default(), + eviction_order_stale: 0, + expiring_eviction_order_stale: 0, base_fee: 0, txs_by_sender: Default::default(), pending_count: 0, @@ -168,7 +177,10 @@ impl AA2dPool { self.metrics.set_transaction_counts(total, pending, queued); } - pub(crate) fn set_base_fee(&mut self, base_fee: u64) { + /// Sets the base fee used for transaction insertion and eviction-order priorities. + /// + /// Rebuilds the eviction order if the base fee changed. + pub fn set_base_fee(&mut self, base_fee: u64) { if self.base_fee == base_fee { return; } @@ -179,6 +191,7 @@ impl AA2dPool { fn rebuild_eviction_order(&mut self) { self.by_eviction_order.clear(); + self.eviction_order_stale = 0; for (id, tx) in &self.by_id { self.by_eviction_order.insert(EvictionKey::with_base_fee( Arc::clone(tx), @@ -188,6 +201,7 @@ impl AA2dPool { } self.expiring_nonce_eviction_order.clear(); + self.expiring_eviction_order_stale = 0; for tx in self.expiring_nonce_txs.values() { self.expiring_nonce_eviction_order.insert( ExpiringNonceEvictionKey::from_pending_with_base_fee(tx, self.base_fee), @@ -195,6 +209,52 @@ impl AA2dPool { } } + /// Compacts `by_eviction_order` once more than half of its entries are tombstones. + /// + /// Keeps removal O(1) amortized: each removal only flips the transaction's + /// `live` flag, and the periodic compaction cost is amortized over at least + /// as many removals. + fn maybe_compact_eviction_order(&mut self) { + if self.eviction_order_stale > self.by_id.len() { + self.by_eviction_order.retain(|key| key.is_live()); + self.eviction_order_stale = 0; + } + } + + /// Compacts `expiring_nonce_eviction_order` once more than half of its entries are + /// tombstones, see [`Self::maybe_compact_eviction_order`]. + fn maybe_compact_expiring_eviction_order(&mut self) { + if self.expiring_eviction_order_stale > self.expiring_nonce_txs.len() { + self.expiring_nonce_eviction_order + .retain(|key| key.is_live()); + self.expiring_eviction_order_stale = 0; + } + } + + /// Removes tombstoned entries from the front of `by_eviction_order` so the + /// first entry is live. + fn purge_dead_eviction_front(&mut self) { + while let Some(first) = self.by_eviction_order.first() { + if first.is_live() { + break; + } + self.by_eviction_order.pop_first(); + self.eviction_order_stale -= 1; + } + } + + /// Removes tombstoned entries from the front of `expiring_nonce_eviction_order` so + /// the first entry is live. + fn purge_dead_expiring_eviction_front(&mut self) { + while let Some(first) = self.expiring_nonce_eviction_order.first() { + if first.is_live() { + break; + } + self.expiring_nonce_eviction_order.pop_first(); + self.expiring_eviction_order_stale -= 1; + } + } + /// Entrypoint for adding a 2d AA transaction. /// /// `on_chain_nonce` is expected to be the nonce of the sender at the time of validation. @@ -204,7 +264,7 @@ impl AA2dPool { /// `hardfork` indicates the active Tempo hardfork. When T1 or later, expiring nonce /// transactions (nonce_key == U256::MAX) are handled specially. Otherwise, they are /// treated as regular 2D nonce transactions. - pub(crate) fn add_transaction( + pub fn add_transaction( &mut self, transaction: Arc>, on_chain_nonce: u64, @@ -252,6 +312,7 @@ impl AA2dPool { let tx = Arc::new(AA2dInternalTransaction { inner: AA2dStoredTransaction::new(self.next_id(), transaction.clone()), is_pending: AtomicBool::new(false), + removed: AtomicBool::new(false), }); // Use entry API once to both check for replacement and insert. @@ -459,18 +520,19 @@ impl AA2dPool { } // Create pending transaction - let pending_tx = AA2dStoredTransaction { - submission_id: { - let id = self.submission_id; - self.submission_id = self.submission_id.wrapping_add(1); - id - }, - transaction: transaction.clone(), + let submission_id = { + let id = self.submission_id; + self.submission_id = self.submission_id.wrapping_add(1); + id }; + let pending_tx = ExpiringNonceTransaction::new(AA2dStoredTransaction::new( + submission_id, + transaction.clone(), + )); let eviction_key = ExpiringNonceEvictionKey::from_pending_with_base_fee(&pending_tx, self.base_fee); let pending_tx_update = if self.new_transaction_notifier.receiver_count() > 0 { - Some(pending_tx.clone()) + Some(pending_tx.inner.clone()) } else { None }; @@ -505,7 +567,7 @@ impl AA2dPool { } /// Returns how many pending and queued transactions are in the pool. - pub(crate) fn pending_and_queued_txn_count(&self) -> (usize, usize) { + pub fn pending_and_queued_txn_count(&self) -> (usize, usize) { (self.pending_count, self.queued_count) } @@ -817,11 +879,14 @@ impl AA2dPool { } } + /// Tombstones the transaction's entry in `by_eviction_order`. + /// + /// The entry is purged lazily instead of doing a keyed `BTreeSet` removal, which + /// would require recomputing the transaction's priority. fn remove_eviction_key(&mut self, tx: &Arc) { - self.by_eviction_order.remove(&EvictionOrderKey::new( - TempoTipOrdering::default().priority(&tx.inner.transaction.transaction, self.base_fee), - tx.inner.submission_id, - )); + tx.mark_removed(); + self.eviction_order_stale += 1; + self.maybe_compact_eviction_order(); } /// Removes the independent transaction if it matches the given id. @@ -1189,7 +1254,7 @@ impl AA2dPool { let to_remove: Vec<_> = self .by_eviction_order .iter() - .filter(|key| !key.is_pending()) + .filter(|key| key.is_live() && !key.is_pending()) .map(|key| key.tx_id) .take(count) .collect(); @@ -1205,10 +1270,14 @@ impl AA2dPool { /// Evicts one pending transaction, considering both regular 2D and expiring nonce txs. /// Evicts the transaction with lowest priority; ties broken by submission order (newer first). fn evict_one_pending(&mut self) -> Option>> { + // ensure the front entries are live before peeking + self.purge_dead_eviction_front(); + self.purge_dead_expiring_eviction_front(); + let worst_2d = self .by_eviction_order .iter() - .find(|key| key.is_pending()) + .find(|key| key.is_live() && key.is_pending()) .map(|key| (key.tx_id, key.priority().clone(), key.submission_id())); let worst_expiring = self @@ -1253,31 +1322,36 @@ impl AA2dPool { fn evict_worst_expiring_nonce_tx( &mut self, ) -> Option>> { - let eviction_key = self.expiring_nonce_eviction_order.pop_first()?; - let pending_tx = self - .expiring_nonce_txs - .remove(&eviction_key.expiring_hash())?; + loop { + let eviction_key = self.expiring_nonce_eviction_order.pop_first()?; + if !eviction_key.is_live() { + self.expiring_eviction_order_stale -= 1; + continue; + } + let pending_tx = self + .expiring_nonce_txs + .remove(&eviction_key.expiring_hash())?; + // invalidate any snapshot copies of this transaction + pending_tx.mark_removed(); - Some(self.remove_expiring_nonce_pending_tx(pending_tx)) + return Some(self.remove_expiring_nonce_pending_tx(pending_tx.inner)); + } } /// Removes an expiring nonce transaction by hash. /// /// Use when removal starts from a hash, such as direct removal, sender - /// removal, or nonce-state inclusion. This path removes the matching - /// eviction key by lookup. + /// removal, or nonce-state inclusion. This path tombstones the matching + /// eviction key, which is purged lazily. fn remove_expiring_nonce_tx( &mut self, expiring_hash: &B256, ) -> Option>> { let pending_tx = self.expiring_nonce_txs.remove(expiring_hash)?; - self.expiring_nonce_eviction_order - .remove(&EvictionOrderKey::new( - TempoTipOrdering::default() - .priority(&pending_tx.transaction.transaction, self.base_fee), - pending_tx.submission_id, - )); - Some(self.remove_expiring_nonce_pending_tx(pending_tx)) + pending_tx.mark_removed(); + self.expiring_eviction_order_stale += 1; + self.maybe_compact_expiring_eviction_order(); + Some(self.remove_expiring_nonce_pending_tx(pending_tx.inner)) } /// Removes secondary state for an already-detached expiring nonce transaction. @@ -1354,10 +1428,7 @@ impl AA2dPool { } /// Processes nonce-precompile storage updates and updates internal state accordingly. - pub(crate) fn on_state_updates( - &mut self, - state: &AddressMap, - ) -> PoolUpdateResult { + pub fn on_state_updates(&mut self, state: &AddressMap) -> PoolUpdateResult { self.state_update_nonce_changes.clear(); self.state_update_included_expiring_nonce_hashes.clear(); @@ -1425,19 +1496,45 @@ impl AA2dPool { self.expiring_nonce_txs.len(), self.by_hash.len() ); + let live_expiring_keys = self + .expiring_nonce_eviction_order + .iter() + .filter(|key| key.is_live()) + .count(); assert_eq!( self.expiring_nonce_txs.len(), + live_expiring_keys, + "expiring_nonce_txs.len() ({}) != live expiring eviction keys ({})", + self.expiring_nonce_txs.len(), + live_expiring_keys + ); + assert_eq!( + self.expiring_nonce_eviction_order.len(), + self.expiring_nonce_txs.len() + self.expiring_eviction_order_stale, + "expiring eviction order len ({}) != live ({}) + stale ({})", self.expiring_nonce_eviction_order.len(), - "expiring_nonce_txs.len() ({}) != expiring_nonce_eviction_order.len() ({})", self.expiring_nonce_txs.len(), - self.expiring_nonce_eviction_order.len() + self.expiring_eviction_order_stale ); + let live_eviction_keys = self + .by_eviction_order + .iter() + .filter(|key| key.is_live()) + .count(); assert_eq!( self.by_id.len(), + live_eviction_keys, + "by_id.len() ({}) != live eviction keys ({})", + self.by_id.len(), + live_eviction_keys + ); + assert_eq!( + self.by_eviction_order.len(), + self.by_id.len() + self.eviction_order_stale, + "eviction order len ({}) != live ({}) + stale ({})", self.by_eviction_order.len(), - "by_id.len() ({}) != by_eviction_order.len() ({})", self.by_id.len(), - self.by_eviction_order.len() + self.eviction_order_stale ); // All independent transactions must exist in by_id @@ -1575,6 +1672,10 @@ impl AA2dPool { } for key in &self.by_eviction_order { + // tombstoned keys reference removed transactions and are purged lazily + if !key.is_live() { + continue; + } let Some(tx) = self.by_id.get(&key.tx_id) else { panic!("Eviction key {:?} not in by_id", key.tx_id); }; @@ -1654,6 +1755,9 @@ impl AA2dPool { } for key in &self.expiring_nonce_eviction_order { + if !key.is_live() { + continue; + } let expiring_hash = key.expiring_hash(); let Some(pending_tx) = self.expiring_nonce_txs.get(&expiring_hash) else { panic!("Expiring nonce eviction key {expiring_hash:?} not in expiring_nonce_txs"); @@ -1664,7 +1768,7 @@ impl AA2dPool { "Expiring nonce eviction key {expiring_hash:?} has mismatched submission id" ); assert_eq!( - key.transaction.hash(), + key.stored.transaction.hash(), pending_tx.transaction.hash(), "Expiring nonce eviction key {expiring_hash:?} has mismatched transaction hash" ); @@ -1729,6 +1833,40 @@ impl AA2dStoredTransaction { } } +/// An expiring nonce transaction tracked by the pool. +#[derive(Debug)] +struct ExpiringNonceTransaction { + inner: AA2dStoredTransaction, + /// Tombstone flag shared with this transaction's eviction-order keys, set to + /// `false` when the transaction is removed from the pool. + /// + /// Eviction-order sets keep tombstoned keys until they are lazily purged or + /// compacted, which makes removal O(1) instead of a keyed `BTreeSet` removal. + live: Arc, +} + +impl ExpiringNonceTransaction { + fn new(inner: AA2dStoredTransaction) -> Self { + Self { + inner, + live: Arc::new(AtomicBool::new(true)), + } + } + + /// Marks this transaction as removed from the pool. + fn mark_removed(&self) { + self.live.store(false, Ordering::Relaxed); + } +} + +impl std::ops::Deref for ExpiringNonceTransaction { + type Target = AA2dStoredTransaction; + + fn deref(&self) -> &AA2dStoredTransaction { + &self.inner + } +} + #[derive(Debug)] struct AA2dInternalTransaction { /// Keeps track of the transaction without an ordering priority. @@ -1741,6 +1879,12 @@ struct AA2dInternalTransaction { /// the transaction from the eviction set. This allows a single eviction set for /// all transactions, with pending/queued filtering done at eviction time. is_pending: AtomicBool, + /// Tombstone flag, set when the transaction is removed from the pool while its + /// eviction-order key is still present. + /// + /// Eviction-order keys share this transaction via `Arc`, so tombstoning does not + /// require an extra allocation, see also [`ExpiringNonceTransaction::live`]. + removed: AtomicBool, } impl AA2dInternalTransaction { @@ -1753,6 +1897,16 @@ impl AA2dInternalTransaction { fn set_pending(&self, pending: bool) -> bool { self.is_pending.swap(pending, Ordering::Relaxed) } + + /// Returns `true` if this transaction was removed from the pool. + fn is_removed(&self) -> bool { + self.removed.load(Ordering::Relaxed) + } + + /// Marks this transaction as removed from the pool. + fn mark_removed(&self) { + self.removed.store(true, Ordering::Relaxed); + } } /// Minimal ordering key for eviction set lookups. @@ -1804,24 +1958,29 @@ impl PartialOrd for EvictionOrderKey { #[derive(Debug, Clone)] struct ExpiringNonceEvictionKey { order: EvictionOrderKey, - transaction: Arc>, + stored: AA2dStoredTransaction, + /// Tombstone flag shared with the pool's [`ExpiringNonceTransaction`]. + live: Arc, } impl ExpiringNonceEvictionKey { - fn from_pending_with_base_fee(tx: &AA2dStoredTransaction, base_fee: u64) -> Self { + fn from_pending_with_base_fee(tx: &ExpiringNonceTransaction, base_fee: u64) -> Self { Self { order: EvictionOrderKey::new( TempoTipOrdering::default().priority(&tx.transaction.transaction, base_fee), tx.submission_id, ), - transaction: tx.transaction.clone(), + stored: tx.inner.clone(), + live: tx.live.clone(), } } fn from_pending_owned(tx: PendingTransaction) -> Self { Self { order: EvictionOrderKey::new(tx.priority, tx.submission_id), - transaction: tx.transaction, + // not tracked by the pool's tombstone flags, always considered live + stored: AA2dStoredTransaction::new(tx.submission_id, tx.transaction), + live: Arc::new(AtomicBool::new(true)), } } @@ -1829,7 +1988,7 @@ impl ExpiringNonceEvictionKey { PendingTransaction { submission_id: self.order.submission_id, priority: self.order.priority, - transaction: self.transaction, + transaction: self.stored.transaction, } } @@ -1841,8 +2000,14 @@ impl ExpiringNonceEvictionKey { self.order.submission_id } + /// Returns `true` if the referenced transaction is still in the pool. + fn is_live(&self) -> bool { + self.live.load(Ordering::Relaxed) + } + fn expiring_hash(&self) -> B256 { - self.transaction + self.stored + .transaction .transaction .precomputed_expiring_nonce_hash() } @@ -1924,6 +2089,11 @@ impl EvictionKey { fn is_pending(&self) -> bool { self.tx.is_pending() } + + /// Returns `true` if the referenced transaction is still in the pool. + fn is_live(&self) -> bool { + !self.tx.is_removed() + } } impl Borrow for EvictionKey { @@ -2021,8 +2191,20 @@ impl BestAA2dTransactions { Some(key.into_transaction()) } + /// Removes tombstoned entries from the back of `expiring_nonce_order` so the + /// last entry references a transaction that is still in the pool. + fn purge_dead_expiring_back(&mut self) { + while let Some(last) = self.expiring_nonce_order.last() { + if last.is_live() { + break; + } + self.expiring_nonce_order.pop_last(); + } + } + /// Removes the best regular or expiring nonce transaction. fn pop_best(&mut self) -> Option { + self.purge_dead_expiring_back(); match (self.independent.last(), self.expiring_nonce_order.last()) { (Some(regular), Some(expiring)) => { if regular @@ -2104,10 +2286,7 @@ impl BestAA2dTransactions { } self.by_id.insert( id, - AA2dStoredTransaction { - submission_id: tx.submission_id, - transaction: tx.transaction, - }, + AA2dStoredTransaction::new(tx.submission_id, tx.transaction), ); } } else { @@ -6496,7 +6675,12 @@ mod tests { fn assert_expiring_eviction_index_len(pool: &AA2dPool, len: usize) { assert_eq!(pool.expiring_nonce_txs.len(), len); - assert_eq!(pool.expiring_nonce_eviction_order.len(), len); + let live = pool + .expiring_nonce_eviction_order + .iter() + .filter(|key| key.is_live()) + .count(); + assert_eq!(live, len); pool.assert_invariants(); } @@ -6504,7 +6688,7 @@ mod tests { assert!( pool.expiring_nonce_eviction_order .iter() - .any(|key| key.expiring_hash() == expiring_hash), + .any(|key| key.is_live() && key.expiring_hash() == expiring_hash), "expiring_nonce_eviction_order should contain {expiring_hash:?}" ); } @@ -6513,7 +6697,7 @@ mod tests { assert!( pool.expiring_nonce_eviction_order .iter() - .all(|key| key.expiring_hash() != expiring_hash), + .all(|key| !key.is_live() || key.expiring_hash() != expiring_hash), "expiring_nonce_eviction_order should not contain {expiring_hash:?}" ); }