diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 4dc0663c..d3323451 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -58,6 +58,10 @@ pub struct OpRbuilderArgs { )] pub resource_metering_buffer_size: usize, + /// Buffer size for backrun bundles (LRU eviction when full) + #[arg(long = "builder.backrun-bundle-buffer-size", default_value = "10000")] + pub backrun_bundle_buffer_size: usize, + /// Path to builder playgorund to automatically start up the node connected to it #[arg( long = "builder.playground", diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 1e0f22da..b3d16bd2 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -80,6 +80,8 @@ pub struct OpPayloadBuilderCtx { pub address_gas_limiter: AddressGasLimiter, /// Per transaction resource metering information pub resource_metering: ResourceMetering, + /// Backrun bundle store for storing backrun transactions + pub backrun_bundle_store: crate::bundles::BackrunBundleStore, } impl OpPayloadBuilderCtx { @@ -433,7 +435,7 @@ impl OpPayloadBuilderCtx { is_bundle_tx && !reverted_hashes.unwrap().contains(&tx_hash); let log_txn = |result: TxnExecutionResult| { - debug!( + info!( target: "payload_builder", message = "Considering transaction", tx_hash = ?tx_hash, @@ -543,7 +545,8 @@ impl OpPayloadBuilderCtx { continue; } - if result.is_success() { + let is_success = result.is_success(); + if is_success { log_txn(TxnExecutionResult::Success); num_txs_simulated_success += 1; self.metrics.successful_tx_gas_used.record(gas_used as f64); @@ -600,6 +603,120 @@ impl OpPayloadBuilderCtx { // append sender and transaction to the respective lists info.executed_senders.push(tx.signer()); info.executed_transactions.push(tx.into_inner()); + + if is_success && let Some(backrun_bundles) = self.backrun_bundle_store.get(&tx_hash) { + self.metrics.backrun_target_txs_found_total.increment(1); + let backrun_start_time = Instant::now(); + + // Pre-compute total fees and sort bundles (descending) + let mut bundles_with_fees: Vec<_> = backrun_bundles + .into_iter() + .map(|bundle| { + let total_fee: u128 = bundle + .backrun_txs + .iter() + .map(|tx| tx.effective_tip_per_gas(base_fee).unwrap_or(0)) + .sum(); + (bundle, total_fee) + }) + .collect(); + bundles_with_fees.sort_by(|a, b| b.1.cmp(&a.1)); + + 'bundle_loop: for (mut stored_bundle, total_bundle_fee) in bundles_with_fees { + info!( + target: "payload_builder", + message = "Executing backrun bundle", + tx_hash = ?tx_hash, + bundle_id = ?stored_bundle.bundle_id, + tx_count = stored_bundle.backrun_txs.len(), + ); + + // Validate: total bundle priority fee must be >= target tx's priority fee + if total_bundle_fee < miner_fee { + self.metrics + .backrun_bundles_rejected_low_fee_total + .increment(1); + info!( + target: "payload_builder", + bundle_id = ?stored_bundle.bundle_id, + target_fee = miner_fee, + total_bundle_fee = total_bundle_fee, + "Backrun bundle rejected: total priority fee below target tx" + ); + continue 'bundle_loop; + } + + // Sort backrun txs by priority fee (descending) + stored_bundle.backrun_txs.sort_unstable_by(|a, b| { + let a_tip = a.effective_tip_per_gas(base_fee).unwrap_or(0); + let b_tip = b.effective_tip_per_gas(base_fee).unwrap_or(0); + b_tip.cmp(&a_tip) + }); + + // All-or-nothing: simulate all txs first, only commit if all succeed + let mut pending_results = Vec::with_capacity(stored_bundle.backrun_txs.len()); + + for backrun_tx in &stored_bundle.backrun_txs { + let ResultAndState { result, state } = match evm.transact(backrun_tx) { + Ok(res) => res, + Err(err) => { + return Err(PayloadBuilderError::evm(err)); + } + }; + + if !result.is_success() { + self.metrics.backrun_bundles_reverted_total.increment(1); + info!( + target: "payload_builder", + target_tx = ?tx_hash, + failed_tx = ?backrun_tx.tx_hash(), + bundle_id = ?stored_bundle.bundle_id, + gas_used = result.gas_used(), + "Backrun bundle reverted (all-or-nothing)" + ); + continue 'bundle_loop; + } + + pending_results.push((backrun_tx, result, state)); + } + + for (backrun_tx, result, state) in pending_results { + let backrun_gas_used = result.gas_used(); + + info.cumulative_gas_used += backrun_gas_used; + info.cumulative_da_bytes_used += backrun_tx.encoded_2718().len() as u64; + + let ctx = ReceiptBuilderCtx { + tx: backrun_tx.inner(), + evm: &evm, + result, + state: &state, + cumulative_gas_used: info.cumulative_gas_used, + }; + info.receipts.push(self.build_receipt(ctx, None)); + + evm.db_mut().commit(state); + + let miner_fee = backrun_tx + .effective_tip_per_gas(base_fee) + .expect("fee is always valid; execution succeeded"); + info.total_fees += U256::from(miner_fee) * U256::from(backrun_gas_used); + + info.executed_senders.push(backrun_tx.signer()); + info.executed_transactions + .push(backrun_tx.clone().into_inner()); + } + + self.metrics.backrun_bundles_landed_total.increment(1); + } + + self.metrics + .backrun_bundle_execution_duration + .record(backrun_start_time.elapsed()); + + // Remove the target tx from the backrun bundle store as already executed + self.backrun_bundle_store.remove(&tx_hash); + } } let payload_transaction_simulation_time = execute_txs_start_time.elapsed(); diff --git a/crates/op-rbuilder/src/builders/flashblocks/ctx.rs b/crates/op-rbuilder/src/builders/flashblocks/ctx.rs index 28cbae76..47e9d5ee 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/ctx.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/ctx.rs @@ -1,5 +1,6 @@ use crate::{ builders::{BuilderConfig, OpPayloadBuilderCtx, flashblocks::FlashblocksConfig}, + bundles::BackrunBundleStore, gas_limiter::{AddressGasLimiter, args::GasLimiterArgs}, metrics::OpRBuilderMetrics, resource_metering::ResourceMetering, @@ -32,6 +33,8 @@ pub(super) struct OpPayloadSyncerCtx { metrics: Arc, /// Resource metering tracking resource_metering: ResourceMetering, + /// Backrun bundle store + backrun_bundle_store: BackrunBundleStore, } impl OpPayloadSyncerCtx { @@ -52,6 +55,7 @@ impl OpPayloadSyncerCtx { max_gas_per_txn: builder_config.max_gas_per_txn, metrics, resource_metering: builder_config.resource_metering, + backrun_bundle_store: builder_config.backrun_bundle_store, }) } @@ -85,6 +89,7 @@ impl OpPayloadSyncerCtx { max_gas_per_txn: self.max_gas_per_txn, address_gas_limiter: AddressGasLimiter::new(GasLimiterArgs::default()), resource_metering: self.resource_metering.clone(), + backrun_bundle_store: self.backrun_bundle_store.clone(), } } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 35e16834..6318b156 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -283,6 +283,7 @@ where max_gas_per_txn: self.config.max_gas_per_txn, address_gas_limiter: self.address_gas_limiter.clone(), resource_metering: self.config.resource_metering.clone(), + backrun_bundle_store: self.config.backrun_bundle_store.clone(), }) } diff --git a/crates/op-rbuilder/src/builders/mod.rs b/crates/op-rbuilder/src/builders/mod.rs index 48ce625b..4da8be12 100644 --- a/crates/op-rbuilder/src/builders/mod.rs +++ b/crates/op-rbuilder/src/builders/mod.rs @@ -21,7 +21,7 @@ mod flashblocks; mod generator; mod standard; -use crate::resource_metering::ResourceMetering; +use crate::{bundles::BackrunBundleStore, resource_metering::ResourceMetering}; pub use builder_tx::{ BuilderTransactionCtx, BuilderTransactionError, BuilderTransactions, InvalidContractDataError, SimulationSuccessResult, get_balance, get_nonce, @@ -130,6 +130,9 @@ pub struct BuilderConfig { /// Resource metering context pub resource_metering: ResourceMetering, + + /// Backrun bundle store for storing backrun transactions + pub backrun_bundle_store: BackrunBundleStore, } impl core::fmt::Debug for BuilderConfig { @@ -152,6 +155,7 @@ impl core::fmt::Debug for BuilderConfig { .field("specific", &self.specific) .field("max_gas_per_txn", &self.max_gas_per_txn) .field("gas_limiter_config", &self.gas_limiter_config) + .field("backrun_bundle_store", &self.backrun_bundle_store) .finish() } } @@ -171,6 +175,7 @@ impl Default for BuilderConfig { max_gas_per_txn: None, gas_limiter_config: GasLimiterArgs::default(), resource_metering: ResourceMetering::default(), + backrun_bundle_store: BackrunBundleStore::default(), } } } @@ -197,6 +202,7 @@ where args.enable_resource_metering, args.resource_metering_buffer_size, ), + backrun_bundle_store: BackrunBundleStore::new(args.backrun_bundle_buffer_size), specific: S::try_from(args)?, }) } diff --git a/crates/op-rbuilder/src/builders/standard/payload.rs b/crates/op-rbuilder/src/builders/standard/payload.rs index d9a74add..4e94a89f 100644 --- a/crates/op-rbuilder/src/builders/standard/payload.rs +++ b/crates/op-rbuilder/src/builders/standard/payload.rs @@ -252,6 +252,7 @@ where max_gas_per_txn: self.config.max_gas_per_txn, address_gas_limiter: self.address_gas_limiter.clone(), resource_metering: self.config.resource_metering.clone(), + backrun_bundle_store: self.config.backrun_bundle_store.clone(), }; let builder = OpBuilder::new(best); diff --git a/crates/op-rbuilder/src/bundles.rs b/crates/op-rbuilder/src/bundles.rs new file mode 100644 index 00000000..1c917b8b --- /dev/null +++ b/crates/op-rbuilder/src/bundles.rs @@ -0,0 +1,337 @@ +use alloy_consensus::transaction::Recovered; +use alloy_primitives::{Address, TxHash}; +use concurrent_queue::ConcurrentQueue; +use jsonrpsee::{ + core::{RpcResult, async_trait}, + proc_macros::rpc, +}; +use op_alloy_consensus::OpTxEnvelope; +use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Instant}; +use tips_core::AcceptedBundle; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +use crate::metrics::OpRBuilderMetrics; + +#[derive(Clone)] +pub struct StoredBackrunBundle { + pub bundle_id: Uuid, + pub sender: Address, + pub backrun_txs: Vec>, +} + +struct BackrunData { + by_target_tx: dashmap::DashMap>, + lru: ConcurrentQueue, +} + +#[derive(Clone)] +pub struct BackrunBundleStore { + data: Arc, + metrics: OpRBuilderMetrics, +} + +impl Debug for BackrunBundleStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BackrunBundleStore") + .field("by_target_tx_count", &self.data.by_target_tx.len()) + .finish() + } +} + +impl BackrunBundleStore { + pub fn new(buffer_size: usize) -> Self { + Self { + data: Arc::new(BackrunData { + by_target_tx: dashmap::DashMap::new(), + lru: ConcurrentQueue::bounded(buffer_size), + }), + metrics: OpRBuilderMetrics::default(), + } + } + + pub fn insert(&self, bundle: AcceptedBundle) -> Result<(), String> { + if bundle.txs.len() < 2 { + return Err("Bundle must have at least 2 transactions (target + backrun)".to_string()); + } + + let target_tx_hash = bundle.txs[0].tx_hash(); + let backrun_txs: Vec> = bundle.txs[1..].to_vec(); + let backrun_sender = backrun_txs[0].signer(); + + if self.data.lru.is_full() + && let Ok(evicted_hash) = self.data.lru.pop() + { + self.data.by_target_tx.remove(&evicted_hash); + warn!( + target: "backrun_bundles", + evicted_target = ?evicted_hash, + "Evicted old backrun bundle" + ); + } + + let _ = self.data.lru.push(target_tx_hash); + + let stored_bundle = StoredBackrunBundle { + bundle_id: *bundle.uuid(), + sender: backrun_sender, + backrun_txs: backrun_txs.clone(), + }; + + let replaced = { + let mut entry = self.data.by_target_tx.entry(target_tx_hash).or_default(); + entry.insert(backrun_sender, stored_bundle).is_some() + }; + + if replaced { + info!( + target: "backrun_bundles", + target_tx = ?target_tx_hash, + sender = ?backrun_sender, + bundle_id = ?bundle.uuid(), + "Replaced existing backrun bundle from same sender" + ); + } + + self.metrics + .backrun_bundles_in_store + .set(self.data.by_target_tx.len() as f64); + + Ok(()) + } + + pub fn get(&self, target_tx_hash: &TxHash) -> Option> { + self.data + .by_target_tx + .get(target_tx_hash) + .map(|entry| entry.values().cloned().collect()) + } + + pub fn remove(&self, target_tx_hash: &TxHash) { + if let Some((_, bundles)) = self.data.by_target_tx.remove(target_tx_hash) { + debug!( + target: "backrun_bundles", + target_tx = ?target_tx_hash, + bundle_count = bundles.len(), + "Removed backrun bundles" + ); + + self.metrics + .backrun_bundles_in_store + .set(self.data.by_target_tx.len() as f64); + } + } + + pub fn len(&self) -> usize { + self.data.by_target_tx.len() + } + + pub fn is_empty(&self) -> bool { + self.data.by_target_tx.is_empty() + } +} + +impl Default for BackrunBundleStore { + fn default() -> Self { + Self::new(10_000) + } +} + +#[cfg_attr(not(test), rpc(server, namespace = "base"))] +#[cfg_attr(test, rpc(server, client, namespace = "base"))] +pub trait BaseBundlesApiExt { + #[method(name = "sendBackrunBundle")] + async fn send_backrun_bundle(&self, bundle: AcceptedBundle) -> RpcResult<()>; +} + +pub(crate) struct BundlesApiExt { + bundle_store: BackrunBundleStore, + metrics: OpRBuilderMetrics, +} + +impl BundlesApiExt { + pub(crate) fn new(bundle_store: BackrunBundleStore) -> Self { + Self { + bundle_store, + metrics: OpRBuilderMetrics::default(), + } + } +} + +#[async_trait] +impl BaseBundlesApiExtServer for BundlesApiExt { + async fn send_backrun_bundle(&self, bundle: AcceptedBundle) -> RpcResult<()> { + self.metrics.backrun_bundles_received_total.increment(1); + + let start = Instant::now(); + self.bundle_store.insert(bundle).map_err(|e| { + warn!(target: "backrun_bundles", error = %e, "Failed to store bundle"); + jsonrpsee::types::ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + format!("Failed to store bundle: {e}"), + None::<()>, + ) + })?; + self.metrics + .backrun_bundle_insert_duration + .record(start.elapsed().as_secs_f64()); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_consensus::SignableTransaction; + use alloy_primitives::{Address, TxHash, U256}; + use alloy_provider::network::TxSignerSync; + use alloy_signer_local::PrivateKeySigner; + use op_alloy_consensus::OpTxEnvelope; + use op_alloy_rpc_types::OpTransactionRequest; + use tips_core::MeterBundleResponse; + + fn create_recovered_tx( + from: &PrivateKeySigner, + nonce: u64, + to: Address, + ) -> Recovered { + let mut txn = OpTransactionRequest::default() + .value(U256::from(10_000)) + .gas_limit(21_000) + .max_fee_per_gas(200) + .max_priority_fee_per_gas(100) + .from(from.address()) + .to(to) + .nonce(nonce) + .build_typed_tx() + .unwrap(); + + let sig = from.sign_transaction_sync(&mut txn).unwrap(); + let envelope = + OpTxEnvelope::Eip1559(txn.eip1559().cloned().unwrap().into_signed(sig).clone()); + Recovered::new_unchecked(envelope, from.address()) + } + + fn create_test_accepted_bundle(txs: Vec>) -> AcceptedBundle { + AcceptedBundle { + uuid: Uuid::new_v4(), + txs, + block_number: 1, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + meter_bundle_response: MeterBundleResponse { + bundle_gas_price: U256::ZERO, + bundle_hash: TxHash::ZERO, + coinbase_diff: U256::ZERO, + eth_sent_to_coinbase: U256::ZERO, + gas_fees: U256::ZERO, + results: vec![], + state_block_number: 0, + state_flashblock_index: None, + total_gas_used: 0, + total_execution_time_us: 0, + }, + } + } + + #[test] + fn test_backrun_bundle_store() { + let alice = PrivateKeySigner::random(); + let bob = PrivateKeySigner::random(); + + let target_tx = create_recovered_tx(&alice, 0, bob.address()); + let backrun_tx1 = create_recovered_tx(&alice, 1, bob.address()); + let backrun_tx2 = create_recovered_tx(&alice, 2, bob.address()); + + let target_tx_hash = target_tx.tx_hash(); + + let store = BackrunBundleStore::new(100); + + // Test insert fails with only 1 tx (need target + at least 1 backrun) + let single_tx_bundle = create_test_accepted_bundle(vec![target_tx.clone()]); + assert!(store.insert(single_tx_bundle).is_err()); + assert_eq!(store.len(), 0); + + // Test insert succeeds with 2+ txs + let valid_bundle = + create_test_accepted_bundle(vec![target_tx.clone(), backrun_tx1.clone()]); + assert!(store.insert(valid_bundle).is_ok()); + assert_eq!(store.len(), 1); + + // Test get returns the backrun txs (not the target) + let retrieved = store.get(&target_tx_hash).unwrap(); + assert_eq!(retrieved.len(), 1); + assert_eq!(retrieved[0].backrun_txs.len(), 1); + assert_eq!(retrieved[0].backrun_txs[0].tx_hash(), backrun_tx1.tx_hash()); + + // Test same sender replaces previous bundle (not accumulate) + let replacement_bundle = + create_test_accepted_bundle(vec![target_tx.clone(), backrun_tx2.clone()]); + assert!(store.insert(replacement_bundle).is_ok()); + assert_eq!(store.len(), 1); + + let retrieved = store.get(&target_tx_hash).unwrap(); + assert_eq!(retrieved.len(), 1); // Still 1 bundle (replaced, not accumulated) + assert_eq!(retrieved[0].backrun_txs[0].tx_hash(), backrun_tx2.tx_hash()); // New tx + + // Test remove + store.remove(&target_tx_hash); + assert_eq!(store.len(), 0); + assert!(store.get(&target_tx_hash).is_none()); + + // Test remove on non-existent key doesn't panic + store.remove(&TxHash::ZERO); + } + + #[test] + fn test_backrun_bundle_store_multiple_senders() { + let alice = PrivateKeySigner::random(); + let bob = PrivateKeySigner::random(); + let charlie = PrivateKeySigner::random(); + + let target_tx = create_recovered_tx(&alice, 0, bob.address()); + let alice_backrun = create_recovered_tx(&alice, 1, bob.address()); + let charlie_backrun = create_recovered_tx(&charlie, 0, bob.address()); + + let target_tx_hash = target_tx.tx_hash(); + let store = BackrunBundleStore::new(100); + + // Alice submits backrun + let alice_bundle = create_test_accepted_bundle(vec![target_tx.clone(), alice_backrun]); + store.insert(alice_bundle).unwrap(); + + // Charlie submits backrun for same target + let charlie_bundle = create_test_accepted_bundle(vec![target_tx.clone(), charlie_backrun]); + store.insert(charlie_bundle).unwrap(); + + // Both bundles should exist (different senders) + let retrieved = store.get(&target_tx_hash).unwrap(); + assert_eq!(retrieved.len(), 2); + } + + #[test] + fn test_backrun_bundle_store_lru_eviction() { + let alice = PrivateKeySigner::random(); + let bob = PrivateKeySigner::random(); + + // Small buffer to test eviction + let store = BackrunBundleStore::new(2); + + // Insert 3 bundles, first should be evicted + for nonce in 0..3u64 { + let target = create_recovered_tx(&alice, nonce * 2, bob.address()); + let backrun = create_recovered_tx(&alice, nonce * 2 + 1, bob.address()); + let bundle = create_test_accepted_bundle(vec![target, backrun]); + let _ = store.insert(bundle); + } + + // Only 2 should remain due to LRU eviction + assert_eq!(store.len(), 2); + } +} diff --git a/crates/op-rbuilder/src/launcher.rs b/crates/op-rbuilder/src/launcher.rs index 5d7c6f84..27fa04f3 100644 --- a/crates/op-rbuilder/src/launcher.rs +++ b/crates/op-rbuilder/src/launcher.rs @@ -4,6 +4,7 @@ use reth_optimism_rpc::OpEthApiBuilder; use crate::{ args::*, builders::{BuilderConfig, BuilderMode, FlashblocksBuilder, PayloadBuilder, StandardBuilder}, + bundles::{BaseBundlesApiExtServer, BundlesApiExt}, metrics::{VERSION, record_flag_gauge_metrics}, monitor_tx_pool::monitor_tx_pool, primitives::reth::engine_api_builder::OpEngineApiBuilder, @@ -111,6 +112,7 @@ where let reverted_cache = Cache::builder().max_capacity(100).build(); let reverted_cache_copy = reverted_cache.clone(); let resource_metering = builder_config.resource_metering.clone(); + let backrun_bundle_store = builder_config.backrun_bundle_store.clone(); let mut addons: OpAddOns< _, @@ -167,8 +169,11 @@ where } let resource_metering_ext = ResourceMeteringExt::new(resource_metering); + let bundles_ext = BundlesApiExt::new(backrun_bundle_store); ctx.modules .add_or_replace_configured(resource_metering_ext.into_rpc())?; + ctx.modules + .add_or_replace_configured(bundles_ext.into_rpc())?; Ok(()) }) diff --git a/crates/op-rbuilder/src/lib.rs b/crates/op-rbuilder/src/lib.rs index f61c39b0..b30bd9f7 100644 --- a/crates/op-rbuilder/src/lib.rs +++ b/crates/op-rbuilder/src/lib.rs @@ -1,5 +1,6 @@ pub mod args; pub mod builders; +pub mod bundles; pub mod flashtestations; pub mod gas_limiter; pub mod launcher; diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs index 75ebb874..8d3554b6 100644 --- a/crates/op-rbuilder/src/metrics.rs +++ b/crates/op-rbuilder/src/metrics.rs @@ -167,6 +167,22 @@ pub struct OpRBuilderMetrics { pub metering_unknown_transaction: Counter, /// Count of the number of times we were unable to resolve metering information due to locking pub metering_locked_transaction: Counter, + /// Current number of backrun bundles in store + pub backrun_bundles_in_store: Gauge, + /// Number of target transactions found with backrun bundles + pub backrun_target_txs_found_total: Counter, + /// Number of backrun bundles received via RPC + pub backrun_bundles_received_total: Counter, + /// Number of backrun bundles that reverted during execution (all-or-nothing) + pub backrun_bundles_reverted_total: Counter, + /// Number of backrun bundles rejected due to priority fee below target tx + pub backrun_bundles_rejected_low_fee_total: Counter, + /// Number of backrun bundles successfully landed in a block + pub backrun_bundles_landed_total: Counter, + /// Latency of inserting a backrun bundle into the store + pub backrun_bundle_insert_duration: Histogram, + /// Duration of executing all backrun bundles for a target transaction + pub backrun_bundle_execution_duration: Histogram, } impl OpRBuilderMetrics { diff --git a/crates/op-rbuilder/src/tests/backrun.rs b/crates/op-rbuilder/src/tests/backrun.rs new file mode 100644 index 00000000..6759c051 --- /dev/null +++ b/crates/op-rbuilder/src/tests/backrun.rs @@ -0,0 +1,395 @@ +use crate::tests::{ChainDriverExt, LocalInstance, framework::ONE_ETH}; +use alloy_eips::eip2718::Encodable2718; +use alloy_primitives::{TxHash, U256}; +use alloy_provider::Provider; +use macros::rb_test; +use tips_core::{AcceptedBundle, MeterBundleResponse}; +use uuid::Uuid; + +/// Tests that backrun bundles are all-or-nothing: +/// - If any backrun tx in a bundle reverts, the entire bundle is excluded +/// - Even successful txs in the bundle are not included +#[rb_test(flashblocks)] +async fn backrun_bundle_all_or_nothing_revert(rbuilder: LocalInstance) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + let accounts = driver.fund_accounts(3, ONE_ETH).await?; + + // 1. Build target tx first (we need Recovered for bundle) + let target_tx = driver + .create_transaction() + .with_signer(accounts[0]) + .with_max_priority_fee_per_gas(20) + .build() + .await; + let target_tx_hash = target_tx.tx_hash().clone(); + + // Send to mempool manually (send() doesn't return the Recovered tx) + let provider = rbuilder.provider().await?; + let _ = provider + .send_raw_transaction(target_tx.encoded_2718().as_slice()) + .await?; + + // 2. Create backrun transactions: + // - backrun_ok: valid tx with HIGH priority (executes first, succeeds) + // - backrun_revert: tx that will REVERT (executes second, fails) + // Both must have priority fee >= target's (20) to pass fee validation + let backrun_ok = driver + .create_transaction() + .with_signer(accounts[1]) + .with_max_priority_fee_per_gas(50) // High priority - executes first + .build() + .await; + let backrun_ok_hash = backrun_ok.tx_hash().clone(); + + let backrun_revert = driver + .create_transaction() + .with_signer(accounts[2]) + .with_max_priority_fee_per_gas(25) // >= target's 20, but executes second (lower than 50) + .with_revert() // This tx will revert + .build() + .await; + let backrun_revert_hash = backrun_revert.tx_hash().clone(); + + // 3. Insert backrun bundle into store + // Bundle format: [target_tx, backrun_txs...] + let bundle = AcceptedBundle { + uuid: Uuid::new_v4(), + txs: vec![target_tx, backrun_ok, backrun_revert], + block_number: driver.latest().await?.header.number + 1, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + meter_bundle_response: MeterBundleResponse { + bundle_gas_price: U256::ZERO, + bundle_hash: TxHash::ZERO, + coinbase_diff: U256::ZERO, + eth_sent_to_coinbase: U256::ZERO, + gas_fees: U256::ZERO, + results: vec![], + state_block_number: 0, + state_flashblock_index: None, + total_gas_used: 0, + total_execution_time_us: 0, + }, + }; + + rbuilder + .backrun_bundle_store() + .insert(bundle) + .expect("Failed to insert backrun bundle"); + + // 4. Build the block + driver.build_new_block().await?; + + // 5. Verify block contents + let block = driver.latest_full().await?; + let tx_hashes: Vec<_> = block.transactions.hashes().collect(); + + // Target tx SHOULD be in block (it was in mempool independently) + assert!( + tx_hashes.contains(&target_tx_hash), + "Target tx should be included in block" + ); + + // backrun_ok should NOT be in block (all-or-nothing: bundle failed) + assert!( + !tx_hashes.contains(&backrun_ok_hash), + "backrun_ok should NOT be in block (all-or-nothing revert)" + ); + + // backrun_revert should NOT be in block (it caused the revert) + assert!( + !tx_hashes.contains(&backrun_revert_hash), + "backrun_revert should NOT be in block" + ); + + Ok(()) +} + +/// Tests that multiple backrun bundles for the same target tx are sorted by total priority fee +/// - Bundles with higher total priority fee are processed first +/// - Both bundles can land if they don't conflict +#[rb_test(flashblocks)] +async fn backrun_bundles_sorted_by_total_fee(rbuilder: LocalInstance) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + let accounts = driver.fund_accounts(5, ONE_ETH).await?; + + // 1. Build target tx with priority fee 20 + let target_tx = driver + .create_transaction() + .with_signer(accounts[0]) + .with_max_priority_fee_per_gas(20) + .build() + .await; + let target_tx_hash = target_tx.tx_hash().clone(); + + // Send to mempool manually + let provider = rbuilder.provider().await?; + let _ = provider + .send_raw_transaction(target_tx.encoded_2718().as_slice()) + .await?; + + // 2. Create Bundle A with HIGH total priority fee + // Two txs: 60 + 50 = 110 total + let bundle_a_tx1 = driver + .create_transaction() + .with_signer(accounts[1]) + .with_max_priority_fee_per_gas(60) + .build() + .await; + let bundle_a_tx1_hash = bundle_a_tx1.tx_hash().clone(); + + let bundle_a_tx2 = driver + .create_transaction() + .with_signer(accounts[2]) + .with_max_priority_fee_per_gas(50) + .build() + .await; + let bundle_a_tx2_hash = bundle_a_tx2.tx_hash().clone(); + + // 3. Create Bundle B with LOW total priority fee + // Two txs: 30 + 25 = 55 total + let bundle_b_tx1 = driver + .create_transaction() + .with_signer(accounts[3]) + .with_max_priority_fee_per_gas(30) + .build() + .await; + let bundle_b_tx1_hash = bundle_b_tx1.tx_hash().clone(); + + let bundle_b_tx2 = driver + .create_transaction() + .with_signer(accounts[4]) + .with_max_priority_fee_per_gas(25) + .build() + .await; + let bundle_b_tx2_hash = bundle_b_tx2.tx_hash().clone(); + + // 4. Insert Bundle B FIRST (lower total fee), then Bundle A (higher total fee) + // This verifies that sorting reorders them correctly + let bundle_b = AcceptedBundle { + uuid: Uuid::new_v4(), + txs: vec![target_tx.clone(), bundle_b_tx1, bundle_b_tx2], + block_number: driver.latest().await?.header.number + 1, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + meter_bundle_response: MeterBundleResponse { + bundle_gas_price: U256::ZERO, + bundle_hash: TxHash::ZERO, + coinbase_diff: U256::ZERO, + eth_sent_to_coinbase: U256::ZERO, + gas_fees: U256::ZERO, + results: vec![], + state_block_number: 0, + state_flashblock_index: None, + total_gas_used: 0, + total_execution_time_us: 0, + }, + }; + + let bundle_a = AcceptedBundle { + uuid: Uuid::new_v4(), + txs: vec![target_tx, bundle_a_tx1, bundle_a_tx2], + block_number: driver.latest().await?.header.number + 1, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + meter_bundle_response: MeterBundleResponse { + bundle_gas_price: U256::ZERO, + bundle_hash: TxHash::ZERO, + coinbase_diff: U256::ZERO, + eth_sent_to_coinbase: U256::ZERO, + gas_fees: U256::ZERO, + results: vec![], + state_block_number: 0, + state_flashblock_index: None, + total_gas_used: 0, + total_execution_time_us: 0, + }, + }; + + // Insert in "wrong" order - B first, then A + rbuilder + .backrun_bundle_store() + .insert(bundle_b) + .expect("Failed to insert bundle B"); + rbuilder + .backrun_bundle_store() + .insert(bundle_a) + .expect("Failed to insert bundle A"); + + // 5. Build the block + driver.build_new_block().await?; + + // 6. Verify block contents + let block = driver.latest_full().await?; + let tx_hashes: Vec<_> = block.transactions.hashes().collect(); + + // All txs should be in block + assert!( + tx_hashes.contains(&target_tx_hash), + "Target tx not included in block" + ); + assert!( + tx_hashes.contains(&bundle_a_tx1_hash), + "Bundle A tx1 not included in block" + ); + assert!( + tx_hashes.contains(&bundle_a_tx2_hash), + "Bundle A tx2 not included in block" + ); + assert!( + tx_hashes.contains(&bundle_b_tx1_hash), + "Bundle B tx1 not included in block" + ); + assert!( + tx_hashes.contains(&bundle_b_tx2_hash), + "Bundle B tx2 not included in block" + ); + + // 7. Verify ordering: Bundle A txs come BEFORE Bundle B txs + // (higher total fee bundle processed first) + let a_tx1_pos = tx_hashes + .iter() + .position(|h| *h == bundle_a_tx1_hash) + .expect("Bundle A tx1 position not found"); + let a_tx2_pos = tx_hashes + .iter() + .position(|h| *h == bundle_a_tx2_hash) + .expect("Bundle A tx2 position not found"); + let b_tx1_pos = tx_hashes + .iter() + .position(|h| *h == bundle_b_tx1_hash) + .expect("Bundle B tx1 position not found"); + let b_tx2_pos = tx_hashes + .iter() + .position(|h| *h == bundle_b_tx2_hash) + .expect("Bundle B tx2 position not found"); + + // Bundle A (higher total fee) should come before Bundle B + let bundle_a_last_pos = a_tx1_pos.max(a_tx2_pos); + let bundle_b_first_pos = b_tx1_pos.min(b_tx2_pos); + + assert!( + bundle_a_last_pos < bundle_b_first_pos, + "Bundle A (total fee 110) should be processed before Bundle B (total fee 55). \ + Bundle A last tx at pos {}, Bundle B first tx at pos {}", + bundle_a_last_pos, + bundle_b_first_pos + ); + + Ok(()) +} + +/// Tests that backrun bundles are rejected if total bundle priority fee < target tx priority fee +#[rb_test(flashblocks)] +async fn backrun_bundle_rejected_low_total_fee(rbuilder: LocalInstance) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + let accounts = driver.fund_accounts(3, ONE_ETH).await?; + + // 1. Build target tx with HIGH priority fee (100) + let target_tx = driver + .create_transaction() + .with_signer(accounts[0]) + .with_max_priority_fee_per_gas(100) + .build() + .await; + let target_tx_hash = target_tx.tx_hash().clone(); + + // Send to mempool manually + let provider = rbuilder.provider().await?; + let _ = provider + .send_raw_transaction(target_tx.encoded_2718().as_slice()) + .await?; + + // 2. Create backrun transactions with LOW total fee: + // - backrun_1: priority fee 30 + // - backrun_2: priority fee 20 + // - Total: 30 + 20 = 50 < target's 100 → bundle rejected + let backrun_1 = driver + .create_transaction() + .with_signer(accounts[1]) + .with_max_priority_fee_per_gas(30) + .build() + .await; + let backrun_1_hash = backrun_1.tx_hash().clone(); + + let backrun_2 = driver + .create_transaction() + .with_signer(accounts[2]) + .with_max_priority_fee_per_gas(20) + .build() + .await; + let backrun_2_hash = backrun_2.tx_hash().clone(); + + // 3. Insert backrun bundle into store + let bundle = AcceptedBundle { + uuid: Uuid::new_v4(), + txs: vec![target_tx, backrun_1, backrun_2], + block_number: driver.latest().await?.header.number + 1, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + meter_bundle_response: MeterBundleResponse { + bundle_gas_price: U256::ZERO, + bundle_hash: TxHash::ZERO, + coinbase_diff: U256::ZERO, + eth_sent_to_coinbase: U256::ZERO, + gas_fees: U256::ZERO, + results: vec![], + state_block_number: 0, + state_flashblock_index: None, + total_gas_used: 0, + total_execution_time_us: 0, + }, + }; + + rbuilder + .backrun_bundle_store() + .insert(bundle) + .expect("Failed to insert backrun bundle"); + + // 4. Build the block + driver.build_new_block().await?; + + // 5. Verify block contents + let block = driver.latest_full().await?; + let tx_hashes: Vec<_> = block.transactions.hashes().collect(); + + // Target tx SHOULD be in block (it was in mempool independently) + assert!( + tx_hashes.contains(&target_tx_hash), + "Target tx should be included in block" + ); + + // backrun_1 should NOT be in block (bundle rejected: total fee 50 < target fee 100) + assert!( + !tx_hashes.contains(&backrun_1_hash), + "backrun_1 should NOT be in block (bundle rejected: total fee below target)" + ); + + // backrun_2 should NOT be in block (bundle rejected) + assert!( + !tx_hashes.contains(&backrun_2_hash), + "backrun_2 should NOT be in block" + ); + + Ok(()) +} diff --git a/crates/op-rbuilder/src/tests/framework/instance.rs b/crates/op-rbuilder/src/tests/framework/instance.rs index 8c718491..a7611b8e 100644 --- a/crates/op-rbuilder/src/tests/framework/instance.rs +++ b/crates/op-rbuilder/src/tests/framework/instance.rs @@ -1,6 +1,7 @@ use crate::{ args::OpRbuilderArgs, builders::{BuilderConfig, FlashblocksBuilder, PayloadBuilder, StandardBuilder}, + bundles::BackrunBundleStore, primitives::reth::engine_api_builder::OpEngineApiBuilder, revert_protection::{EthApiExtServer, RevertProtectionExt}, tests::{ @@ -64,6 +65,7 @@ pub struct LocalInstance { _node_handle: Box, pool_observer: TransactionPoolObserver, attestation_server: Option, + backrun_bundle_store: BackrunBundleStore, } impl LocalInstance { @@ -112,6 +114,7 @@ impl LocalInstance { .expect("Failed to convert rollup args to builder config"); let da_config = builder_config.da_config.clone(); let gas_limit_config = builder_config.gas_limit_config.clone(); + let backrun_bundle_store = builder_config.backrun_bundle_store.clone(); let addons: OpAddOns< _, @@ -187,6 +190,7 @@ impl LocalInstance { task_manager: Some(task_manager), pool_observer: TransactionPoolObserver::new(pool_monitor, reverted_cache_clone), attestation_server, + backrun_bundle_store, }) } @@ -267,6 +271,10 @@ impl LocalInstance { &self.attestation_server } + pub fn backrun_bundle_store(&self) -> &BackrunBundleStore { + &self.backrun_bundle_store + } + pub async fn driver(&self) -> eyre::Result> { ChainDriver::::local(self).await } diff --git a/crates/op-rbuilder/src/tests/mod.rs b/crates/op-rbuilder/src/tests/mod.rs index fd202a89..87384e5c 100644 --- a/crates/op-rbuilder/src/tests/mod.rs +++ b/crates/op-rbuilder/src/tests/mod.rs @@ -17,6 +17,9 @@ mod miner_gas_limit; #[cfg(test)] mod gas_limiter; +#[cfg(test)] +mod backrun; + #[cfg(test)] mod ordering;