From 57d4bdf9b58879aaa8c8be9c7b405401137b1f46 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Jun 2025 15:11:49 +0000 Subject: [PATCH 1/2] core: Register protocols at runtime Signed-off-by: Alexandru Vasile --- src/lib.rs | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 32f61bf4..292a0916 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -136,6 +136,18 @@ pub enum Litep2pEvent { }, } +/// Configuration for adding protocols at runtime (protocols that are started after the Litep2p +/// instance is created). +struct RuntimeConfigs { + /// The keep-alive timeout for protocols. + /// + /// Needed for supporting protocols at runtime. + keep_alive_timeout: std::time::Duration, + + /// Executor used to run protocols. + executor: Arc, +} + /// [`Litep2p`] object. pub struct Litep2p { /// Local peer ID. @@ -149,6 +161,11 @@ pub struct Litep2p { /// Bandwidth sink. bandwidth_sink: BandwidthSink, + + /// The keep-alive timeout for protocols. + /// + /// Needed for supporting protocols at runtime. + runtime_configs: RuntimeConfigs, } impl Litep2p { @@ -415,6 +432,10 @@ impl Litep2p { bandwidth_sink, listen_addresses, transport_manager, + runtime_configs: RuntimeConfigs { + keep_alive_timeout: litep2p_config.keep_alive_timeout, + executor: litep2p_config.executor, + }, }) } @@ -491,6 +512,45 @@ impl Litep2p { self.transport_manager.add_known_address(peer, address) } + /// Register a request-response protocol at runtime. + pub fn register_request_response( + &mut self, + config: crate::protocol::request_response::Config, + ) -> crate::Result<()> { + let service = self.transport_manager.register_protocol( + config.protocol_name.clone(), + config.fallback_names.clone(), + config.codec, + self.runtime_configs.keep_alive_timeout, + ); + + self.runtime_configs.executor.run(Box::pin(async move { + RequestResponseProtocol::new(service, config).run().await + })); + + Ok(()) + } + + /// Register a notification protocol at runtime. + pub fn register_notification( + &mut self, + config: crate::protocol::notification::Config, + ) -> crate::Result<()> { + let service = self.transport_manager.register_protocol( + config.protocol_name.clone(), + config.fallback_names.clone(), + config.codec, + self.runtime_configs.keep_alive_timeout, + ); + + let executor = Arc::clone(&self.runtime_configs.executor); + self.runtime_configs.executor.run(Box::pin(async move { + NotificationProtocol::new(service, config, executor).run().await + })); + + Ok(()) + } + /// Poll next event. /// /// This function must be called in order for litep2p to make progress. From 89ab4876404a3a365dcb8fbf5d77aee644b4c9d9 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Jun 2025 15:29:24 +0000 Subject: [PATCH 2/2] tests: Make stability test longruning Signed-off-by: Alexandru Vasile --- tests/connection/stability.rs | 70 ++++++++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/tests/connection/stability.rs b/tests/connection/stability.rs index 095309a4..bfc44d3a 100644 --- a/tests/connection/stability.rs +++ b/tests/connection/stability.rs @@ -32,6 +32,7 @@ use litep2p::{ utils::futures_stream::FuturesStream, Litep2p, PeerId, }; +use std::sync::{atomic::AtomicUsize, Arc}; use futures::{future::BoxFuture, StreamExt}; @@ -59,12 +60,19 @@ const LOG_TARGET: &str = "litep2p::stability"; pub struct StabilityProtocol { /// The number of identical packets to send / receive on a substream. total_packets: usize, - inbound: FuturesStream>>, - outbound: FuturesStream>>, + inbound: FuturesStream>>, + outbound: FuturesStream>>, /// Peer Id for logging purposes. peer_id: PeerId, /// The sender to notify the test that the protocol finished. tx: Option>, + + debug_interval: tokio::time::Interval, + + inbound_num: usize, + outbound_num: usize, + inbound_debug: Vec>, + outbound_debug: Vec>, } impl StabilityProtocol { @@ -78,6 +86,11 @@ impl StabilityProtocol { outbound: FuturesStream::new(), peer_id, tx: Some(tx), + debug_interval: tokio::time::interval(std::time::Duration::from_secs(10)), + inbound_debug: (0..16).map(|_| Arc::new(AtomicUsize::new(0))).collect(), + outbound_debug: (0..16).map(|_| Arc::new(AtomicUsize::new(0))).collect(), + inbound_num: 0, + outbound_num: 0, }, rx, ) @@ -87,6 +100,10 @@ impl StabilityProtocol { let mut total_packets = self.total_packets; match direction { Direction::Inbound => { + let index = self.inbound_num; + self.inbound_num += 1; + let atomic = self.inbound_debug.get(index).expect("Index is valid; qed").clone(); + self.inbound.push(Box::pin(async move { while total_packets > 0 { let _payload = substream @@ -100,18 +117,19 @@ impl StabilityProtocol { tracing::warn!(target: LOG_TARGET, "Failed to read from substream {:?}", err); "Failed to read from substream".to_string() })?; + atomic.fetch_add(1, std::sync::atomic::Ordering::Relaxed); total_packets -= 1; } - Ok(()) + Ok(substream) })); } Direction::Outbound { .. } => { + let index = self.outbound_num; + self.outbound_num += 1; + let atomic = self.outbound_debug.get(index).expect("Index is valid; qed").clone(); self.outbound.push(Box::pin(async move { - let mut frame = vec![0; 128]; - for i in 0..frame.len() { - frame[i] = i as u8; - } + let mut frame = vec![0; 1 * 1024 * 1024]; while total_packets > 0 { substream.send_framed(frame.clone().into()).await.map_err(|err| { @@ -119,9 +137,11 @@ impl StabilityProtocol { "Failed to send to substream".to_string() })?; total_packets -= 1; + + atomic.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } - Ok(()) + Ok(substream) })); } } @@ -136,18 +156,19 @@ impl UserProtocol for StabilityProtocol { fn codec(&self) -> ProtocolCodec { // Similar to the identify payload size. - ProtocolCodec::UnsignedVarint(Some(4096)) + // 1 * 1024 * 1024 + ProtocolCodec::UnsignedVarint(Some(10 * 1024 * 1024)) } async fn run(mut self: Box, mut service: TransportService) -> litep2p::Result<()> { let num_substreams = 16; - let mut handled_substreams = 0; + let mut handled_substreams = Vec::new(); loop { - if handled_substreams == 2 * num_substreams { + if handled_substreams.len() == 2 * num_substreams { tracing::info!( target: LOG_TARGET, - handled_substreams, + len = handled_substreams.len(), peer_id = %self.peer_id, "StabilityProtocol finished to handle packets", ); @@ -160,6 +181,18 @@ impl UserProtocol for StabilityProtocol { } tokio::select! { + _ = self.debug_interval.tick() => { + let inbound_total: usize = self.inbound_debug.iter().map(|a| a.load(std::sync::atomic::Ordering::Relaxed)).sum(); + let outbound_total: usize = self.outbound_debug.iter().map(|a| a.load(std::sync::atomic::Ordering::Relaxed)).sum(); + + tracing::info!( + target: LOG_TARGET, + peer_id = %self.peer_id, + "StabilityProtocol debug stats: inbound_total = {}, outbound_total = {}", + inbound_total, outbound_total + ); + }, + event = service.next() => match event { Some(TransportEvent::ConnectionEstablished { peer, .. }) => { for i in 0..num_substreams { @@ -201,8 +234,8 @@ impl UserProtocol for StabilityProtocol { inbound = self.inbound.next(), if !self.inbound.is_empty() => { match inbound { - Some(Ok(())) => { - handled_substreams += 1; + Some(Ok(substream)) => { + handled_substreams.push(substream); } Some(Err(err)) => { tracing::error!( @@ -227,8 +260,9 @@ impl UserProtocol for StabilityProtocol { outbound = self.outbound.next(), if !self.outbound.is_empty() => { match outbound { - Some(Ok(())) => { - handled_substreams += 1; + Some(Ok(substream + )) => { + handled_substreams.push(substream); } Some(Err(err)) => { tracing::error!( @@ -263,7 +297,7 @@ async fn stability_litep2p_transport(transport1: Transport, transport2: Transpor let (ping_config1, _ping_event_stream1) = PingConfig::default(); let keypair = Keypair::generate(); let peer_id = keypair.public().to_peer_id(); - let (stability_protocol, mut exit1) = StabilityProtocol::new(1000, peer_id); + let (stability_protocol, mut exit1) = StabilityProtocol::new(10000000, peer_id); let config1 = ConfigBuilder::new() .with_keypair(keypair) .with_libp2p_ping(ping_config1) @@ -274,7 +308,7 @@ async fn stability_litep2p_transport(transport1: Transport, transport2: Transpor let (ping_config2, _ping_event_stream2) = PingConfig::default(); let keypair = Keypair::generate(); let peer_id = keypair.public().to_peer_id(); - let (stability_protocol, mut exit2) = StabilityProtocol::new(1000, peer_id); + let (stability_protocol, mut exit2) = StabilityProtocol::new(10000000, peer_id); let config2 = ConfigBuilder::new() .with_keypair(keypair) .with_libp2p_ping(ping_config2)