diff --git a/aggregation_mode/proof_aggregator/src/backend/config.rs b/aggregation_mode/proof_aggregator/src/backend/config.rs index 41081afc7..5fcb09971 100644 --- a/aggregation_mode/proof_aggregator/src/backend/config.rs +++ b/aggregation_mode/proof_aggregator/src/backend/config.rs @@ -21,6 +21,12 @@ pub struct Config { pub sp1_chunk_aggregator_vk_hash: String, pub monthly_budget_eth: f64, pub db_connection_urls: Vec, + pub max_bump_retries: u16, + pub bump_retry_interval_seconds: u64, + pub base_bump_percentage: u64, + pub max_fee_bump_percentage: u64, + pub priority_fee_wei: u128, + pub final_receipt_check_timeout_seconds: u64, } impl Config { diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index f43398595..59268fcee 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -20,12 +20,15 @@ use crate::{ use alloy::{ consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar}, - eips::{eip4844::BYTES_PER_BLOB, eip7594::BlobTransactionSidecarEip7594, Encodable2718}, + eips::{ + eip4844::BYTES_PER_BLOB, eip7594::BlobTransactionSidecarEip7594, BlockNumberOrTag, + Encodable2718, + }, hex, - network::EthereumWallet, - primitives::{utils::parse_ether, Address, U256}, + network::{EthereumWallet, TransactionBuilder}, + primitives::{utils::parse_ether, Address, TxHash, U256}, providers::{PendingTransactionError, Provider, ProviderBuilder}, - rpc::types::TransactionReceipt, + rpc::types::{TransactionReceipt, TransactionRequest}, signers::local::LocalSigner, }; use config::Config; @@ -52,6 +55,16 @@ pub enum AggregatedProofSubmissionError { MerkleRootMisMatch, StoringMerklePaths(DbError), GasPriceError(String), + LatestBlockNotFound, + BaseFeePerGasMissing, +} + +enum SubmitOutcome { + // NOTE: Boxed because enums are sized to their largest variant; without boxing, + // every `SubmitOutcome` would reserve space for a full `TransactionReceipt`, + // even in the `Pending` case (see clippy::large_enum_variant). + Confirmed(Box), + Pending(TxHash), } pub struct ProofAggregator { @@ -62,6 +75,7 @@ pub struct ProofAggregator { sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], risc0_chunk_aggregator_image_id_bytes: [u8; 32], db: Db, + signer_address: Address, } impl ProofAggregator { @@ -72,7 +86,9 @@ impl ProofAggregator { config.ecdsa.private_key_store_password.clone(), ) .expect("Keystore signer should be `cast wallet` compliant"); - let wallet = EthereumWallet::from(signer); + let wallet = EthereumWallet::from(signer.clone()); + + let signer_address = signer.address(); // Check if the monthly budget is non-negative to avoid runtime errors later let _monthly_budget_in_wei = parse_ether(&config.monthly_budget_eth.to_string()) @@ -117,6 +133,7 @@ impl ProofAggregator { sp1_chunk_aggregator_vk_hash_bytes, risc0_chunk_aggregator_image_id_bytes, db, + signer_address, } } @@ -334,7 +351,139 @@ impl ProofAggregator { info!("Sending proof to ProofAggregationService contract..."); - let tx_req = match aggregated_proof { + let max_retries = self.config.max_bump_retries; + + let mut last_error: Option = None; + + let mut pending_hashes: Vec = Vec::with_capacity(max_retries as usize); + + // Get the nonce once at the beginning and reuse it for all retries + let nonce = self + .proof_aggregation_service + .provider() + .get_transaction_count(self.signer_address) + .await + .map_err(|e| { + RetryError::Transient( + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!( + "Failed to get nonce: {e}" + )), + ) + })?; + + info!("Using nonce {}", nonce); + + for attempt in 0..max_retries { + info!("Transaction attempt {} of {}", attempt + 1, max_retries); + + // Wrap the entire transaction submission in a result to catch all errors, passing + // the same nonce to all attempts + let attempt_result = self + .try_submit_transaction(&blob, blob_versioned_hash, aggregated_proof, nonce) + .await; + + match attempt_result { + Ok(SubmitOutcome::Confirmed(receipt)) => { + info!( + "Transaction confirmed successfully on attempt {}", + attempt + 1 + ); + return Ok(*receipt); + } + Ok(SubmitOutcome::Pending(tx_hash)) => { + warn!( + "Attempt {} timed out waiting for receipt; storing pending tx and continuing", + attempt + 1 + ); + pending_hashes.push(tx_hash); + + last_error = Some( + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( + "Timed out waiting for receipt".to_string(), + ), + ); + + if attempt < max_retries - 1 { + info!("Retrying with bumped gas fees and same nonce {}...", nonce); + tokio::time::sleep(Duration::from_millis(500)).await; + } + } + Err(err) => { + warn!("Attempt {} failed: {:?}", attempt + 1, err); + last_error = Some(err); + + if attempt < max_retries - 1 { + info!("Retrying with bumped gas fees and same nonce {}...", nonce); + + tokio::time::sleep(Duration::from_millis(500)).await; + } else { + warn!("Max retries ({}) exceeded", max_retries); + } + } + } + } + + let receipt_timeout = Duration::from_secs(self.config.final_receipt_check_timeout_seconds); + + // After exhausting all retry attempts, we iterate over every pending transaction hash + // that was previously submitted with the same nonce but different gas parameters. + // One of these transactions may have been included in a block while we were still + // retrying and waiting on others. By explicitly checking the receipt for each hash, + // we ensure we don't "lose" a transaction that was actually mined but whose receipt + // we never observed due to timeouts during earlier attempts. + for (i, tx_hash) in pending_hashes.into_iter().enumerate() { + // NOTE: `get_transaction_receipt` has no built-in timeout, so we guard it to + // avoid hanging the aggregator on a stuck RPC call. + match tokio::time::timeout( + receipt_timeout, + self.proof_aggregation_service + .provider() + .get_transaction_receipt(tx_hash), + ) + .await + { + Ok(Ok(Some(receipt))) => { + info!("Pending tx #{} confirmed; returning receipt", i + 1); + return Ok(receipt); + } + Ok(Ok(None)) => { + warn!( + "Pending tx #{} still no receipt yet (hash {})", + i + 1, + tx_hash + ); + } + Ok(Err(err)) => { + warn!("Pending tx #{} receipt query failed: {:?}", i + 1, err); + } + Err(_) => { + warn!( + "Pending tx #{} receipt query timed out after {:?}", + i + 1, + receipt_timeout + ); + } + } + } + + Err(RetryError::Transient(last_error.unwrap_or_else(|| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( + "Max retries exceeded with no error details".to_string(), + ) + }))) + } + + async fn try_submit_transaction( + &self, + blob: &BlobTransactionSidecar, + blob_versioned_hash: [u8; 32], + aggregated_proof: &AlignedProof, + nonce: u64, + ) -> Result { + let retry_interval = Duration::from_secs(self.config.bump_retry_interval_seconds); + + // Build the transaction request + let mut tx_req = match aggregated_proof { AlignedProof::SP1(proof) => self .proof_aggregation_service .verifyAggregationSP1( @@ -343,12 +492,12 @@ impl ProofAggregator { proof.proof_with_pub_values.bytes().into(), self.sp1_chunk_aggregator_vk_hash_bytes.into(), ) - .sidecar(blob) + .sidecar(blob.clone()) .into_transaction_request(), AlignedProof::Risc0(proof) => { - let encoded_seal = encode_seal(&proof.receipt) - .map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string())) - .map_err(RetryError::Permanent)?; + let encoded_seal = encode_seal(&proof.receipt).map_err(|e| { + AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string()) + })?; self.proof_aggregation_service .verifyAggregationRisc0( blob_versioned_hash.into(), @@ -356,68 +505,116 @@ impl ProofAggregator { proof.receipt.journal.bytes.clone().into(), self.risc0_chunk_aggregator_image_id_bytes.into(), ) - .sidecar(blob) + .sidecar(blob.clone()) .into_transaction_request() } }; + // Set the nonce explicitly + tx_req = tx_req.with_nonce(nonce); + + // Apply gas fee bump for retries + tx_req = self.apply_gas_fee_bump(tx_req).await?; + let provider = self.proof_aggregation_service.provider(); + + // Fill the transaction let envelope = provider .fill(tx_req) .await .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( - err.to_string(), - ) - }) - .map_err(RetryError::Transient)? + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!( + "Failed to fill transaction: {err}" + )) + })? .try_into_envelope() .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( - err.to_string(), - ) - }) - .map_err(RetryError::Transient)?; + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!( + "Failed to convert to envelope: {err}" + )) + })?; + + // Convert to EIP-4844 transaction let tx: EthereumTxEnvelope> = envelope .try_into_pooled() .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( - err.to_string(), - ) - }) - .map_err(RetryError::Transient)? + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!( + "Failed to pool transaction: {err}" + )) + })? .try_map_eip4844(|tx| { tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get())) }) .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( - err.to_string(), - ) - }) - .map_err(RetryError::Transient)?; + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!( + "Failed to convert to EIP-7594: {err}" + )) + })?; + // Send the transaction let encoded_tx = tx.encoded_2718(); let pending_tx = provider .send_raw_transaction(&encoded_tx) .await .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( - err.to_string(), - ) - }) - .map_err(RetryError::Transient)?; + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!( + "Failed to send raw transaction: {err}" + )) + })?; + + let tx_hash = *pending_tx.tx_hash(); + + let receipt_result = tokio::time::timeout(retry_interval, pending_tx.get_receipt()).await; + + match receipt_result { + Ok(Ok(receipt)) => Ok(SubmitOutcome::Confirmed(Box::new(receipt))), + Ok(Err(err)) => Err( + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!( + "Error getting receipt: {err}" + )), + ), + Err(_) => Ok(SubmitOutcome::Pending(tx_hash)), + } + } + + // Updates the gas fees of a `TransactionRequest` using a fixed bump strategy. + // Intended for retrying an on-chain submission after a timeout. + // + // Strategy: + // - Fetch the current network gas price. + // - Apply `base_bump_percentage` to compute a bumped base fee. + // - Apply `max_fee_bump_percentage` on top of the bumped base fee to set `max_fee_per_gas`. + // - Set `max_priority_fee_per_gas` to a fixed value derived from `priority_fee_wei`. + // + // Fees are recomputed on each retry using the latest gas price (no incremental per-attempt bump). + + async fn apply_gas_fee_bump( + &self, + tx_req: TransactionRequest, + ) -> Result { + let provider = self.proof_aggregation_service.provider(); - let receipt = pending_tx - .get_receipt() + let base_bump_percentage = self.config.base_bump_percentage; + let max_fee_bump_percentage = self.config.max_fee_bump_percentage; + let priority_fee_wei = self.config.priority_fee_wei; + + let latest_block = provider + .get_block_by_number(BlockNumberOrTag::Latest) .await - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( - err.to_string(), - ) - }) - .map_err(RetryError::Transient)?; + .map_err(|e| AggregatedProofSubmissionError::GasPriceError(e.to_string()))? + .ok_or(AggregatedProofSubmissionError::LatestBlockNotFound)?; + + let current_base_fee = latest_block + .header + .base_fee_per_gas + .ok_or(AggregatedProofSubmissionError::BaseFeePerGasMissing)?; + + let new_base_fee = current_base_fee as f64 * (1.0 + base_bump_percentage as f64 / 100.0); + let new_max_fee = new_base_fee * (1.0 + max_fee_bump_percentage as f64 / 100.0); - Ok(receipt) + Ok(tx_req + .with_max_fee_per_gas(new_max_fee as u128) + .with_max_priority_fee_per_gas(priority_fee_wei)) } async fn wait_until_can_submit_aggregated_proof( diff --git a/config-files/config-proof-aggregator-ethereum-package.yaml b/config-files/config-proof-aggregator-ethereum-package.yaml index 23034743c..d116e134b 100644 --- a/config-files/config-proof-aggregator-ethereum-package.yaml +++ b/config-files/config-proof-aggregator-ethereum-package.yaml @@ -27,6 +27,14 @@ monthly_budget_eth: 15.0 sp1_chunk_aggregator_vk_hash: "00d6e32a34f68ea643362b96615591c94ee0bf99ee871740ab2337966a4f77af" risc0_chunk_aggregator_image_id: "8908f01022827e80a5de71908c16ee44f4a467236df20f62e7c994491629d74c" +# These values modify the bumping behavior after the aggregated proof on-chain submission times out. +max_bump_retries: 5 +bump_retry_interval_seconds: 120 +base_bump_percentage: 10 +max_fee_bump_percentage: 100 +priority_fee_wei: 3000000000 # 3 Gwei +final_receipt_check_timeout_seconds: 5 + ecdsa: private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json" private_key_store_password: "" diff --git a/config-files/config-proof-aggregator.yaml b/config-files/config-proof-aggregator.yaml index 61a4f982a..707815c26 100644 --- a/config-files/config-proof-aggregator.yaml +++ b/config-files/config-proof-aggregator.yaml @@ -24,8 +24,16 @@ monthly_budget_eth: 15.0 # These program ids are the ones from the chunk aggregator programs # Can be found in the Proof Aggregation Service deployment config # (remember to trim the 0x prefix) -sp1_chunk_aggregator_vk_hash: "00ba19eed0aaeb0151f07b8d3ee7c659bcd29f3021e48fb42766882f55b84509" -risc0_chunk_aggregator_image_id: "d8cfdd5410c70395c0a1af1842a0148428cc46e353355faccfba694dd4862dbf" +sp1_chunk_aggregator_vk_hash: "00d6e32a34f68ea643362b96615591c94ee0bf99ee871740ab2337966a4f77af" +risc0_chunk_aggregator_image_id: "8908f01022827e80a5de71908c16ee44f4a467236df20f62e7c994491629d74c" + +# These values modify the bumping behavior after the aggregated proof on-chain submission times out. +max_bump_retries: 5 +bump_retry_interval_seconds: 120 +base_bump_percentage: 10 +max_fee_bump_percentage: 100 +priority_fee_wei: 3000000000 # 3 Gwei +final_receipt_check_timeout_seconds: 5 ecdsa: private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"