From 03f6724cd33b49be63706eaa72a15b30c81b6db4 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Thu, 8 Jan 2026 15:42:16 +0100 Subject: [PATCH] multi rpc for providers with wallets --- Cargo.lock | 1 + Cargo.toml | 1 + common/Cargo.toml | 1 + common/src/shared/alloy_tools.rs | 128 ++++++++++++++++++----- common/src/shared/transaction_monitor.rs | 41 +------- pacaya/src/l1/execution_layer.rs | 5 +- pacaya/src/l2/execution_layer.rs | 8 +- permissionless/src/l1/execution_layer.rs | 5 +- shasta/src/l1/execution_layer.rs | 7 +- shasta/src/l2/execution_layer.rs | 8 +- 10 files changed, 120 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d38302d5..131f684f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2470,6 +2470,7 @@ dependencies = [ "strum 0.27.2", "tokio", "tokio-util", + "tower 0.5.2", "tracing", "tracing-subscriber 0.3.22", "warp", diff --git a/Cargo.toml b/Cargo.toml index 02edc1e7..8ed984e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ taiko_protocol = { git = "https://github.com/taikoxyz/taiko-mono.git", rev = "9f taiko_rpc = { git = "https://github.com/taikoxyz/taiko-mono.git", rev = "9fadda25747b9d04984de5f23792ad83390648bd", package = "rpc" } tokio = { version = "1.45", default-features = false, features = ["full"] } tokio-util = { version = "0.7", default-features = false } +tower = { version = "0.5", default-features = false } tracing = { version = "0.1.41", default-features = false } tracing-subscriber = { version = "0.3", default-features = false, features = [ "fmt", diff --git a/common/Cargo.toml b/common/Cargo.toml index d748e013..dbfbd937 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -35,6 +35,7 @@ strum = { workspace = true, features = ["derive"] } taiko_protocol = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } +tower = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } warp = { workspace = true } diff --git a/common/src/shared/alloy_tools.rs b/common/src/shared/alloy_tools.rs index 3bc8df9a..f5a63dcd 100644 --- a/common/src/shared/alloy_tools.rs +++ b/common/src/shared/alloy_tools.rs @@ -3,12 +3,20 @@ use alloy::{ network::{Ethereum, EthereumWallet}, primitives::B256, providers::{DynProvider, Provider, ProviderBuilder, WsConnect, ext::DebugApi}, + pubsub::{PubSubConnect, PubSubFrontend}, + rpc::client::RpcClient, rpc::types::{Transaction, TransactionRequest, trace::geth::GethDebugTracingOptions}, signers::local::PrivateKeySigner, + transports::{ + http::{Http, reqwest::Url}, + layers::FallbackLayer, + }, }; use anyhow::Error; -use std::str::FromStr; -use tracing::debug; +use futures_util::future; +use std::{num::NonZeroUsize, str::FromStr}; +use tower::ServiceBuilder; +use tracing::{debug, warn}; pub async fn check_for_revert_reason>( provider: &P, @@ -75,22 +83,22 @@ fn find_errors_from_trace(trace_str: &str) -> Option { pub async fn construct_alloy_provider( signer: &Signer, - execution_ws_rpc_url: &str, + execution_ws_rpc_urls: &[String], ) -> Result { match signer { Signer::PrivateKey(private_key, _) => { debug!( - "Creating alloy provider with URL: {} and private key signer.", - execution_ws_rpc_url + "Creating alloy provider with URLs: {:?} and private key signer.", + execution_ws_rpc_urls ); let signer = PrivateKeySigner::from_str(private_key.as_str())?; - Ok(create_alloy_provider_with_wallet(signer.into(), execution_ws_rpc_url).await?) + Ok(create_alloy_provider_with_wallet(signer.into(), execution_ws_rpc_urls).await?) } Signer::Web3signer(web3signer, address) => { debug!( - "Creating alloy provider with URL: {} and web3signer signer.", - execution_ws_rpc_url + "Creating alloy provider with URLs: {:?} and web3signer signer.", + execution_ws_rpc_urls ); let preconfer_address = *address; @@ -100,34 +108,100 @@ pub async fn construct_alloy_provider( )?; let wallet = EthereumWallet::new(tx_signer); - Ok(create_alloy_provider_with_wallet(wallet, execution_ws_rpc_url).await?) + Ok(create_alloy_provider_with_wallet(wallet, execution_ws_rpc_urls).await?) } } } async fn create_alloy_provider_with_wallet( wallet: EthereumWallet, - url: &str, + urls: &[String], ) -> Result { - if url.contains("ws://") || url.contains("wss://") { - let ws = WsConnect::new(url); - Ok(ProviderBuilder::new() - .wallet(wallet) - .connect_ws(ws.clone()) - .await - .map_err(|e| Error::msg(format!("Execution layer: Failed to connect to WS: {e}")))? - .erased()) - } else if url.contains("http://") || url.contains("https://") { - Ok(ProviderBuilder::new() - .wallet(wallet) - .connect_http(url.parse::()?) - .erased()) + let client = if urls + .iter() + .all(|url| url.starts_with("ws://") || url.starts_with("wss://")) + { + let transports = create_websocket_transports(urls).await?; + + let fallback_layer = FallbackLayer::default().with_active_transport_count( + NonZeroUsize::new(transports.len()).ok_or_else(|| { + anyhow::anyhow!("Failed to create NonZeroUsize from transports.len()") + })?, + ); + RpcClient::builder().transport( + ServiceBuilder::new() + .layer(fallback_layer) + .service(transports), + false, + ) + } else if urls + .iter() + .all(|url| url.contains("http://") || url.contains("https://")) + { + let transports = create_http_transports(urls)?; + + let fallback_layer = FallbackLayer::default().with_active_transport_count( + NonZeroUsize::new(transports.len()).ok_or_else(|| { + anyhow::anyhow!("Failed to create NonZeroUsize from transports.len()") + })?, + ); + RpcClient::builder().transport( + ServiceBuilder::new() + .layer(fallback_layer) + .service(transports), + false, + ) } else { - Err(anyhow::anyhow!( - "Invalid URL, only websocket and http are supported: {}", - url - )) + return Err(anyhow::anyhow!( + "Invalid URL list, only websocket and http are supported, you cannot mix websockets and HTTP URLs: {}", + urls.join(", ") + )); + }; + + Ok(ProviderBuilder::new() + .wallet(wallet) + .connect_client(client) + .erased()) +} + +async fn create_websocket_transports(urls: &[String]) -> Result, Error> { + let connection_futures = urls.iter().map(|url| async move { + WsConnect::new(url) + .into_service() + .await + .map(|ws| (url, ws)) + .inspect(|_| debug!("Connected to {url}")) + .inspect_err(|e| warn!("Failed to connect to {url}: {e}")) + }); + + let transports: Vec<_> = future::join_all(connection_futures) + .await + .into_iter() + .filter_map(Result::ok) + .map(|(_, ws)| ws) + .collect(); + + if transports.is_empty() { + return Err(anyhow::anyhow!( + "No valid WebSocket connections established" + )); } + + Ok(transports) +} + +fn create_http_transports(urls: &[String]) -> Result>, Error> { + urls.iter() + .map(|url| { + Url::parse(url) + .map_err(|e| { + anyhow::anyhow!( + "Failed to parse URL while creating HTTP transport for alloy provider: {e}" + ) + }) + .map(Http::new) + }) + .collect() } pub async fn create_alloy_provider_without_wallet(url: &str) -> Result { diff --git a/common/src/shared/transaction_monitor.rs b/common/src/shared/transaction_monitor.rs index 381e2970..b237c01a 100644 --- a/common/src/shared/transaction_monitor.rs +++ b/common/src/shared/transaction_monitor.rs @@ -1,5 +1,5 @@ use crate::l1::{config::EthereumL1Config, tools, transaction_error::TransactionError}; -use crate::{metrics::Metrics, shared::alloy_tools, signer::Signer}; +use crate::metrics::Metrics; use alloy::{ consensus::TxType, network::{Network, ReceiptResponse, TransactionBuilder, TransactionBuilder4844}, @@ -34,8 +34,6 @@ pub struct TransactionMonitorConfig { max_attempts_to_send_tx: u64, max_attempts_to_wait_tx: u64, delay_between_tx_attempts: Duration, - execution_rpc_urls: Vec, - signer: Arc, } pub struct TransactionMonitorThread { @@ -77,8 +75,6 @@ impl TransactionMonitor { delay_between_tx_attempts: Duration::from_secs( config.delay_between_tx_attempts_sec, ), - execution_rpc_urls: config.execution_rpc_urls.clone(), - signer: config.signer.clone(), }, join_handle: Mutex::new(None), error_notification_channel, @@ -363,10 +359,7 @@ impl TransactionMonitorThread { sending_attempt: u64, ) -> Option> { match self.provider.send_transaction(tx.clone()).await { - Ok(pending_tx) => { - self.propagate_transaction_to_other_backup_nodes(tx).await; - Some(pending_tx) - } + Ok(pending_tx) => Some(pending_tx), Err(e) => { self.handle_rpc_error(e, sending_attempt).await; None @@ -374,36 +367,6 @@ impl TransactionMonitorThread { } } - /// Recreates each backup node every time to avoid connection issues - async fn propagate_transaction_to_other_backup_nodes(&self, tx: TransactionRequest) { - // Skip the first RPC URL since it is the main one - for url in self.config.execution_rpc_urls.iter().skip(1) { - let provider = alloy_tools::construct_alloy_provider(&self.config.signer, url).await; - match provider { - Ok(provider) => { - let tx = provider.send_transaction(tx.clone()).await; - if let Err(e) = tx { - if e.to_string().contains("AlreadyKnown") - || e.to_string().to_lowercase().contains("already known") - { - debug!("Transaction already known to backup node {}", url); - } else { - warn!("Failed to send transaction to backup node {}: {}", url, e); - } - } else { - info!("Transaction sent to backup node {}", url); - } - } - Err(e) => { - warn!( - "Failed to construct alloy provider for backup node {}: {}", - url, e - ); - } - } - } - } - async fn handle_rpc_error(&self, e: RpcError, sending_attempt: u64) { if let RpcError::ErrorResp(err) = &e { if err.message.contains("nonce too low") { diff --git a/pacaya/src/l1/execution_layer.rs b/pacaya/src/l1/execution_layer.rs index 553ceeb0..8994e68b 100644 --- a/pacaya/src/l1/execution_layer.rs +++ b/pacaya/src/l1/execution_layer.rs @@ -60,10 +60,7 @@ impl ELTrait for ExecutionLayer { ) -> Result { let provider = alloy_tools::construct_alloy_provider( &common_config.signer, - common_config - .execution_rpc_urls - .first() - .ok_or_else(|| anyhow!("L1 RPC URL is required"))?, + &common_config.execution_rpc_urls, ) .await?; let common = ExecutionLayerCommon::new(provider.clone()).await?; diff --git a/pacaya/src/l2/execution_layer.rs b/pacaya/src/l2/execution_layer.rs index 403f8ffb..a91a5ba2 100644 --- a/pacaya/src/l2/execution_layer.rs +++ b/pacaya/src/l2/execution_layer.rs @@ -131,9 +131,11 @@ impl L2ExecutionLayer { self.chain_id, dest_chain_id ); - let provider = - alloy_tools::construct_alloy_provider(&self.config.signer, &self.config.taiko_geth_url) - .await?; + let provider = alloy_tools::construct_alloy_provider( + &self.config.signer, + std::slice::from_ref(&self.config.taiko_geth_url), + ) + .await?; self.transfer_eth_from_l2_to_l1_with_provider( provider, diff --git a/permissionless/src/l1/execution_layer.rs b/permissionless/src/l1/execution_layer.rs index cb1147af..f9eb5892 100644 --- a/permissionless/src/l1/execution_layer.rs +++ b/permissionless/src/l1/execution_layer.rs @@ -34,10 +34,7 @@ impl ELTrait for ExecutionLayer { ) -> Result { let provider = alloy_tools::construct_alloy_provider( &common_config.signer, - common_config - .execution_rpc_urls - .first() - .ok_or_else(|| anyhow!("L1 RPC URL is required"))?, + &common_config.execution_rpc_urls, ) .await?; let protocol_config = ProtocolConfig::default(); diff --git a/shasta/src/l1/execution_layer.rs b/shasta/src/l1/execution_layer.rs index 7b284ca2..7111598f 100644 --- a/shasta/src/l1/execution_layer.rs +++ b/shasta/src/l1/execution_layer.rs @@ -7,7 +7,7 @@ use alloy::{ primitives::{Address, U256, aliases::U48}, providers::DynProvider, }; -use anyhow::{Error, anyhow}; +use anyhow::Error; use common::shared::l2_block_v2::L2BlockV2; use common::{ l1::{ @@ -49,10 +49,7 @@ impl ELTrait for ExecutionLayer { ) -> Result { let provider = alloy_tools::construct_alloy_provider( &common_config.signer, - common_config - .execution_rpc_urls - .first() - .ok_or_else(|| anyhow!("L1 RPC URL is required"))?, + &common_config.execution_rpc_urls, ) .await?; let common = ExecutionLayerCommon::new(provider.clone()).await?; diff --git a/shasta/src/l2/execution_layer.rs b/shasta/src/l2/execution_layer.rs index 8704aa72..3fae1129 100644 --- a/shasta/src/l2/execution_layer.rs +++ b/shasta/src/l2/execution_layer.rs @@ -125,9 +125,11 @@ impl L2ExecutionLayer { self.chain_id, dest_chain_id ); - let provider = - alloy_tools::construct_alloy_provider(&self.config.signer, &self.config.taiko_geth_url) - .await?; + let provider = alloy_tools::construct_alloy_provider( + &self.config.signer, + std::slice::from_ref(&self.config.taiko_geth_url), + ) + .await?; self.transfer_eth_from_l2_to_l1_with_provider( provider,