From 2245703171f16696bcd4f3ba168c056c33799a28 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 13:18:09 +0200 Subject: [PATCH 01/29] cargo: Add metrics feature flag with prometheus Signed-off-by: Alexandru Vasile --- Cargo.lock | 1 + Cargo.toml | 2 ++ 2 files changed, 3 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 3ccff070..79077150 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2889,6 +2889,7 @@ dependencies = [ "nohash-hasher", "parking_lot 0.12.3", "pin-project", + "prometheus", "prost 0.12.6", "prost-build 0.13.1", "quickcheck", diff --git a/Cargo.toml b/Cargo.toml index a066b3b1..8870b89f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ yasna = "0.5.0" zeroize = "1.8.1" nohash-hasher = "0.2.0" static_assertions = "1.1.0" +prometheus = { version = "0.13.0", default-features = false, optional = true } # Exposed dependencies. Breaking changes to these are breaking changes to us. [dependencies.rustls] @@ -90,6 +91,7 @@ custom_sc_network = [] quic = ["dep:webpki", "dep:quinn"] webrtc = ["dep:str0m"] websocket = ["dep:tokio-tungstenite"] +metrics = ["dep:prometheus"] [profile.release] debug = true From c6154da00d5d8b47739948e5ae77c1a7cb50182b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 13:18:23 +0200 Subject: [PATCH 02/29] litep2p/config: Expose prometheus metrics in config Signed-off-by: Alexandru Vasile --- src/config.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/config.rs b/src/config.rs index c2956021..35d96f56 100644 --- a/src/config.rs +++ b/src/config.rs @@ -83,6 +83,10 @@ pub struct ConfigBuilder { #[cfg(feature = "websocket")] websocket: Option, + /// Prometheus metrics registry. + #[cfg(feature = "metrics")] + metrics_registry: Option, + /// Keypair. keypair: Option, @@ -143,6 +147,8 @@ impl ConfigBuilder { webrtc: None, #[cfg(feature = "websocket")] websocket: None, + #[cfg(feature = "metrics")] + metrics_registry: None, keypair: None, ping: None, identify: None, @@ -187,6 +193,13 @@ impl ConfigBuilder { self } + /// Add metrics registry. + #[cfg(feature = "metrics")] + pub fn with_metrics_registry(mut self, registry: prometheus::Registry) -> Self { + self.metrics_registry = Some(registry); + self + } + /// Add keypair. /// /// If no keypair is specified, litep2p creates a new keypair. @@ -295,6 +308,8 @@ impl ConfigBuilder { webrtc: self.webrtc.take(), #[cfg(feature = "websocket")] websocket: self.websocket.take(), + #[cfg(feature = "metrics")] + metrics_registry: self.metrics_registry.take(), ping: self.ping.take(), identify: self.identify.take(), kademlia: self.kademlia.take(), @@ -328,6 +343,10 @@ pub struct Litep2pConfig { #[cfg(feature = "websocket")] pub(crate) websocket: Option, + /// Prometheus metrics registry. + #[cfg(feature = "metrics")] + pub(crate) metrics_registry: Option, + /// Keypair. pub(crate) keypair: Keypair, From f3948c119ccfdde3f915a7c89d4730e723fe956a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 14:57:52 +0200 Subject: [PATCH 03/29] cargo: Make litep2p agnostic of metrics Signed-off-by: Alexandru Vasile --- Cargo.lock | 1 - Cargo.toml | 2 - src/metrics.rs | 124 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 79077150..3ccff070 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2889,7 +2889,6 @@ dependencies = [ "nohash-hasher", "parking_lot 0.12.3", "pin-project", - "prometheus", "prost 0.12.6", "prost-build 0.13.1", "quickcheck", diff --git a/Cargo.toml b/Cargo.toml index 8870b89f..a066b3b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,6 @@ yasna = "0.5.0" zeroize = "1.8.1" nohash-hasher = "0.2.0" static_assertions = "1.1.0" -prometheus = { version = "0.13.0", default-features = false, optional = true } # Exposed dependencies. Breaking changes to these are breaking changes to us. [dependencies.rustls] @@ -91,7 +90,6 @@ custom_sc_network = [] quic = ["dep:webpki", "dep:quinn"] webrtc = ["dep:str0m"] websocket = ["dep:tokio-tungstenite"] -metrics = ["dep:prometheus"] [profile.release] debug = true diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 00000000..d3fa115b --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,124 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// Copyright 2024 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A generic module for handling the metrics exposed by litep2p. +//! +//! Contains the traits and types that are used to define and interact with metrics. + +use crate::Error; +use std::sync::Arc; + +pub type MetricCounter = Arc; + +pub type MetricGauge = Arc; + +pub type MetricsRegistry = Arc; + +/// Represents a metric that can only go up. +pub trait MetricCounterT: Send + Sync { + /// Increment the counter by `value`. + fn inc(&self, value: u64); +} + +/// Represents a metric that can arbitrarily go up and down. +pub trait MetricGaugeT: Send + Sync { + /// Set the gauge to `value`. + fn set(&self, value: u64); +} + +/// A registry for metrics. +pub trait MetricsRegistryT: Send + Sync { + /// Register a new counter. + fn register_counter( + &self, + name: &'static str, + help: &'static str, + ) -> Result; + + /// Register a new gauge. + fn register_gauge(&self, name: &'static str, help: &'static str) -> Result; +} + +#[cfg(feature = "metrics")] +pub mod metrics { + use super::*; + use prometheus::{ + core::{AtomicU64 as U64, GenericCounter, GenericGauge}, + Registry, + }; + + impl From for Error { + fn from(err: prometheus::Error) -> Self { + Error::MetricError(err.to_string()) + } + } + + /// A registry for metrics that uses the Prometheus metrics library. + pub struct PrometheusMetricsRegistry { + registry: Registry, + } + + impl PrometheusMetricsRegistry { + /// Create a new [`PrometheusMetricsRegistry`]. + pub fn from_registry(registry: Registry) -> Self { + Self { registry } + } + + /// Get the Prometheus registry. + pub fn registry(&self) -> &Registry { + &self.registry + } + } + + impl MetricsRegistryT for PrometheusMetricsRegistry { + fn register_counter( + &self, + name: &'static str, + help: &'static str, + ) -> Result { + let counter = GenericCounter::::new(name, help)?; + self.registry.register(Box::new(counter.clone()))?; + Ok(Arc::new(counter)) + } + + fn register_gauge( + &self, + name: &'static str, + help: &'static str, + ) -> Result { + let gauge = GenericGauge::::new(name, help)?; + self.registry.register(Box::new(gauge.clone()))?; + Ok(Arc::new(gauge)) + } + } + + impl MetricCounterT for GenericCounter { + fn inc(&self, value: u64) { + self.inc_by(value); + } + } + + impl MetricGaugeT for GenericGauge { + fn set(&self, value: u64) { + self.set(value); + } + } +} From 31144bfbe371838f1d424401d9725a3374e364b7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 14:58:06 +0200 Subject: [PATCH 04/29] metrics: Add abstraction layers over metrics Signed-off-by: Alexandru Vasile --- src/metrics.rs | 66 -------------------------------------------------- 1 file changed, 66 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index d3fa115b..af16690a 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -56,69 +56,3 @@ pub trait MetricsRegistryT: Send + Sync { /// Register a new gauge. fn register_gauge(&self, name: &'static str, help: &'static str) -> Result; } - -#[cfg(feature = "metrics")] -pub mod metrics { - use super::*; - use prometheus::{ - core::{AtomicU64 as U64, GenericCounter, GenericGauge}, - Registry, - }; - - impl From for Error { - fn from(err: prometheus::Error) -> Self { - Error::MetricError(err.to_string()) - } - } - - /// A registry for metrics that uses the Prometheus metrics library. - pub struct PrometheusMetricsRegistry { - registry: Registry, - } - - impl PrometheusMetricsRegistry { - /// Create a new [`PrometheusMetricsRegistry`]. - pub fn from_registry(registry: Registry) -> Self { - Self { registry } - } - - /// Get the Prometheus registry. - pub fn registry(&self) -> &Registry { - &self.registry - } - } - - impl MetricsRegistryT for PrometheusMetricsRegistry { - fn register_counter( - &self, - name: &'static str, - help: &'static str, - ) -> Result { - let counter = GenericCounter::::new(name, help)?; - self.registry.register(Box::new(counter.clone()))?; - Ok(Arc::new(counter)) - } - - fn register_gauge( - &self, - name: &'static str, - help: &'static str, - ) -> Result { - let gauge = GenericGauge::::new(name, help)?; - self.registry.register(Box::new(gauge.clone()))?; - Ok(Arc::new(gauge)) - } - } - - impl MetricCounterT for GenericCounter { - fn inc(&self, value: u64) { - self.inc_by(value); - } - } - - impl MetricGaugeT for GenericGauge { - fn set(&self, value: u64) { - self.set(value); - } - } -} From 77332aaa388aa51030d807ecb362746543e58af7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 14:58:35 +0200 Subject: [PATCH 05/29] Expose metrics to litep2p transport layers Signed-off-by: Alexandru Vasile --- src/config.rs | 14 +++++-------- src/error.rs | 2 ++ src/lib.rs | 27 +++++++++++++++++++------ src/transport/mod.rs | 11 ++++++++-- src/transport/quic/mod.rs | 1 + src/transport/tcp/mod.rs | 37 ++++++++++++++++++++++++++++++++++ src/transport/webrtc/mod.rs | 6 +++++- src/transport/websocket/mod.rs | 1 + 8 files changed, 81 insertions(+), 18 deletions(-) diff --git a/src/config.rs b/src/config.rs index 35d96f56..e1872752 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,6 +23,7 @@ use crate::{ crypto::ed25519::Keypair, executor::{DefaultExecutor, Executor}, + metrics::MetricsRegistry, protocol::{ libp2p::{bitswap, identify, kademlia, ping}, mdns::Config as MdnsConfig, @@ -83,9 +84,8 @@ pub struct ConfigBuilder { #[cfg(feature = "websocket")] websocket: Option, - /// Prometheus metrics registry. - #[cfg(feature = "metrics")] - metrics_registry: Option, + /// Metrics registry + metrics_registry: Option, /// Keypair. keypair: Option, @@ -147,7 +147,6 @@ impl ConfigBuilder { webrtc: None, #[cfg(feature = "websocket")] websocket: None, - #[cfg(feature = "metrics")] metrics_registry: None, keypair: None, ping: None, @@ -194,8 +193,7 @@ impl ConfigBuilder { } /// Add metrics registry. - #[cfg(feature = "metrics")] - pub fn with_metrics_registry(mut self, registry: prometheus::Registry) -> Self { + pub fn with_metrics_registry(mut self, registry: MetricsRegistry) -> Self { self.metrics_registry = Some(registry); self } @@ -308,7 +306,6 @@ impl ConfigBuilder { webrtc: self.webrtc.take(), #[cfg(feature = "websocket")] websocket: self.websocket.take(), - #[cfg(feature = "metrics")] metrics_registry: self.metrics_registry.take(), ping: self.ping.take(), identify: self.identify.take(), @@ -344,8 +341,7 @@ pub struct Litep2pConfig { pub(crate) websocket: Option, /// Prometheus metrics registry. - #[cfg(feature = "metrics")] - pub(crate) metrics_registry: Option, + pub(crate) metrics_registry: Option, /// Keypair. pub(crate) keypair: Keypair, diff --git a/src/error.rs b/src/error.rs index 604d00e9..c96868b3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -127,6 +127,8 @@ pub enum Error { ConnectionLimit(ConnectionLimitsError), #[error("Failed to dial peer immediately")] ImmediateDialError(#[from] ImmediateDialError), + #[error("Invalid metric: `{0}`")] + MetricError(String), } /// Error type for address parsing. diff --git a/src/lib.rs b/src/lib.rs index 66e03289..1779ece6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,6 +72,7 @@ pub mod config; pub mod crypto; pub mod error; pub mod executor; +pub mod metrics; pub mod protocol; pub mod substream; pub mod transport; @@ -313,8 +314,11 @@ impl Litep2p { // enable tcp transport if the config exists if let Some(config) = litep2p_config.tcp.take() { let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); - let (transport, transport_listen_addresses) = - ::new(handle, config)?; + let (transport, transport_listen_addresses) = ::new( + handle, + config, + litep2p_config.metrics_registry.clone(), + )?; for address in transport_listen_addresses { transport_manager.register_listen_address(address.clone()); @@ -330,8 +334,11 @@ impl Litep2p { #[cfg(feature = "quic")] if let Some(config) = litep2p_config.quic.take() { let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); - let (transport, transport_listen_addresses) = - ::new(handle, config)?; + let (transport, transport_listen_addresses) = ::new( + handle, + config, + litep2p_config.metrics_registry.clone(), + )?; for address in transport_listen_addresses { transport_manager.register_listen_address(address.clone()); @@ -348,7 +355,11 @@ impl Litep2p { if let Some(config) = litep2p_config.webrtc.take() { let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); let (transport, transport_listen_addresses) = - ::new(handle, config)?; + ::new( + handle, + config, + litep2p_config.metrics_registry.clone(), + )?; for address in transport_listen_addresses { transport_manager.register_listen_address(address.clone()); @@ -365,7 +376,11 @@ impl Litep2p { if let Some(config) = litep2p_config.websocket.take() { let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); let (transport, transport_listen_addresses) = - ::new(handle, config)?; + ::new( + handle, + config, + litep2p_config.metrics_registry.clone(), + )?; for address in transport_listen_addresses { transport_manager.register_listen_address(address.clone()); diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 1d61ca9d..7a0f1eea 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -20,7 +20,10 @@ //! Transport protocol implementations provided by [`Litep2p`](`crate::Litep2p`). -use crate::{error::DialError, transport::manager::TransportHandle, types::ConnectionId, PeerId}; +use crate::{ + error::DialError, metrics::MetricsRegistry, transport::manager::TransportHandle, + types::ConnectionId, PeerId, +}; use futures::Stream; use multiaddr::Multiaddr; @@ -177,7 +180,11 @@ pub(crate) trait TransportBuilder { type Transport: Transport; /// Create new [`Transport`] object. - fn new(context: TransportHandle, config: Self::Config) -> crate::Result<(Self, Vec)> + fn new( + context: TransportHandle, + config: Self::Config, + registry: Option, + ) -> crate::Result<(Self, Vec)> where Self: Sized; } diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index 0cf5e255..482e6744 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -215,6 +215,7 @@ impl TransportBuilder for QuicTransport { fn new( context: TransportHandle, mut config: Self::Config, + _registry: Option, ) -> crate::Result<(Self, Vec)> where Self: Sized, diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 748e138d..d8e61bdb 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -24,6 +24,7 @@ use crate::{ config::Role, error::{DialError, Error}, + metrics::{MetricGauge, MetricsRegistry}, transport::{ common::listener::{DialAddresses, GetSocketAddr, SocketListener, TcpAddress}, manager::TransportHandle, @@ -129,6 +130,13 @@ pub(crate) struct TcpTransport { /// Connections which have been opened and negotiated but are being validated by the /// `TransportManager`. pending_open: HashMap, + + /// Tcp metrics. + metrics: Option, +} + +struct TcpMetrics { + pending_dials_num: MetricGauge, } impl TcpTransport { @@ -271,6 +279,7 @@ impl TransportBuilder for TcpTransport { fn new( context: TransportHandle, mut config: Self::Config, + registry: Option, ) -> crate::Result<(Self, Vec)> { tracing::debug!( target: LOG_TARGET, @@ -285,6 +294,17 @@ impl TransportBuilder for TcpTransport { config.nodelay, ); + let metrics = if let Some(registry) = registry { + Some(TcpMetrics { + pending_dials_num: registry.register_gauge( + "litep2p_tcp_pending_dials", + "Litep2p number of pending dials", + )?, + }) + } else { + None + }; + Ok(( Self { listener, @@ -298,6 +318,7 @@ impl TransportBuilder for TcpTransport { pending_connections: FuturesStream::new(), pending_raw_connections: FuturesStream::new(), cancel_futures: HashMap::new(), + metrics, }, listen_addresses, )) @@ -319,6 +340,10 @@ impl Transport for TcpTransport { let nodelay = self.config.nodelay; self.pending_dials.insert(connection_id, address.clone()); + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + } + self.pending_connections.push(Box::pin(async move { let (_, stream) = TcpTransport::dial_peer(address, dial_addresses, connection_open_timeout, nodelay) @@ -500,6 +525,10 @@ impl Transport for TcpTransport { ); self.pending_dials.insert(connection_id, address); + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + } + self.pending_connections.push(Box::pin(async move { match tokio::time::timeout(connection_open_timeout, async move { TcpConnection::negotiate_connection( @@ -652,6 +681,10 @@ impl Stream for TcpTransport { let peer = connection.peer(); let endpoint = connection.endpoint(); self.pending_dials.remove(&connection.connection_id()); + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + } + self.pending_open.insert(connection.connection_id(), connection); return Poll::Ready(Some(TransportEvent::ConnectionEstablished { @@ -661,6 +694,10 @@ impl Stream for TcpTransport { } Err((connection_id, error)) => { if let Some(address) = self.pending_dials.remove(&connection_id) { + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + } + return Poll::Ready(Some(TransportEvent::DialFailure { connection_id, address, diff --git a/src/transport/webrtc/mod.rs b/src/transport/webrtc/mod.rs index 7dce743d..b401291e 100644 --- a/src/transport/webrtc/mod.rs +++ b/src/transport/webrtc/mod.rs @@ -423,7 +423,11 @@ impl TransportBuilder for WebRtcTransport { type Transport = WebRtcTransport; /// Create new [`Transport`] object. - fn new(context: TransportHandle, config: Self::Config) -> crate::Result<(Self, Vec)> + fn new( + context: TransportHandle, + config: Self::Config, + _registry: Option, + ) -> crate::Result<(Self, Vec)> where Self: Sized, { diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 2435f639..bd8b656c 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -300,6 +300,7 @@ impl TransportBuilder for WebSocketTransport { fn new( context: TransportHandle, mut config: Self::Config, + _registry: Option, ) -> crate::Result<(Self, Vec)> where Self: Sized, From 3ec010c22758eaa62d568fa2e4fb00e0f6b39d20 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 15:27:26 +0200 Subject: [PATCH 06/29] tcp: Extend collected metrics at intervals for code simplicity Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 69 ++++++++++++++++++++++++++++++------- src/utils/futures_stream.rs | 1 - 2 files changed, 57 insertions(+), 13 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index d8e61bdb..138f216f 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -133,10 +133,19 @@ pub(crate) struct TcpTransport { /// Tcp metrics. metrics: Option, + + /// Interval for collecting metrics. + interval: tokio::time::Interval, } struct TcpMetrics { pending_dials_num: MetricGauge, + pending_inbound_connections_num: MetricGauge, + pending_connections_num: MetricGauge, + pending_raw_connections_num: MetricGauge, + open_raw_connections_num: MetricGauge, + cancel_futures_num: MetricGauge, + pending_open_num: MetricGauge, } impl TcpTransport { @@ -300,6 +309,30 @@ impl TransportBuilder for TcpTransport { "litep2p_tcp_pending_dials", "Litep2p number of pending dials", )?, + pending_inbound_connections_num: registry.register_gauge( + "litep2p_tcp_pending_inbound_connections", + "Litep2p number of pending inbound connections", + )?, + pending_connections_num: registry.register_gauge( + "litep2p_tcp_pending_connections", + "Litep2p number of pending connections", + )?, + pending_raw_connections_num: registry.register_gauge( + "litep2p_tcp_pending_raw_connections", + "Litep2p number of pending raw connections", + )?, + open_raw_connections_num: registry.register_gauge( + "litep2p_tcp_open_raw_connections", + "Litep2p number of open raw connections", + )?, + cancel_futures_num: registry.register_gauge( + "litep2p_tcp_cancel_futures", + "Litep2p number of cancel futures", + )?, + pending_open_num: registry.register_gauge( + "litep2p_tcp_pending_open", + "Litep2p number of pending open connections", + )?, }) } else { None @@ -319,6 +352,7 @@ impl TransportBuilder for TcpTransport { pending_raw_connections: FuturesStream::new(), cancel_futures: HashMap::new(), metrics, + interval: tokio::time::interval(Duration::from_secs(15)), }, listen_addresses, )) @@ -340,9 +374,6 @@ impl Transport for TcpTransport { let nodelay = self.config.nodelay; self.pending_dials.insert(connection_id, address.clone()); - if let Some(metrics) = &self.metrics { - metrics.pending_dials_num.set(self.pending_dials.len() as u64); - } self.pending_connections.push(Box::pin(async move { let (_, stream) = @@ -525,9 +556,6 @@ impl Transport for TcpTransport { ); self.pending_dials.insert(connection_id, address); - if let Some(metrics) = &self.metrics { - metrics.pending_dials_num.set(self.pending_dials.len() as u64); - } self.pending_connections.push(Box::pin(async move { match tokio::time::timeout(connection_open_timeout, async move { @@ -570,6 +598,22 @@ impl Stream for TcpTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(_) = self.interval.poll_tick(cx) { + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + metrics + .pending_inbound_connections_num + .set(self.pending_inbound_connections.len() as u64); + metrics.pending_connections_num.set(self.pending_connections.len() as u64); + metrics + .pending_raw_connections_num + .set(self.pending_raw_connections.len() as u64); + metrics.open_raw_connections_num.set(self.opened_raw.len() as u64); + metrics.cancel_futures_num.set(self.cancel_futures.len() as u64); + metrics.pending_open_num.set(self.pending_open.len() as u64); + } + } + if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { return match event { None => { @@ -766,7 +810,7 @@ mod tests { }; let (mut transport1, listen_addresses) = - TcpTransport::new(handle1, transport_config1).unwrap(); + TcpTransport::new(handle1, transport_config1, None).unwrap(); let listen_address = listen_addresses[0].clone(); let keypair2 = Keypair::generate(); @@ -795,7 +839,7 @@ mod tests { ..Default::default() }; - let (mut transport2, _) = TcpTransport::new(handle2, transport_config2).unwrap(); + let (mut transport2, _) = TcpTransport::new(handle2, transport_config2, None).unwrap(); transport2.dial(ConnectionId::new(), listen_address).unwrap(); let (tx, mut from_transport2) = channel(64); @@ -859,7 +903,7 @@ mod tests { }; let (mut transport1, listen_addresses) = - TcpTransport::new(handle1, transport_config1).unwrap(); + TcpTransport::new(handle1, transport_config1, None).unwrap(); let listen_address = listen_addresses[0].clone(); let keypair2 = Keypair::generate(); @@ -888,7 +932,7 @@ mod tests { ..Default::default() }; - let (mut transport2, _) = TcpTransport::new(handle2, transport_config2).unwrap(); + let (mut transport2, _) = TcpTransport::new(handle2, transport_config2, None).unwrap(); transport2.dial(ConnectionId::new(), listen_address).unwrap(); let (tx, mut from_transport2) = channel(64); @@ -941,7 +985,7 @@ mod tests { }, )]), }; - let (mut transport1, _) = TcpTransport::new(handle1, Default::default()).unwrap(); + let (mut transport1, _) = TcpTransport::new(handle1, Default::default(), None).unwrap(); tokio::spawn(async move { while let Some(event) = transport1.next().await { @@ -978,7 +1022,7 @@ mod tests { )]), }; - let (mut transport2, _) = TcpTransport::new(handle2, Default::default()).unwrap(); + let (mut transport2, _) = TcpTransport::new(handle2, Default::default(), None).unwrap(); let peer1: PeerId = PeerId::from_public_key(&keypair1.public().into()); let peer2: PeerId = PeerId::from_public_key(&keypair2.public().into()); @@ -1029,6 +1073,7 @@ mod tests { listen_addresses: vec!["/ip4/127.0.0.1/tcp/0".parse().unwrap()], ..Default::default() }, + None, ) .unwrap(); diff --git a/src/utils/futures_stream.rs b/src/utils/futures_stream.rs index 6393296d..c661ac69 100644 --- a/src/utils/futures_stream.rs +++ b/src/utils/futures_stream.rs @@ -45,7 +45,6 @@ impl FuturesStream { } /// Number of futures in the stream. - #[cfg(test)] pub fn len(&self) -> usize { self.futures.len() } From 593608361d97d02db8dc1dba17997fe0ce770295 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 16:03:48 +0200 Subject: [PATCH 07/29] tcp: Selectively poll interval if the metrics are enabled Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 138f216f..5b0a6f81 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -133,12 +133,17 @@ pub(crate) struct TcpTransport { /// Tcp metrics. metrics: Option, +} +struct TcpMetrics { /// Interval for collecting metrics. + /// + /// This is a tradeoff we make in favor of simplicity and correctness. + /// An alternative to this would be to complicate the code by collecting + /// individual metrics in each method. This is error prone, as names are + /// easily mismatched, and it's hard to keep track of all the metrics. interval: tokio::time::Interval, -} -struct TcpMetrics { pending_dials_num: MetricGauge, pending_inbound_connections_num: MetricGauge, pending_connections_num: MetricGauge, @@ -305,6 +310,8 @@ impl TransportBuilder for TcpTransport { let metrics = if let Some(registry) = registry { Some(TcpMetrics { + interval: tokio::time::interval(Duration::from_secs(15)), + pending_dials_num: registry.register_gauge( "litep2p_tcp_pending_dials", "Litep2p number of pending dials", @@ -352,7 +359,6 @@ impl TransportBuilder for TcpTransport { pending_raw_connections: FuturesStream::new(), cancel_futures: HashMap::new(), metrics, - interval: tokio::time::interval(Duration::from_secs(15)), }, listen_addresses, )) @@ -598,8 +604,9 @@ impl Stream for TcpTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Poll::Ready(_) = self.interval.poll_tick(cx) { - if let Some(metrics) = &self.metrics { + // Take the metrics to only poll the tick in case they are enabled. + if let Some(mut metrics) = self.metrics.take() { + if let Poll::Ready(_) = metrics.interval.poll_tick(cx) { metrics.pending_dials_num.set(self.pending_dials.len() as u64); metrics .pending_inbound_connections_num @@ -612,6 +619,8 @@ impl Stream for TcpTransport { metrics.cancel_futures_num.set(self.cancel_futures.len() as u64); metrics.pending_open_num.set(self.pending_open.len() as u64); } + + self.metrics = Some(metrics); } if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { From c68ef20f80b201cf10f5249bd58523828029e216 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 16:09:43 +0200 Subject: [PATCH 08/29] metric: Extend Gauge methods with inc / dec Signed-off-by: Alexandru Vasile --- src/metrics.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/metrics.rs b/src/metrics.rs index af16690a..5cff3e42 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -42,6 +42,12 @@ pub trait MetricCounterT: Send + Sync { pub trait MetricGaugeT: Send + Sync { /// Set the gauge to `value`. fn set(&self, value: u64); + + /// Increment the gauge by `value`. + fn inc(&self, value: u64); + + /// Decrement the gauge by `value`. + fn dec(&self, value: u64); } /// A registry for metrics. From 907312f3be0da66a2224b25309f174359b17a250 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 16:27:08 +0200 Subject: [PATCH 09/29] metrics: Add scope RAII metric counter Signed-off-by: Alexandru Vasile --- src/metrics.rs | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 5cff3e42..1ca9a901 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -43,11 +43,11 @@ pub trait MetricGaugeT: Send + Sync { /// Set the gauge to `value`. fn set(&self, value: u64); - /// Increment the gauge by `value`. - fn inc(&self, value: u64); + /// Increment the gauge. + fn inc(&self); - /// Decrement the gauge by `value`. - fn dec(&self, value: u64); + /// Decrement the gauge. + fn dec(&self); } /// A registry for metrics. @@ -62,3 +62,25 @@ pub trait MetricsRegistryT: Send + Sync { /// Register a new gauge. fn register_gauge(&self, name: &'static str, help: &'static str) -> Result; } + +/// A scope for metrics that modifies a provided gauge in an RAII fashion. +/// +/// The gauge is incremented when constructed and decremented when the object is dropped. +#[derive(Clone)] +pub struct ScopeGaugeMetric { + inner: MetricGauge, +} + +impl ScopeGaugeMetric { + /// Create a new [`ScopeGaugeMetric`]. + pub fn new(inner: MetricGauge) -> Self { + inner.inc(); + ScopeGaugeMetric { inner } + } +} + +impl Drop for ScopeGaugeMetric { + fn drop(&mut self) { + self.inner.dec(); + } +} From 436291957a9351113c9dbd6bcd3ed4de19df2359 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 16:28:04 +0200 Subject: [PATCH 10/29] metrics: Make register methods less restrictive Ideally this should be Into, but that way the we cannot be object safe Signed-off-by: Alexandru Vasile --- src/metrics.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 1ca9a901..e144f711 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -53,14 +53,10 @@ pub trait MetricGaugeT: Send + Sync { /// A registry for metrics. pub trait MetricsRegistryT: Send + Sync { /// Register a new counter. - fn register_counter( - &self, - name: &'static str, - help: &'static str, - ) -> Result; + fn register_counter(&self, name: String, help: String) -> Result; /// Register a new gauge. - fn register_gauge(&self, name: &'static str, help: &'static str) -> Result; + fn register_gauge(&self, name: String, help: String) -> Result; } /// A scope for metrics that modifies a provided gauge in an RAII fashion. From 5d9174158598df85aae2af8cc34e1cd436a50e62 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 16:32:05 +0200 Subject: [PATCH 11/29] tcp: Extend with connection metrics Signed-off-by: Alexandru Vasile --- src/transport/tcp/connection.rs | 27 ++++++++++++- src/transport/tcp/mod.rs | 69 ++++++++++++++++++++++++--------- 2 files changed, 76 insertions(+), 20 deletions(-) diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index d0d21fde..f184bb75 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -25,6 +25,7 @@ use crate::{ noise::{self, NoiseSocket}, }, error::{Error, NegotiationError, SubstreamError}, + metrics::{MetricGauge, ScopeGaugeMetric}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -140,6 +141,15 @@ impl NegotiatedConnection { } } +pub struct TcpConnectionMetrics { + /// Metric for the number of pending substreams that are negotiated. + pub pending_substreams_num: MetricGauge, + + /// Metric incremented when the connection starts + /// and decremented when the connection closes. + pub _active_connections_num: ScopeGaugeMetric, +} + /// TCP connection. pub struct TcpConnection { /// Protocol context. @@ -169,6 +179,9 @@ pub struct TcpConnection { /// Pending substreams. pending_substreams: FuturesUnordered>>, + + /// Metrics. + metrics: Option, } impl fmt::Debug for TcpConnection { @@ -187,6 +200,7 @@ impl TcpConnection { protocol_set: ProtocolSet, bandwidth_sink: BandwidthSink, next_substream_id: Arc, + metrics: Option, ) -> Self { let NegotiatedConnection { connection, @@ -206,6 +220,7 @@ impl TcpConnection { next_substream_id, pending_substreams: FuturesUnordered::new(), substream_open_timeout, + metrics, } } @@ -535,6 +550,9 @@ impl TcpConnection { }), } })); + if let Some(metrics) = &self.metrics { + metrics.pending_substreams_num.inc(); + } Ok(false) } @@ -694,6 +712,9 @@ impl TcpConnection { }), } })); + if let Some(metrics) = &self.metrics { + metrics.pending_substreams_num.inc(); + } Ok(false) } @@ -734,7 +755,11 @@ impl TcpConnection { } }, substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => { - self.handle_negotiated_substream(substream).await; + if let Some(metrics) = &self.metrics { + metrics.pending_substreams_num.set(self.pending_substreams.len() as u64); + } + + self.handle_negotiated_substream(substream).await; } protocol = self.protocol_set.next() => { if self.handle_protocol_command(protocol).await? { diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 5b0a6f81..c0dc4e7f 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -24,7 +24,7 @@ use crate::{ config::Role, error::{DialError, Error}, - metrics::{MetricGauge, MetricsRegistry}, + metrics::{MetricGauge, MetricsRegistry, ScopeGaugeMetric}, transport::{ common::listener::{DialAddresses, GetSocketAddr, SocketListener, TcpAddress}, manager::TransportHandle, @@ -38,6 +38,7 @@ use crate::{ utils::futures_stream::FuturesStream, }; +use connection::TcpConnectionMetrics; use futures::{ future::BoxFuture, stream::{AbortHandle, FuturesUnordered, Stream, StreamExt}, @@ -144,6 +145,7 @@ struct TcpMetrics { /// easily mismatched, and it's hard to keep track of all the metrics. interval: tokio::time::Interval, + /// The following metrics are used for the transport itself. pending_dials_num: MetricGauge, pending_inbound_connections_num: MetricGauge, pending_connections_num: MetricGauge, @@ -151,6 +153,19 @@ struct TcpMetrics { open_raw_connections_num: MetricGauge, cancel_futures_num: MetricGauge, pending_open_num: MetricGauge, + + /// The following metrics are shared with all TCP connections. + active_connections_num: MetricGauge, + pending_substreams_num: MetricGauge, +} + +impl TcpMetrics { + fn to_connection_metrics(&self) -> TcpConnectionMetrics { + TcpConnectionMetrics { + _active_connections_num: ScopeGaugeMetric::new(self.active_connections_num.clone()), + pending_substreams_num: self.pending_substreams_num.clone(), + } + } } impl TcpTransport { @@ -313,32 +328,42 @@ impl TransportBuilder for TcpTransport { interval: tokio::time::interval(Duration::from_secs(15)), pending_dials_num: registry.register_gauge( - "litep2p_tcp_pending_dials", - "Litep2p number of pending dials", + "litep2p_tcp_pending_dials".into(), + "Litep2p number of pending dials".into(), )?, pending_inbound_connections_num: registry.register_gauge( - "litep2p_tcp_pending_inbound_connections", - "Litep2p number of pending inbound connections", + "litep2p_tcp_pending_inbound_connections".into(), + "Litep2p number of pending inbound connections".into(), )?, pending_connections_num: registry.register_gauge( - "litep2p_tcp_pending_connections", - "Litep2p number of pending connections", + "litep2p_tcp_pending_connections".into(), + "Litep2p number of pending connections".into(), )?, pending_raw_connections_num: registry.register_gauge( - "litep2p_tcp_pending_raw_connections", - "Litep2p number of pending raw connections", + "litep2p_tcp_pending_raw_connections".into(), + "Litep2p number of pending raw connections".into(), )?, open_raw_connections_num: registry.register_gauge( - "litep2p_tcp_open_raw_connections", - "Litep2p number of open raw connections", + "litep2p_tcp_open_raw_connections".into(), + "Litep2p number of open raw connections".into(), )?, cancel_futures_num: registry.register_gauge( - "litep2p_tcp_cancel_futures", - "Litep2p number of cancel futures", + "litep2p_tcp_cancel_futures".into(), + "Litep2p number of cancel futures".into(), )?, pending_open_num: registry.register_gauge( - "litep2p_tcp_pending_open", - "Litep2p number of pending open connections", + "litep2p_tcp_pending_open".into(), + "Litep2p number of pending open connections".into(), + )?, + + active_connections_num: registry.register_gauge( + "litep2p_tcp_active_connections".into(), + "Litep2p number of active connections".into(), + )?, + + pending_substreams_num: registry.register_gauge( + "litep2p_tcp_pending_substreams".into(), + "Litep2p number of pending substreams".into(), )?, }) } else { @@ -421,11 +446,17 @@ impl Transport for TcpTransport { "start connection", ); + let metrics = self.metrics.as_ref().map(|metrics| metrics.to_connection_metrics()); self.context.executor.run(Box::pin(async move { - if let Err(error) = - TcpConnection::new(context, protocol_set, bandwidth_sink, next_substream_id) - .start() - .await + if let Err(error) = TcpConnection::new( + context, + protocol_set, + bandwidth_sink, + next_substream_id, + metrics, + ) + .start() + .await { tracing::debug!( target: LOG_TARGET, From 05f57412cafc25291f7cca20ade6d5e4036f5d46 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 16:43:34 +0200 Subject: [PATCH 12/29] websocket: Add specific metrics Signed-off-by: Alexandru Vasile --- src/transport/tcp/connection.rs | 4 +- src/transport/tcp/mod.rs | 1 + src/transport/websocket/connection.rs | 27 +++++++ src/transport/websocket/mod.rs | 107 +++++++++++++++++++++++++- 4 files changed, 137 insertions(+), 2 deletions(-) diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index f184bb75..085a713b 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -141,6 +141,7 @@ impl NegotiatedConnection { } } +/// Connection specific metrics. pub struct TcpConnectionMetrics { /// Metric for the number of pending substreams that are negotiated. pub pending_substreams_num: MetricGauge, @@ -756,7 +757,8 @@ impl TcpConnection { }, substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => { if let Some(metrics) = &self.metrics { - metrics.pending_substreams_num.set(self.pending_substreams.len() as u64); + // This must be decremented and not set because the metric is shared across connections. + metrics.pending_substreams_num.dec(); } self.handle_negotiated_substream(substream).await; diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index c0dc4e7f..a5aef70c 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -136,6 +136,7 @@ pub(crate) struct TcpTransport { metrics: Option, } +/// TCP specific metrics. struct TcpMetrics { /// Interval for collecting metrics. /// diff --git a/src/transport/websocket/connection.rs b/src/transport/websocket/connection.rs index 3635e655..90d77d2e 100644 --- a/src/transport/websocket/connection.rs +++ b/src/transport/websocket/connection.rs @@ -25,6 +25,7 @@ use crate::{ noise::{self, NoiseSocket}, }, error::{Error, NegotiationError, SubstreamError}, + metrics::{MetricGauge, ScopeGaugeMetric}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -130,6 +131,16 @@ impl NegotiatedConnection { } } +/// Connection specific metrics. +pub struct WebSocketConnectionMetrics { + /// Metric for the number of pending substreams that are negotiated. + pub pending_substreams_num: MetricGauge, + + /// Metric incremented when the connection starts + /// and decremented when the connection closes. + pub _active_connections_num: ScopeGaugeMetric, +} + /// WebSocket connection. pub(crate) struct WebSocketConnection { /// Protocol context. @@ -160,6 +171,9 @@ pub(crate) struct WebSocketConnection { /// Pending substreams. pending_substreams: FuturesUnordered>>, + + /// Metrics. + metrics: Option, } impl WebSocketConnection { @@ -169,6 +183,7 @@ impl WebSocketConnection { protocol_set: ProtocolSet, bandwidth_sink: BandwidthSink, substream_open_timeout: Duration, + metrics: Option, ) -> Self { let NegotiatedConnection { peer, @@ -187,6 +202,7 @@ impl WebSocketConnection { bandwidth_sink, substream_open_timeout, pending_substreams: FuturesUnordered::new(), + metrics, } } @@ -454,6 +470,9 @@ impl WebSocketConnection { }), } })); + if let Some(metrics) = &self.metrics { + metrics.pending_substreams_num.inc(); + } }, Some(Err(error)) => { tracing::debug!( @@ -475,6 +494,11 @@ impl WebSocketConnection { }, // TODO: move this to a function substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => { + if let Some(metrics) = &self.metrics { + // This must be decremented and not set because the metric is shared across connections. + metrics.pending_substreams_num.dec(); + } + match substream { // TODO: return error to protocol Err(error) => { @@ -556,6 +580,9 @@ impl WebSocketConnection { }), } })); + if let Some(metrics) = &self.metrics { + metrics.pending_substreams_num.inc(); + } } Some(ProtocolCommand::ForceClose) => { tracing::debug!( diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index bd8b656c..48aa87c8 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -23,6 +23,7 @@ use crate::{ config::Role, error::{AddressError, Error, NegotiationError}, + metrics::{MetricGauge, ScopeGaugeMetric}, transport::{ common::listener::{DialAddresses, GetSocketAddr, SocketListener, WebSocketAddress}, manager::TransportHandle, @@ -37,6 +38,7 @@ use crate::{ DialError, PeerId, }; +use connection::WebSocketConnectionMetrics; use futures::{ future::BoxFuture, stream::{AbortHandle, FuturesUnordered}, @@ -132,6 +134,42 @@ pub(crate) struct WebSocketTransport { /// Negotiated connections waiting validation. pending_open: HashMap, + + /// Websocket metrics. + metrics: Option, +} + +/// Websocket specific metrics. +struct WebSocketMetrics { + /// Interval for collecting metrics. + /// + /// This is a tradeoff we make in favor of simplicity and correctness. + /// An alternative to this would be to complicate the code by collecting + /// individual metrics in each method. This is error prone, as names are + /// easily mismatched, and it's hard to keep track of all the metrics. + interval: tokio::time::Interval, + + /// The following metrics are used for the transport itself. + pending_dials_num: MetricGauge, + pending_inbound_connections_num: MetricGauge, + pending_connections_num: MetricGauge, + pending_raw_connections_num: MetricGauge, + open_raw_connections_num: MetricGauge, + cancel_futures_num: MetricGauge, + pending_open_num: MetricGauge, + + /// The following metrics are shared with all TCP connections. + active_connections_num: MetricGauge, + pending_substreams_num: MetricGauge, +} + +impl WebSocketMetrics { + fn to_connection_metrics(&self) -> WebSocketConnectionMetrics { + WebSocketConnectionMetrics { + _active_connections_num: ScopeGaugeMetric::new(self.active_connections_num.clone()), + pending_substreams_num: self.pending_substreams_num.clone(), + } + } } impl WebSocketTransport { @@ -300,7 +338,7 @@ impl TransportBuilder for WebSocketTransport { fn new( context: TransportHandle, mut config: Self::Config, - _registry: Option, + registry: Option, ) -> crate::Result<(Self, Vec)> where Self: Sized, @@ -316,6 +354,53 @@ impl TransportBuilder for WebSocketTransport { config.nodelay, ); + let metrics = if let Some(registry) = registry { + Some(WebSocketMetrics { + interval: tokio::time::interval(Duration::from_secs(15)), + + pending_dials_num: registry.register_gauge( + "litep2p_websocket_pending_dials".into(), + "Litep2p number of pending dials".into(), + )?, + pending_inbound_connections_num: registry.register_gauge( + "litep2p_websocket_pending_inbound_connections".into(), + "Litep2p number of pending inbound connections".into(), + )?, + pending_connections_num: registry.register_gauge( + "litep2p_websocket_pending_connections".into(), + "Litep2p number of pending connections".into(), + )?, + pending_raw_connections_num: registry.register_gauge( + "litep2p_websocket_pending_raw_connections".into(), + "Litep2p number of pending raw connections".into(), + )?, + open_raw_connections_num: registry.register_gauge( + "litep2p_websocket_open_raw_connections".into(), + "Litep2p number of open raw connections".into(), + )?, + cancel_futures_num: registry.register_gauge( + "litep2p_websocket_cancel_futures".into(), + "Litep2p number of cancel futures".into(), + )?, + pending_open_num: registry.register_gauge( + "litep2p_websocket_pending_open".into(), + "Litep2p number of pending open connections".into(), + )?, + + active_connections_num: registry.register_gauge( + "litep2p_websocket_active_connections".into(), + "Litep2p number of active connections".into(), + )?, + + pending_substreams_num: registry.register_gauge( + "litep2p_websocket_pending_substreams".into(), + "Litep2p number of pending substreams".into(), + )?, + }) + } else { + None + }; + Ok(( Self { listener, @@ -329,6 +414,7 @@ impl TransportBuilder for WebSocketTransport { pending_connections: FuturesStream::new(), pending_raw_connections: FuturesStream::new(), cancel_futures: HashMap::new(), + metrics, }, listen_addresses, )) @@ -401,12 +487,14 @@ impl Transport for WebSocketTransport { "start connection", ); + let metrics = self.metrics.as_ref().map(|metrics| metrics.to_connection_metrics()); self.context.executor.run(Box::pin(async move { if let Err(error) = WebSocketConnection::new( context, protocol_set, bandwidth_sink, substream_open_timeout, + metrics, ) .start() .await @@ -588,6 +676,23 @@ impl Stream for WebSocketTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Take the metrics to only poll the tick in case they are enabled. + if let Some(mut metrics) = self.metrics.take() { + if let Poll::Ready(_) = metrics.interval.poll_tick(cx) { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + metrics + .pending_inbound_connections_num + .set(self.pending_inbound_connections.len() as u64); + metrics.pending_connections_num.set(self.pending_connections.len() as u64); + metrics + .pending_raw_connections_num + .set(self.pending_raw_connections.len() as u64); + metrics.open_raw_connections_num.set(self.opened_raw.len() as u64); + metrics.cancel_futures_num.set(self.cancel_futures.len() as u64); + metrics.pending_open_num.set(self.pending_open.len() as u64); + } + } + if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { return match event { None => { From f6d478d77eb30ff9b447f3878c45046aab2452fc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 17:43:50 +0200 Subject: [PATCH 13/29] req-resp: Propagate metrics Signed-off-by: Alexandru Vasile --- src/lib.rs | 12 ++- src/protocol/request_response/mod.rs | 131 ++++++++++++++++++++++++++- 2 files changed, 136 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1779ece6..02d05143 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -208,9 +208,15 @@ impl Litep2p { config.codec, litep2p_config.keep_alive_timeout, ); - litep2p_config.executor.run(Box::pin(async move { - RequestResponseProtocol::new(service, config).run().await - })); + let request_response = RequestResponseProtocol::new( + service, + config, + litep2p_config.metrics_registry.clone(), + )?; + + litep2p_config + .executor + .run(Box::pin(async move { request_response.run().await })); } // start user protocol event loops diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index 38e631a1..4f93a657 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -22,6 +22,7 @@ use crate::{ error::{Error, NegotiationError, SubstreamError}, + metrics::{MetricGauge, MetricsRegistry}, multistream_select::NegotiationError::Failed as MultistreamFailed, protocol::{ request_response::handle::{InnerRequestResponseEvent, RequestResponseCommand}, @@ -33,7 +34,7 @@ use crate::{ }; use bytes::BytesMut; -use futures::{channel, future::BoxFuture, stream::FuturesUnordered, StreamExt}; +use futures::{channel, future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; use tokio::{ sync::{ mpsc::{Receiver, Sender}, @@ -45,10 +46,12 @@ use tokio::{ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, io::ErrorKind, + pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, + task::{Context, Poll}, time::Duration, }; @@ -183,12 +186,107 @@ pub(crate) struct RequestResponseProtocol { /// Maximum concurrent inbound requests, if specified. max_concurrent_inbound_requests: Option, + + /// Metrics. + metrics: PollMetrics, +} + +struct PollMetrics { + inner: Option, +} + +impl Stream for PollMetrics { + type Item = tokio::time::Instant; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.inner.take() { + Some(mut metrics) => match metrics.interval.poll_tick(cx) { + Poll::Ready(_) => { + self.inner = Some(metrics); + return Poll::Ready(Some(tokio::time::Instant::now())); + } + Poll::Pending => { + self.inner = Some(metrics); + return Poll::Pending; + } + }, + None => Poll::Pending, + } + } +} + +/// Request-response protocol metrics. +struct Metrics { + /// Interval for collecting metrics. + /// + /// This is a tradeoff we make in favor of simplicity and correctness. + /// An alternative to this would be to complicate the code by collecting + /// individual metrics in each method. This is error prone, as names are + /// easily mismatched, and it's hard to keep track of all the metrics. + interval: tokio::time::Interval, + + connected_peers: MetricGauge, + pending_outbound_num: MetricGauge, + pending_outbound_responses_num: MetricGauge, + pending_outbound_cancels_num: MetricGauge, + pending_inbound_num: MetricGauge, + pending_inbound_requests_num: MetricGauge, + pending_dials_num: MetricGauge, } impl RequestResponseProtocol { /// Create new [`RequestResponseProtocol`]. - pub(crate) fn new(service: TransportService, config: Config) -> Self { - Self { + pub(crate) fn new( + service: TransportService, + config: Config, + registry: Option, + ) -> Result { + let metrics = if let Some(registry) = registry { + let protocol_name = config.protocol_name.to_string().replace("/", "_"); + + let metrics = Metrics { + interval: tokio::time::interval(Duration::from_secs(15)), + + connected_peers: registry.register_gauge( + format!("litep2p_req_res{}_connected_peers", protocol_name), + "Litep2p number of connected peers".into(), + )?, + pending_outbound_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_outbound", protocol_name), + "Litep2p number of pending outbound requests".into(), + )?, + pending_outbound_responses_num: registry.register_gauge( + format!( + "litep2p_req_res{}_pending_outbound_responses", + protocol_name + ), + "Litep2p number of pending outbound responses".into(), + )?, + pending_outbound_cancels_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_outbound_cancels", protocol_name), + "Litep2p number of pending outbound cancels".into(), + )?, + pending_inbound_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_inbound", protocol_name), + "Litep2p number of pending inbound requests".into(), + )?, + pending_inbound_requests_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_inbound_requests", protocol_name), + "Litep2p number of pending inbound requests".into(), + )?, + pending_dials_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_dials", protocol_name), + "Litep2p number of pending dials".into(), + )?, + }; + PollMetrics { + inner: Some(metrics), + } + } else { + PollMetrics { inner: None } + }; + + Ok(Self { service, peers: HashMap::new(), timeout: config.timeout, @@ -203,7 +301,8 @@ impl RequestResponseProtocol { pending_inbound_requests: SubstreamSet::new(), pending_outbound_responses: FuturesUnordered::new(), max_concurrent_inbound_requests: config.max_concurrent_inbound_request, - } + metrics, + }) } /// Get next ephemeral request ID. @@ -1013,6 +1112,25 @@ impl RequestResponseProtocol { } } + /// Report metrics. + fn report_metrics(&self) { + if let Some(metrics) = self.metrics.inner.as_ref() { + metrics.connected_peers.set(self.peers.len() as u64); + metrics.pending_outbound_num.set(self.pending_outbound.len() as u64); + metrics + .pending_outbound_responses_num + .set(self.pending_outbound_responses.len() as u64); + metrics + .pending_outbound_cancels_num + .set(self.pending_outbound_cancels.len() as u64); + metrics.pending_inbound_num.set(self.pending_inbound.len() as u64); + metrics + .pending_inbound_requests_num + .set(self.pending_inbound_requests.len() as u64); + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + } + } + /// Start [`RequestResponseProtocol`] event loop. pub async fn run(mut self) { tracing::debug!(target: LOG_TARGET, "starting request-response event loop"); @@ -1078,6 +1196,11 @@ impl RequestResponseProtocol { return } }, + + // Maybe metrics. + _ = self.metrics.next() => { + self.report_metrics(); + } } } } From b5e80e2ed9b938f329a3d17af985316e8328b60a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 18:07:28 +0200 Subject: [PATCH 14/29] notifications: Expose metrics Signed-off-by: Alexandru Vasile --- src/lib.rs | 11 +- src/protocol/notification/mod.rs | 128 ++++++++++++++++++++++- src/protocol/notification/negotiation.rs | 10 ++ 3 files changed, 141 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 02d05143..56fffe79 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -189,9 +189,14 @@ impl Litep2p { litep2p_config.keep_alive_timeout, ); let executor = Arc::clone(&litep2p_config.executor); - litep2p_config.executor.run(Box::pin(async move { - NotificationProtocol::new(service, config, executor).run().await - })); + let notification = NotificationProtocol::new( + service, + config, + executor, + litep2p_config.metrics_registry.clone(), + )?; + + litep2p_config.executor.run(Box::pin(async move { notification.run().await })); } // start request-response protocol event loops diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index 42063081..412ac2d0 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -23,6 +23,7 @@ use crate::{ error::{Error, SubstreamError}, executor::Executor, + metrics::{MetricGauge, MetricsRegistry}, protocol::{ self, notification::{ @@ -39,14 +40,20 @@ use crate::{ }; use bytes::BytesMut; -use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; +use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; use multiaddr::Multiaddr; use tokio::sync::{ mpsc::{channel, Receiver, Sender}, oneshot, }; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; pub use config::{Config, ConfigBuilder}; pub use handle::{NotificationHandle, NotificationSink}; @@ -282,6 +289,51 @@ pub(crate) struct NotificationProtocol { /// Should `NotificationProtocol` attempt to dial the peer. should_dial: bool, + + /// Metrics. + metrics: PollMetrics, +} + +/// Request-response protocol metrics. +struct Metrics { + /// Interval for collecting metrics. + /// + /// This is a tradeoff we make in favor of simplicity and correctness. + /// An alternative to this would be to complicate the code by collecting + /// individual metrics in each method. This is error prone, as names are + /// easily mismatched, and it's hard to keep track of all the metrics. + interval: tokio::time::Interval, + + connected_peers: MetricGauge, + pending_outbound_num: MetricGauge, + pending_outbound_handshake_num: MetricGauge, + ready_substreams_handshake_num: MetricGauge, + pending_validations_num: MetricGauge, + timers_num: MetricGauge, +} + +struct PollMetrics { + inner: Option, +} + +impl Stream for PollMetrics { + type Item = tokio::time::Instant; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.inner.take() { + Some(mut metrics) => match metrics.interval.poll_tick(cx) { + Poll::Ready(_) => { + self.inner = Some(metrics); + return Poll::Ready(Some(tokio::time::Instant::now())); + } + Poll::Pending => { + self.inner = Some(metrics); + return Poll::Pending; + } + }, + None => Poll::Pending, + } + } } impl NotificationProtocol { @@ -289,10 +341,56 @@ impl NotificationProtocol { service: TransportService, config: Config, executor: Arc, - ) -> Self { + registry: Option, + ) -> Result { let (shutdown_tx, shutdown_rx) = channel(DEFAULT_CHANNEL_SIZE); - Self { + let metrics = if let Some(registry) = registry { + let protocol_name = config.protocol_name.to_string().replace("/", "_"); + + let metrics = Metrics { + interval: tokio::time::interval(Duration::from_secs(15)), + + connected_peers: registry.register_gauge( + format!("litep2p_notif{}_connected_peers", protocol_name), + "Number of connected peers".to_string(), + )?, + pending_outbound_num: registry.register_gauge( + format!("litep2p_notif{}_pending_outbound_num", protocol_name), + "Number of pending outbound substreams".to_string(), + )?, + pending_outbound_handshake_num: registry.register_gauge( + format!( + "litep2p_notif{}_pending_outbound_handshake_num", + protocol_name + ), + "Number of pending outbound substreams with handshake".to_string(), + )?, + ready_substreams_handshake_num: registry.register_gauge( + format!( + "litep2p_notif{}_ready_substreams_handshake_num", + protocol_name + ), + "Number of ready substreams with handshake".to_string(), + )?, + pending_validations_num: registry.register_gauge( + format!("litep2p_notif{}_pending_validations_num", protocol_name), + "Number of pending substream validations".to_string(), + )?, + timers_num: registry.register_gauge( + format!("litep2p_notif{}_timers_num", protocol_name), + "Number of pending timers".to_string(), + )?, + }; + + PollMetrics { + inner: Some(metrics), + } + } else { + PollMetrics { inner: None } + }; + + Ok(Self { service, shutdown_tx, shutdown_rx, @@ -310,7 +408,8 @@ impl NotificationProtocol { sync_channel_size: config.sync_channel_size, async_channel_size: config.async_channel_size, should_dial: config.should_dial, - } + metrics, + }) } /// Connection established to remote node. @@ -1603,6 +1702,20 @@ impl NotificationProtocol { } } + /// Report metrics. + fn report_metrics(&self) { + if let Some(metrics) = self.metrics.inner.as_ref() { + metrics.connected_peers.set(self.peers.len() as u64); + metrics.pending_outbound_num.set(self.pending_outbound.len() as u64); + metrics + .pending_outbound_handshake_num + .set(self.negotiation.substreams_len() as u64); + metrics.ready_substreams_handshake_num.set(self.negotiation.ready_len() as u64); + metrics.pending_validations_num.set(self.pending_validations.len() as u64); + metrics.timers_num.set(self.timers.len() as u64); + } + } + /// Handle next notification event. async fn next_event(&mut self) { // biased select is used because the substream events must be prioritized above other events @@ -1808,6 +1921,11 @@ impl NotificationProtocol { } } }, + + // Maybe metrics. + _ = self.metrics.next() => { + self.report_metrics(); + } } } diff --git a/src/protocol/notification/negotiation.rs b/src/protocol/notification/negotiation.rs index 9c53c760..c57a61db 100644 --- a/src/protocol/notification/negotiation.rs +++ b/src/protocol/notification/negotiation.rs @@ -116,6 +116,16 @@ impl HandshakeService { } } + /// Number of substreams in the [`HandshakeService`]. + pub fn substreams_len(&self) -> usize { + self.substreams.len() + } + + /// Number of ready substreams in the [`HandshakeService`]. + pub fn ready_len(&self) -> usize { + self.ready.len() + } + /// Remove outbound substream from [`HandshakeService`]. pub fn remove_outbound(&mut self, peer: &PeerId) -> Option { self.substreams From fc70eea85bc7d49622de29b5a5277d8e08c8f238 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 18:18:34 +0200 Subject: [PATCH 15/29] ping: Add metrics Signed-off-by: Alexandru Vasile --- src/lib.rs | 8 +++++- src/protocol/libp2p/ping/mod.rs | 48 ++++++++++++++++++++++++++++++--- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 56fffe79..53e7188a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -253,8 +253,14 @@ impl Litep2p { ping_config.codec, litep2p_config.keep_alive_timeout, ); + let ping = Ping::new( + service, + ping_config, + litep2p_config.metrics_registry.clone(), + )?; + litep2p_config.executor.run(Box::pin(async move { - Ping::new(service, ping_config).run().await + ping.run().await })); } diff --git a/src/protocol/libp2p/ping/mod.rs b/src/protocol/libp2p/ping/mod.rs index efe6fad0..1ffbc86e 100644 --- a/src/protocol/libp2p/ping/mod.rs +++ b/src/protocol/libp2p/ping/mod.rs @@ -22,6 +22,7 @@ use crate::{ error::{Error, SubstreamError}, + metrics::{MetricGauge, MetricsRegistry}, protocol::{Direction, TransportEvent, TransportService}, substream::Substream, types::SubstreamId, @@ -77,19 +78,50 @@ pub(crate) struct Ping { /// Pending inbound substreams. pending_inbound: FuturesUnordered>>, + + /// Metrics. + metrics: Option, +} + +struct Metrics { + peers: MetricGauge, + pending_outbound: MetricGauge, + pending_inbound: MetricGauge, } impl Ping { /// Create new [`Ping`] protocol. - pub fn new(service: TransportService, config: Config) -> Self { - Self { + pub fn new( + service: TransportService, + config: Config, + registry: Option, + ) -> Result { + let metrics = if let Some(registry) = registry { + Some(Metrics { + peers: registry + .register_gauge("litep2p_ping_peers".into(), "Connected peers".into())?, + pending_outbound: registry.register_gauge( + "litep2p_ping_pending_outbound".into(), + "Pending outbound substreams".into(), + )?, + pending_inbound: registry.register_gauge( + "litep2p_ping_pending_inbound".into(), + "Pending inbound substreams".into(), + )?, + }) + } else { + None + }; + + Ok(Self { service, tx: config.tx_event, peers: HashSet::new(), pending_outbound: FuturesUnordered::new(), pending_inbound: FuturesUnordered::new(), _max_failures: config.max_failures, - } + metrics, + }) } /// Connection established to remote peer. @@ -98,6 +130,7 @@ impl Ping { self.service.open_substream(peer)?; self.peers.insert(peer); + self.metrics.as_ref().map(|metrics| metrics.peers.set(self.peers.len() as u64)); Ok(()) } @@ -107,6 +140,7 @@ impl Ping { tracing::trace!(target: LOG_TARGET, ?peer, "connection closed"); self.peers.remove(&peer); + self.metrics.as_ref().map(|metrics| metrics.peers.set(self.peers.len() as u64)); } /// Handle outbound substream. @@ -137,6 +171,7 @@ impl Ping { Ok(Ok(elapsed)) => Ok((peer, elapsed)), } })); + self.metrics.as_ref().map(|metrics| metrics.pending_outbound.inc()); } /// Substream opened to remote peer. @@ -161,6 +196,7 @@ impl Ping { Ok(Ok(())) => Ok(()), } })); + self.metrics.as_ref().map(|metrics| metrics.pending_inbound.inc()); } /// Start [`Ping`] event loop. @@ -192,8 +228,12 @@ impl Ping { Some(_) => {} None => return, }, - _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {} + _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => { + self.metrics.as_ref().map(|metrics| metrics.pending_inbound.set(self.pending_inbound.len() as u64)); + } event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => { + self.metrics.as_ref().map(|metrics| metrics.pending_outbound.set(self.pending_outbound.len() as u64)); + match event { Some(Ok((peer, elapsed))) => { let _ = self From b91b8a677de3da9189dc940f53039b70e40b26d5 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 18:23:25 +0200 Subject: [PATCH 16/29] identify: Propagate metrics Signed-off-by: Alexandru Vasile --- src/lib.rs | 10 +++++--- src/protocol/libp2p/identify.rs | 44 ++++++++++++++++++++++++++++++--- src/protocol/libp2p/ping/mod.rs | 13 +++++----- 3 files changed, 53 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 53e7188a..f7e0eb65 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -259,9 +259,7 @@ impl Litep2p { litep2p_config.metrics_registry.clone(), )?; - litep2p_config.executor.run(Box::pin(async move { - ping.run().await - })); + litep2p_config.executor.run(Box::pin(async move { ping.run().await })); } // start kademlia protocol event loop if enabled @@ -422,7 +420,11 @@ impl Litep2p { // if identify was enabled, give it the enabled protocols and listen addresses and start it if let Some((service, mut identify_config)) = identify_info.take() { identify_config.protocols = transport_manager.protocols().cloned().collect(); - let identify = Identify::new(service, identify_config); + let identify = Identify::new( + service, + identify_config, + litep2p_config.metrics_registry.clone(), + )?; litep2p_config.executor.run(Box::pin(async move { let _ = identify.run().await; diff --git a/src/protocol/libp2p/identify.rs b/src/protocol/libp2p/identify.rs index e8fa4f50..8592de77 100644 --- a/src/protocol/libp2p/identify.rs +++ b/src/protocol/libp2p/identify.rs @@ -24,6 +24,7 @@ use crate::{ codec::ProtocolCodec, crypto::PublicKey, error::{Error, SubstreamError}, + metrics::{MetricGauge, MetricsRegistry}, protocol::{Direction, TransportEvent, TransportService}, substream::Substream, transport::Endpoint, @@ -185,12 +186,42 @@ pub(crate) struct Identify { /// Pending inbound substreams. pending_inbound: FuturesUnordered>, + + /// Metrics. + metrics: Option, +} + +struct Metrics { + peers: MetricGauge, + pending_outbound: MetricGauge, + pending_inbound: MetricGauge, } impl Identify { /// Create new [`Identify`] protocol. - pub(crate) fn new(service: TransportService, config: Config) -> Self { - Self { + pub(crate) fn new( + service: TransportService, + config: Config, + registry: Option, + ) -> Result { + let metrics = if let Some(registry) = registry { + Some(Metrics { + peers: registry + .register_gauge("litep2p_identify_peers".into(), "Connected peers".into())?, + pending_outbound: registry.register_gauge( + "litep2p_identify_pending_outbound".into(), + "Pending outbound substreams".into(), + )?, + pending_inbound: registry.register_gauge( + "litep2p_identify_pending_inbound".into(), + "Pending inbound substreams".into(), + )?, + }) + } else { + None + }; + + Ok(Self { service, tx: config.tx_event, peers: HashMap::new(), @@ -200,7 +231,8 @@ impl Identify { pending_inbound: FuturesUnordered::new(), pending_outbound: FuturesUnordered::new(), protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(), - } + metrics, + }) } /// Connection established to remote peer. @@ -354,6 +386,12 @@ impl Identify { tracing::debug!(target: LOG_TARGET, "starting identify event loop"); loop { + if let Some(metrics) = &self.metrics { + metrics.peers.set(self.peers.len() as u64); + metrics.pending_inbound.set(self.pending_inbound.len() as u64); + metrics.pending_outbound.set(self.pending_outbound.len() as u64); + } + tokio::select! { event = self.service.next() => match event { None => return, diff --git a/src/protocol/libp2p/ping/mod.rs b/src/protocol/libp2p/ping/mod.rs index 1ffbc86e..7358bc34 100644 --- a/src/protocol/libp2p/ping/mod.rs +++ b/src/protocol/libp2p/ping/mod.rs @@ -130,7 +130,6 @@ impl Ping { self.service.open_substream(peer)?; self.peers.insert(peer); - self.metrics.as_ref().map(|metrics| metrics.peers.set(self.peers.len() as u64)); Ok(()) } @@ -140,7 +139,6 @@ impl Ping { tracing::trace!(target: LOG_TARGET, ?peer, "connection closed"); self.peers.remove(&peer); - self.metrics.as_ref().map(|metrics| metrics.peers.set(self.peers.len() as u64)); } /// Handle outbound substream. @@ -171,7 +169,6 @@ impl Ping { Ok(Ok(elapsed)) => Ok((peer, elapsed)), } })); - self.metrics.as_ref().map(|metrics| metrics.pending_outbound.inc()); } /// Substream opened to remote peer. @@ -196,7 +193,6 @@ impl Ping { Ok(Ok(())) => Ok(()), } })); - self.metrics.as_ref().map(|metrics| metrics.pending_inbound.inc()); } /// Start [`Ping`] event loop. @@ -204,6 +200,12 @@ impl Ping { tracing::debug!(target: LOG_TARGET, "starting ping event loop"); loop { + if let Some(metrics) = &self.metrics { + metrics.peers.set(self.peers.len() as u64); + metrics.pending_inbound.set(self.pending_inbound.len() as u64); + metrics.pending_outbound.set(self.pending_outbound.len() as u64); + } + tokio::select! { event = self.service.next() => match event { Some(TransportEvent::ConnectionEstablished { peer, .. }) => { @@ -229,11 +231,8 @@ impl Ping { None => return, }, _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => { - self.metrics.as_ref().map(|metrics| metrics.pending_inbound.set(self.pending_inbound.len() as u64)); } event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => { - self.metrics.as_ref().map(|metrics| metrics.pending_outbound.set(self.pending_outbound.len() as u64)); - match event { Some(Ok((peer, elapsed))) => { let _ = self From 48cb7ca9da1cd11aa6f21616dbd27b35c984c04f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 18:41:43 +0200 Subject: [PATCH 17/29] kad: Add metrics Signed-off-by: Alexandru Vasile --- src/lib.rs | 8 ++- src/protocol/libp2p/kademlia/executor.rs | 5 ++ src/protocol/libp2p/kademlia/mod.rs | 74 ++++++++++++++++++++++- src/protocol/libp2p/kademlia/query/mod.rs | 5 ++ src/protocol/libp2p/kademlia/store.rs | 20 ++++++ 5 files changed, 108 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f7e0eb65..6fc0a7a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -280,8 +280,14 @@ impl Litep2p { kademlia_config.codec, litep2p_config.keep_alive_timeout, ); + let kad = Kademlia::new( + service, + kademlia_config, + litep2p_config.metrics_registry.clone(), + )?; + litep2p_config.executor.run(Box::pin(async move { - let _ = Kademlia::new(service, kademlia_config).run().await; + let _ = kad.run().await; })); } diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs index e94a09f2..155c8316 100644 --- a/src/protocol/libp2p/kademlia/executor.rs +++ b/src/protocol/libp2p/kademlia/executor.rs @@ -89,6 +89,11 @@ impl QueryExecutor { } } + /// Get number of pending futures. + pub fn futures_len(&self) -> usize { + self.futures.len() + } + /// Send message to remote peer. pub fn send_message(&mut self, peer: PeerId, message: Bytes, mut substream: Substream) { self.futures.push(Box::pin(async move { diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 03b98ea2..584d08ad 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -22,6 +22,7 @@ use crate::{ error::{Error, ImmediateDialError, SubstreamError}, + metrics::{MetricGauge, MetricsRegistry}, protocol::{ libp2p::kademlia::{ bucket::KBucketEntry, @@ -168,11 +169,30 @@ pub(crate) struct Kademlia { /// Query executor. executor: QueryExecutor, + + /// Metrics. + metrics: Option, +} + +struct Metrics { + peers: MetricGauge, + + mem_store_records: MetricGauge, + mem_store_providers: MetricGauge, + mem_store_local_providers: MetricGauge, + mem_store_provider_refresh: MetricGauge, + + engine_queries: MetricGauge, + executor_queries: MetricGauge, } impl Kademlia { /// Create new [`Kademlia`]. - pub(crate) fn new(mut service: TransportService, config: Config) -> Self { + pub(crate) fn new( + mut service: TransportService, + config: Config, + registry: Option, + ) -> Result { let local_peer_id = service.local_peer_id(); let local_key = Key::from(service.local_peer_id()); let mut routing_table = RoutingTable::new(local_key.clone()); @@ -193,7 +213,40 @@ impl Kademlia { }, ); - Self { + let metrics = if let Some(registry) = registry { + Some(Metrics { + peers: registry + .register_gauge("litep2p_kad_peers".into(), "Connected peers".into())?, + mem_store_records: registry.register_gauge( + "litep2p_kad_mem_store_records".into(), + "Number of records in memory store".into(), + )?, + mem_store_providers: registry.register_gauge( + "litep2p_kad_mem_store_providers".into(), + "Number of providers in memory store".into(), + )?, + mem_store_local_providers: registry.register_gauge( + "litep2p_kad_mem_store_local_providers".into(), + "Number of local providers in memory store".into(), + )?, + mem_store_provider_refresh: registry.register_gauge( + "litep2p_kad_mem_store_provider_refresh".into(), + "Number of provider refresh futures".into(), + )?, + engine_queries: registry.register_gauge( + "litep2p_kad_engine_queries".into(), + "Number of queries in the query engine".into(), + )?, + executor_queries: registry.register_gauge( + "litep2p_kad_executor_queries".into(), + "Number of queries in the query executor".into(), + )?, + }) + } else { + None + }; + + Ok(Self { service, routing_table, peers: HashMap::new(), @@ -210,7 +263,8 @@ impl Kademlia { record_ttl: config.record_ttl, replication_factor: config.replication_factor, engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR), - } + metrics, + }) } /// Allocate next query ID. @@ -877,6 +931,20 @@ impl Kademlia { tracing::debug!(target: LOG_TARGET, "starting kademlia event loop"); loop { + if let Some(metrics) = &self.metrics { + metrics.peers.set(self.peers.len() as u64); + + metrics.mem_store_records.set(self.store.records_len() as u64); + metrics.mem_store_providers.set(self.store.provider_keys_len() as u64); + metrics.mem_store_local_providers.set(self.store.local_providers_len() as u64); + metrics + .mem_store_provider_refresh + .set(self.store.pending_provider_refresh_len() as u64); + + metrics.engine_queries.set(self.engine.active_queries() as u64); + metrics.executor_queries.set(self.executor.futures_len() as u64); + } + // poll `QueryEngine` for next actions. while let Some(action) = self.engine.next_action() { if let Err((query, peer)) = self.on_query_action(action).await { diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 9f14a6de..f95a176c 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -216,6 +216,11 @@ impl QueryEngine { } } + /// Get number of active queries. + pub fn active_queries(&self) -> usize { + self.queries.len() + } + /// Start `FIND_NODE` query. pub fn start_find_node( &mut self, diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index efb39f0f..8286c0ad 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -350,6 +350,26 @@ impl MemoryStore { } }) } + + /// Get the number of records in the store. + pub fn records_len(&self) -> usize { + self.records.len() + } + + /// Get the number of provider keys in the store. + pub fn provider_keys_len(&self) -> usize { + self.provider_keys.len() + } + + /// Get the number of local providers in the store. + pub fn local_providers_len(&self) -> usize { + self.local_providers.len() + } + + /// Get the number of pending provider refreshes in the store. + pub fn pending_provider_refresh_len(&self) -> usize { + self.pending_provider_refresh.len() + } } pub struct MemoryStoreConfig { From 27042cbd8364a1bf69f72390c0095d8f2c7b9657 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 19:00:31 +0200 Subject: [PATCH 18/29] transport/manager: Add metrics Signed-off-by: Alexandru Vasile --- src/lib.rs | 3 +- src/protocol/request_response/mod.rs | 2 +- src/transport/manager/limits.rs | 10 +++++ src/transport/manager/mod.rs | 64 ++++++++++++++++++++++++++-- 4 files changed, 74 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6fc0a7a4..4262cefa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -165,7 +165,8 @@ impl Litep2p { bandwidth_sink.clone(), litep2p_config.max_parallel_dials, litep2p_config.connection_limits, - ); + litep2p_config.metrics_registry.clone(), + )?; // add known addresses to `TransportManager`, if any exist if !litep2p_config.known_addresses.is_empty() { diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index 4f93a657..54561268 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -1200,7 +1200,7 @@ impl RequestResponseProtocol { // Maybe metrics. _ = self.metrics.next() => { self.report_metrics(); - } + }, } } } diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 0af49eb1..a2dec9d1 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -81,6 +81,16 @@ impl ConnectionLimits { } } + /// Returns the number of established incoming connections. + pub fn num_incoming_connections(&self) -> usize { + self.incoming_connections.len() + } + + /// Returns the number of established outgoing connections. + pub fn num_outgoing_connections(&self) -> usize { + self.outgoing_connections.len() + } + /// Called when dialing an address. /// /// Returns the number of outgoing connections permitted to be established. diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index d7eba036..712a6aaa 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -24,6 +24,7 @@ use crate::{ crypto::ed25519::Keypair, error::{AddressError, DialError, Error}, executor::Executor, + metrics::{MetricGauge, MetricsRegistry}, protocol::{InnerTransportEvent, TransportService}, transport::{ manager::{ @@ -254,6 +255,17 @@ pub struct TransportManager { /// Opening connections errors. opening_errors: HashMap>, + + /// Metrics. + metrics: Option, +} + +struct Metrics { + peers: MetricGauge, + pending_connections: MetricGauge, + incoming_connections: MetricGauge, + outgoing_connections: MetricGauge, + opening_errors: MetricGauge, } impl TransportManager { @@ -265,7 +277,8 @@ impl TransportManager { bandwidth_sink: BandwidthSink, max_parallel_dials: usize, connection_limits_config: limits::ConnectionLimitsConfig, - ) -> (Self, TransportManagerHandle) { + registry: Option, + ) -> Result<(Self, TransportManagerHandle), Error> { let local_peer_id = PeerId::from_public_key(&keypair.public().into()); let peers = Arc::new(RwLock::new(HashMap::new())); let (cmd_tx, cmd_rx) = channel(256); @@ -281,7 +294,32 @@ impl TransportManager { public_addresses.clone(), ); - ( + let metrics = if let Some(registry) = registry { + Some(Metrics { + peers: registry + .register_gauge("litep2p_manager_peers".into(), "Connected peers".into())?, + pending_connections: registry.register_gauge( + "litep2p_manager_pending_connections".into(), + "Pending connections".into(), + )?, + incoming_connections: registry.register_gauge( + "litep2p_manager_incoming_connections".into(), + "Incoming connections".into(), + )?, + outgoing_connections: registry.register_gauge( + "litep2p_manager_outgoing_connections".into(), + "Outgoing connections".into(), + )?, + opening_errors: registry.register_gauge( + "litep2p_manager_opening_errors".into(), + "Opening errors".into(), + )?, + }) + } else { + None + }; + + Ok(( Self { peers, cmd_rx, @@ -302,9 +340,10 @@ impl TransportManager { next_connection_id: Arc::new(AtomicUsize::new(0usize)), connection_limits: limits::ConnectionLimits::new(connection_limits_config), opening_errors: HashMap::new(), + metrics, }, handle, - ) + )) } /// Get iterator to installed protocols. @@ -999,9 +1038,28 @@ impl TransportManager { Ok(None) } + /// Report metrics. + fn report_metrics(&self) { + if let Some(metrics) = &self.metrics { + metrics.peers.set(self.peers.read().len() as u64); + metrics.pending_connections.set(self.pending_connections.len() as u64); + + metrics + .incoming_connections + .set(self.connection_limits.num_incoming_connections() as u64); + metrics + .outgoing_connections + .set(self.connection_limits.num_outgoing_connections() as u64); + + metrics.opening_errors.set(self.opening_errors.len() as u64); + } + } + /// Poll next event from [`crate::transport::manager::TransportManager`]. pub async fn next(&mut self) -> Option { loop { + self.report_metrics(); + tokio::select! { event = self.event_rx.recv() => { let Some(event) = event else { From 2de8a68f6471738510e5bd8542e80543cb63d47c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 19:23:56 +0200 Subject: [PATCH 19/29] Simplify metrics collection Signed-off-by: Alexandru Vasile --- src/protocol/notification/mod.rs | 65 ++++------------------------ src/protocol/request_response/mod.rs | 58 ++++--------------------- 2 files changed, 17 insertions(+), 106 deletions(-) diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index 412ac2d0..77e44cc2 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -40,20 +40,14 @@ use crate::{ }; use bytes::BytesMut; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use multiaddr::Multiaddr; use tokio::sync::{ mpsc::{channel, Receiver, Sender}, oneshot, }; -use std::{ - collections::HashMap, - pin::Pin, - sync::Arc, - task::{Context, Poll}, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; pub use config::{Config, ConfigBuilder}; pub use handle::{NotificationHandle, NotificationSink}; @@ -291,19 +285,11 @@ pub(crate) struct NotificationProtocol { should_dial: bool, /// Metrics. - metrics: PollMetrics, + metrics: Option, } /// Request-response protocol metrics. struct Metrics { - /// Interval for collecting metrics. - /// - /// This is a tradeoff we make in favor of simplicity and correctness. - /// An alternative to this would be to complicate the code by collecting - /// individual metrics in each method. This is error prone, as names are - /// easily mismatched, and it's hard to keep track of all the metrics. - interval: tokio::time::Interval, - connected_peers: MetricGauge, pending_outbound_num: MetricGauge, pending_outbound_handshake_num: MetricGauge, @@ -312,30 +298,6 @@ struct Metrics { timers_num: MetricGauge, } -struct PollMetrics { - inner: Option, -} - -impl Stream for PollMetrics { - type Item = tokio::time::Instant; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.inner.take() { - Some(mut metrics) => match metrics.interval.poll_tick(cx) { - Poll::Ready(_) => { - self.inner = Some(metrics); - return Poll::Ready(Some(tokio::time::Instant::now())); - } - Poll::Pending => { - self.inner = Some(metrics); - return Poll::Pending; - } - }, - None => Poll::Pending, - } - } -} - impl NotificationProtocol { pub(crate) fn new( service: TransportService, @@ -348,9 +310,7 @@ impl NotificationProtocol { let metrics = if let Some(registry) = registry { let protocol_name = config.protocol_name.to_string().replace("/", "_"); - let metrics = Metrics { - interval: tokio::time::interval(Duration::from_secs(15)), - + Some(Metrics { connected_peers: registry.register_gauge( format!("litep2p_notif{}_connected_peers", protocol_name), "Number of connected peers".to_string(), @@ -381,13 +341,9 @@ impl NotificationProtocol { format!("litep2p_notif{}_timers_num", protocol_name), "Number of pending timers".to_string(), )?, - }; - - PollMetrics { - inner: Some(metrics), - } + }) } else { - PollMetrics { inner: None } + None }; Ok(Self { @@ -1704,7 +1660,7 @@ impl NotificationProtocol { /// Report metrics. fn report_metrics(&self) { - if let Some(metrics) = self.metrics.inner.as_ref() { + if let Some(metrics) = &self.metrics { metrics.connected_peers.set(self.peers.len() as u64); metrics.pending_outbound_num.set(self.pending_outbound.len() as u64); metrics @@ -1718,6 +1674,8 @@ impl NotificationProtocol { /// Handle next notification event. async fn next_event(&mut self) { + self.report_metrics(); + // biased select is used because the substream events must be prioritized above other events // that is because a closed substream is detected by either `substreams` or `negotiation` // and if that event is not handled with priority but, e.g., inbound substream is @@ -1921,11 +1879,6 @@ impl NotificationProtocol { } } }, - - // Maybe metrics. - _ = self.metrics.next() => { - self.report_metrics(); - } } } diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index 54561268..bdc9e2b6 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -34,7 +34,7 @@ use crate::{ }; use bytes::BytesMut; -use futures::{channel, future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{channel, future::BoxFuture, stream::FuturesUnordered, StreamExt}; use tokio::{ sync::{ mpsc::{Receiver, Sender}, @@ -46,12 +46,10 @@ use tokio::{ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, io::ErrorKind, - pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, - task::{Context, Poll}, time::Duration, }; @@ -188,43 +186,11 @@ pub(crate) struct RequestResponseProtocol { max_concurrent_inbound_requests: Option, /// Metrics. - metrics: PollMetrics, -} - -struct PollMetrics { - inner: Option, -} - -impl Stream for PollMetrics { - type Item = tokio::time::Instant; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.inner.take() { - Some(mut metrics) => match metrics.interval.poll_tick(cx) { - Poll::Ready(_) => { - self.inner = Some(metrics); - return Poll::Ready(Some(tokio::time::Instant::now())); - } - Poll::Pending => { - self.inner = Some(metrics); - return Poll::Pending; - } - }, - None => Poll::Pending, - } - } + metrics: Option, } /// Request-response protocol metrics. struct Metrics { - /// Interval for collecting metrics. - /// - /// This is a tradeoff we make in favor of simplicity and correctness. - /// An alternative to this would be to complicate the code by collecting - /// individual metrics in each method. This is error prone, as names are - /// easily mismatched, and it's hard to keep track of all the metrics. - interval: tokio::time::Interval, - connected_peers: MetricGauge, pending_outbound_num: MetricGauge, pending_outbound_responses_num: MetricGauge, @@ -244,9 +210,7 @@ impl RequestResponseProtocol { let metrics = if let Some(registry) = registry { let protocol_name = config.protocol_name.to_string().replace("/", "_"); - let metrics = Metrics { - interval: tokio::time::interval(Duration::from_secs(15)), - + Some(Metrics { connected_peers: registry.register_gauge( format!("litep2p_req_res{}_connected_peers", protocol_name), "Litep2p number of connected peers".into(), @@ -278,12 +242,9 @@ impl RequestResponseProtocol { format!("litep2p_req_res{}_pending_dials", protocol_name), "Litep2p number of pending dials".into(), )?, - }; - PollMetrics { - inner: Some(metrics), - } + }) } else { - PollMetrics { inner: None } + None }; Ok(Self { @@ -1114,7 +1075,7 @@ impl RequestResponseProtocol { /// Report metrics. fn report_metrics(&self) { - if let Some(metrics) = self.metrics.inner.as_ref() { + if let Some(metrics) = &self.metrics { metrics.connected_peers.set(self.peers.len() as u64); metrics.pending_outbound_num.set(self.pending_outbound.len() as u64); metrics @@ -1136,6 +1097,8 @@ impl RequestResponseProtocol { tracing::debug!(target: LOG_TARGET, "starting request-response event loop"); loop { + self.report_metrics(); + tokio::select! { // events coming from the network have higher priority than user commands as all user commands are // responses to network behaviour so ensure that the commands operate on the most up to date information. @@ -1196,11 +1159,6 @@ impl RequestResponseProtocol { return } }, - - // Maybe metrics. - _ = self.metrics.next() => { - self.report_metrics(); - }, } } } From 142e08f616a6cb32d59a7160dff4ac103298ea18 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 19:30:16 +0200 Subject: [PATCH 20/29] tests: Adjust testing Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 6 +- src/protocol/mdns.rs | 8 +- src/protocol/notification/tests/mod.rs | 8 +- src/protocol/request_response/tests.rs | 6 +- src/transport/manager/mod.rs | 178 +++++++++++++++++-------- src/transport/quic/mod.rs | 4 +- src/transport/tcp/mod.rs | 4 +- 7 files changed, 148 insertions(+), 66 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 584d08ad..1587d027 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1301,7 +1301,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let (transport_service, _tx) = TransportService::new( @@ -1332,7 +1334,7 @@ mod tests { }; ( - Kademlia::new(transport_service, config), + Kademlia::new(transport_service, config, None).unwrap(), Context { _cmd_tx, event_rx }, manager, ) diff --git a/src/protocol/mdns.rs b/src/protocol/mdns.rs index d305a3f7..f5a8562d 100644 --- a/src/protocol/mdns.rs +++ b/src/protocol/mdns.rs @@ -355,7 +355,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let mdns1 = Mdns::new( handle1, @@ -378,7 +380,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let mdns2 = Mdns::new( handle2, diff --git a/src/protocol/notification/tests/mod.rs b/src/protocol/notification/tests/mod.rs index 4aa48aa4..7f972b46 100644 --- a/src/protocol/notification/tests/mod.rs +++ b/src/protocol/notification/tests/mod.rs @@ -57,7 +57,9 @@ fn make_notification_protocol() -> ( BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let (transport_service, tx) = TransportService::new( @@ -84,7 +86,9 @@ fn make_notification_protocol() -> ( transport_service, config, std::sync::Arc::new(DefaultExecutor {}), - ), + None, + ) + .unwrap(), handle, manager, tx, diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index 03cd891e..d8bca2b5 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -55,7 +55,9 @@ fn protocol() -> ( BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let (transport_service, tx) = TransportService::new( @@ -70,7 +72,7 @@ fn protocol() -> ( ConfigBuilder::new(ProtocolName::from("/req/1")).with_max_size(1024).build(); ( - RequestResponseProtocol::new(transport_service, config), + RequestResponseProtocol::new(transport_service, config, None).unwrap(), handle, manager, tx, diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 712a6aaa..a15f6177 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -1566,8 +1566,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.register_protocol( ProtocolName::from("/notif/1"), Vec::new(), @@ -1593,8 +1594,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.register_protocol( ProtocolName::from("/notif/1"), Vec::new(), @@ -1623,8 +1625,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.register_protocol( ProtocolName::from("/notif/1"), vec![ @@ -1656,8 +1659,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); } @@ -1673,8 +1677,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); assert!(manager.dial(local_peer_id).await.is_err()); } @@ -1686,7 +1691,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1716,7 +1723,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let dial_address = Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) @@ -1778,7 +1787,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1809,7 +1820,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1854,7 +1867,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1873,7 +1888,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1906,8 +1923,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); // ipv6 let address = Multiaddr::empty() .with(Protocol::Ip6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1))) @@ -1968,7 +1986,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2035,7 +2055,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2122,7 +2144,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2207,7 +2231,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2316,7 +2342,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2412,7 +2440,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2521,7 +2551,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2625,7 +2657,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2769,8 +2803,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.on_dial_failure(ConnectionId::random()).unwrap(); } @@ -2788,7 +2823,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.on_connection_closed(PeerId::random(), ConnectionId::random()).unwrap(); } @@ -2806,7 +2843,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager .on_connection_opened( SupportedTransport::Tcp, @@ -2830,7 +2869,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2854,7 +2895,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2881,8 +2924,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager .on_open_failure(SupportedTransport::Tcp, ConnectionId::random()) .unwrap(); @@ -2902,7 +2946,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2922,8 +2968,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); assert!(manager.next().await.is_none()); } @@ -2935,8 +2982,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); let peer = { let peer = PeerId::random(); let mut peers = manager.peers.write(); @@ -2983,8 +3031,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); let peer = { let peer = PeerId::random(); let mut peers = manager.peers.write(); @@ -3046,8 +3095,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); let peer = { let peer = PeerId::random(); let mut peers = manager.peers.write(); @@ -3089,8 +3139,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); // transport doesn't start with ip/dns { let address = Multiaddr::empty().with(Protocol::P2p(Multihash::from(PeerId::random()))); @@ -3155,8 +3206,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); async fn call_manager(manager: &mut TransportManager, address: Multiaddr) { match manager.dial_address(address).await { Err(Error::AddressError(AddressError::PeerIdMissing)) => {} @@ -3209,7 +3261,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let dial_address = Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) @@ -3295,7 +3349,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let dial_address = Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) @@ -3385,7 +3441,9 @@ mod tests { ConnectionLimitsConfig::default() .max_incoming_connections(Some(3)) .max_outgoing_connections(Some(2)), - ); + None, + ) + .unwrap(); // The connection limit is agnostic of the underlying transports. manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -3461,7 +3519,9 @@ mod tests { ConnectionLimitsConfig::default() .max_incoming_connections(Some(3)) .max_outgoing_connections(Some(2)), - ); + None, + ) + .unwrap(); // The connection limit is agnostic of the underlying transports. manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -3548,7 +3608,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); // Random peer ID. @@ -3601,7 +3663,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); // Random peer ID. @@ -3753,7 +3817,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let dial_address = Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) @@ -3839,7 +3905,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let connection_id = ConnectionId::from(0); diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index 482e6744..12c5e0db 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -624,7 +624,7 @@ mod tests { }; let (mut transport1, listen_addresses) = - QuicTransport::new(handle1, Default::default()).unwrap(); + QuicTransport::new(handle1, Default::default(), None).unwrap(); let listen_address = listen_addresses[0].clone(); let keypair2 = Keypair::generate(); @@ -649,7 +649,7 @@ mod tests { )]), }; - let (mut transport2, _) = QuicTransport::new(handle2, Default::default()).unwrap(); + let (mut transport2, _) = QuicTransport::new(handle2, Default::default(), None).unwrap(); let peer1: PeerId = PeerId::from_public_key(&keypair1.public().into()); let _peer2: PeerId = PeerId::from_public_key(&keypair2.public().into()); let listen_address = listen_address.with(Protocol::P2p( diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index a5aef70c..5f120ce5 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -1102,7 +1102,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport( SupportedTransport::Tcp, From 7079ade1d1e5206cd9bcd564346aac509f605e42 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 19:50:55 +0200 Subject: [PATCH 21/29] protocol: To metric name string Signed-off-by: Alexandru Vasile --- src/protocol/notification/mod.rs | 2 +- src/protocol/request_response/mod.rs | 2 +- src/types/protocol.rs | 7 +++++++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index 77e44cc2..97b8ed6b 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -308,7 +308,7 @@ impl NotificationProtocol { let (shutdown_tx, shutdown_rx) = channel(DEFAULT_CHANNEL_SIZE); let metrics = if let Some(registry) = registry { - let protocol_name = config.protocol_name.to_string().replace("/", "_"); + let protocol_name = config.protocol_name.to_metric_string(); Some(Metrics { connected_peers: registry.register_gauge( diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index bdc9e2b6..81106b48 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -208,7 +208,7 @@ impl RequestResponseProtocol { registry: Option, ) -> Result { let metrics = if let Some(registry) = registry { - let protocol_name = config.protocol_name.to_string().replace("/", "_"); + let protocol_name = config.protocol_name.to_metric_string(); Some(Metrics { connected_peers: registry.register_gauge( diff --git a/src/types/protocol.rs b/src/types/protocol.rs index adbfe8b1..426c01ad 100644 --- a/src/types/protocol.rs +++ b/src/types/protocol.rs @@ -85,6 +85,13 @@ impl PartialEq for ProtocolName { impl Eq for ProtocolName {} +impl ProtocolName { + /// Get the protocol name as a metrics valid string. + pub fn to_metric_string(&self) -> String { + self.to_string().replace("/", "_").replace("-", "_") + } +} + #[cfg(test)] mod tests { use super::*; From e5438533ea00b87fb7f681fb6af46a55bb297a1c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 29 Nov 2024 20:58:19 +0200 Subject: [PATCH 22/29] transport: Simplify collection Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 39 +++++++++++----------------------- src/transport/websocket/mod.rs | 37 +++++++++++--------------------- 2 files changed, 24 insertions(+), 52 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 5f120ce5..10130d7b 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -138,14 +138,6 @@ pub(crate) struct TcpTransport { /// TCP specific metrics. struct TcpMetrics { - /// Interval for collecting metrics. - /// - /// This is a tradeoff we make in favor of simplicity and correctness. - /// An alternative to this would be to complicate the code by collecting - /// individual metrics in each method. This is error prone, as names are - /// easily mismatched, and it's hard to keep track of all the metrics. - interval: tokio::time::Interval, - /// The following metrics are used for the transport itself. pending_dials_num: MetricGauge, pending_inbound_connections_num: MetricGauge, @@ -326,8 +318,6 @@ impl TransportBuilder for TcpTransport { let metrics = if let Some(registry) = registry { Some(TcpMetrics { - interval: tokio::time::interval(Duration::from_secs(15)), - pending_dials_num: registry.register_gauge( "litep2p_tcp_pending_dials".into(), "Litep2p number of pending dials".into(), @@ -636,23 +626,18 @@ impl Stream for TcpTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Take the metrics to only poll the tick in case they are enabled. - if let Some(mut metrics) = self.metrics.take() { - if let Poll::Ready(_) = metrics.interval.poll_tick(cx) { - metrics.pending_dials_num.set(self.pending_dials.len() as u64); - metrics - .pending_inbound_connections_num - .set(self.pending_inbound_connections.len() as u64); - metrics.pending_connections_num.set(self.pending_connections.len() as u64); - metrics - .pending_raw_connections_num - .set(self.pending_raw_connections.len() as u64); - metrics.open_raw_connections_num.set(self.opened_raw.len() as u64); - metrics.cancel_futures_num.set(self.cancel_futures.len() as u64); - metrics.pending_open_num.set(self.pending_open.len() as u64); - } - - self.metrics = Some(metrics); + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + metrics + .pending_inbound_connections_num + .set(self.pending_inbound_connections.len() as u64); + metrics.pending_connections_num.set(self.pending_connections.len() as u64); + metrics + .pending_raw_connections_num + .set(self.pending_raw_connections.len() as u64); + metrics.open_raw_connections_num.set(self.opened_raw.len() as u64); + metrics.cancel_futures_num.set(self.cancel_futures.len() as u64); + metrics.pending_open_num.set(self.pending_open.len() as u64); } if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 48aa87c8..fc2fcef8 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -141,14 +141,6 @@ pub(crate) struct WebSocketTransport { /// Websocket specific metrics. struct WebSocketMetrics { - /// Interval for collecting metrics. - /// - /// This is a tradeoff we make in favor of simplicity and correctness. - /// An alternative to this would be to complicate the code by collecting - /// individual metrics in each method. This is error prone, as names are - /// easily mismatched, and it's hard to keep track of all the metrics. - interval: tokio::time::Interval, - /// The following metrics are used for the transport itself. pending_dials_num: MetricGauge, pending_inbound_connections_num: MetricGauge, @@ -356,8 +348,6 @@ impl TransportBuilder for WebSocketTransport { let metrics = if let Some(registry) = registry { Some(WebSocketMetrics { - interval: tokio::time::interval(Duration::from_secs(15)), - pending_dials_num: registry.register_gauge( "litep2p_websocket_pending_dials".into(), "Litep2p number of pending dials".into(), @@ -676,21 +666,18 @@ impl Stream for WebSocketTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Take the metrics to only poll the tick in case they are enabled. - if let Some(mut metrics) = self.metrics.take() { - if let Poll::Ready(_) = metrics.interval.poll_tick(cx) { - metrics.pending_dials_num.set(self.pending_dials.len() as u64); - metrics - .pending_inbound_connections_num - .set(self.pending_inbound_connections.len() as u64); - metrics.pending_connections_num.set(self.pending_connections.len() as u64); - metrics - .pending_raw_connections_num - .set(self.pending_raw_connections.len() as u64); - metrics.open_raw_connections_num.set(self.opened_raw.len() as u64); - metrics.cancel_futures_num.set(self.cancel_futures.len() as u64); - metrics.pending_open_num.set(self.pending_open.len() as u64); - } + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + metrics + .pending_inbound_connections_num + .set(self.pending_inbound_connections.len() as u64); + metrics.pending_connections_num.set(self.pending_connections.len() as u64); + metrics + .pending_raw_connections_num + .set(self.pending_raw_connections.len() as u64); + metrics.open_raw_connections_num.set(self.opened_raw.len() as u64); + metrics.cancel_futures_num.set(self.cancel_futures.len() as u64); + metrics.pending_open_num.set(self.pending_open.len() as u64); } if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { From 17786b6b22bcbb3b36a27e548b6136574a2823b7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Dec 2024 10:16:46 +0000 Subject: [PATCH 23/29] tcp: Decrement metrics on connection closed Signed-off-by: Alexandru Vasile --- src/metrics.rs | 6 ++++++ src/transport/tcp/connection.rs | 13 +++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index e144f711..4f65595c 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -48,6 +48,12 @@ pub trait MetricGaugeT: Send + Sync { /// Decrement the gauge. fn dec(&self); + + /// Add `value` to the gauge. + fn add(&self, value: u64); + + /// Subtract `value` from the gauge. + fn sub(&self, value: u64); } /// A registry for metrics. diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index 085a713b..03b3ca8d 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -742,8 +742,7 @@ impl TcpConnection { } } - /// Start connection event loop. - pub(crate) async fn start(mut self) -> crate::Result<()> { + async fn start_inner(&mut self) -> crate::Result<()> { self.protocol_set .report_connection_established(self.peer, self.endpoint.clone()) .await?; @@ -771,6 +770,16 @@ impl TcpConnection { } } } + + /// Start connection event loop. + pub(crate) async fn start(mut self) -> crate::Result<()> { + self.start_inner().await.inspect_err(|_| { + if let Some(metrics) = &self.metrics { + // All pending substreams must be decremented because the connection is closing. + metrics.pending_substreams_num.sub(self.pending_substreams.len() as u64); + } + }) + } } #[cfg(test)] From e43ee7117eccc6879640c527825d817648e5063d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Dec 2024 10:20:04 +0000 Subject: [PATCH 24/29] websocket: Decrement metrics on connection closed Signed-off-by: Alexandru Vasile --- src/transport/websocket/connection.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/transport/websocket/connection.rs b/src/transport/websocket/connection.rs index 90d77d2e..b5f78e9d 100644 --- a/src/transport/websocket/connection.rs +++ b/src/transport/websocket/connection.rs @@ -436,8 +436,7 @@ impl WebSocketConnection { }) } - /// Start connection event loop. - pub(crate) async fn start(mut self) -> crate::Result<()> { + async fn start_inner(mut self) -> crate::Result<()> { self.protocol_set .report_connection_established(self.peer, self.endpoint) .await?; @@ -602,4 +601,14 @@ impl WebSocketConnection { } } } + + /// Start connection event loop. + pub(crate) async fn start(mut self) -> crate::Result<()> { + self.start_inner().await.inspect_err(|_| { + if let Some(metrics) = &self.metrics { + // All pending substreams must be decremented because the connection is closing. + metrics.pending_substreams_num.sub(self.pending_substreams.len() as u64); + } + }) + } } From fd084c82fc90d4912450708e8e8209ff20a86a0a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Dec 2024 10:25:09 +0000 Subject: [PATCH 25/29] websocket: Use ref mut self Signed-off-by: Alexandru Vasile --- src/transport/websocket/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/websocket/connection.rs b/src/transport/websocket/connection.rs index b5f78e9d..0d2cdc52 100644 --- a/src/transport/websocket/connection.rs +++ b/src/transport/websocket/connection.rs @@ -436,7 +436,7 @@ impl WebSocketConnection { }) } - async fn start_inner(mut self) -> crate::Result<()> { + async fn start_inner(&mut self) -> crate::Result<()> { self.protocol_set .report_connection_established(self.peer, self.endpoint) .await?; From 09bd78696d1a9527dd4bf4907327592496b03833 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Dec 2024 10:27:47 +0000 Subject: [PATCH 26/29] websocket: Fix clone move Signed-off-by: Alexandru Vasile --- src/transport/websocket/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/websocket/connection.rs b/src/transport/websocket/connection.rs index 0d2cdc52..3e0b1d0a 100644 --- a/src/transport/websocket/connection.rs +++ b/src/transport/websocket/connection.rs @@ -438,7 +438,7 @@ impl WebSocketConnection { async fn start_inner(&mut self) -> crate::Result<()> { self.protocol_set - .report_connection_established(self.peer, self.endpoint) + .report_connection_established(self.peer, self.endpoint.clone()) .await?; loop { From cbc273dec98283eddb6b8657353934eb989b5aa2 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Dec 2024 11:54:37 +0000 Subject: [PATCH 27/29] metrics: Add MeteredFuturesStream Signed-off-by: Alexandru Vasile --- src/metrics.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 4f65595c..73755712 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -23,8 +23,14 @@ //! //! Contains the traits and types that are used to define and interact with metrics. -use crate::Error; -use std::sync::Arc; +use crate::{utils::futures_stream::FuturesStream, Error}; +use futures::{Stream, StreamExt}; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; pub type MetricCounter = Arc; @@ -86,3 +92,51 @@ impl Drop for ScopeGaugeMetric { self.inner.dec(); } } + +/// Wrapper around [`FuturesStream`] that provides information to the given metric. +pub struct MeteredFuturesStream { + stream: FuturesStream, + metric: MetricGauge, +} + +impl MeteredFuturesStream { + pub fn new(metric: MetricGauge) -> Self { + MeteredFuturesStream { + stream: FuturesStream::new(), + metric, + } + } + + pub fn push(&mut self, future: F) { + self.metric.inc(); + self.stream.push(future); + } + + /// Number of futures in the stream. + pub fn len(&self) -> usize { + self.stream.len() + } + + /// Returns `true` if the stream is empty. + pub fn is_empty(&self) -> bool { + self.stream.len() == 0 + } +} + +impl Stream for MeteredFuturesStream { + type Item = ::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let result = self.stream.poll_next_unpin(cx); + if result.is_ready() { + self.metric.dec(); + } + result + } +} + +impl Drop for MeteredFuturesStream { + fn drop(&mut self) { + self.metric.sub(self.len() as u64); + } +} From deb8b1aee2f7cb1c94372dc4a2a32bacc0bee702 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Dec 2024 13:12:06 +0000 Subject: [PATCH 28/29] metrics: Use MeteredFuturesStream for tcp and websocket Signed-off-by: Alexandru Vasile --- src/metrics.rs | 18 ++++++--- src/transport/tcp/connection.rs | 56 +++++++++++---------------- src/transport/websocket/connection.rs | 52 +++++++++++-------------- 3 files changed, 59 insertions(+), 67 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 73755712..55a5c6de 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -94,13 +94,14 @@ impl Drop for ScopeGaugeMetric { } /// Wrapper around [`FuturesStream`] that provides information to the given metric. +#[derive(Default)] pub struct MeteredFuturesStream { stream: FuturesStream, - metric: MetricGauge, + metric: Option, } impl MeteredFuturesStream { - pub fn new(metric: MetricGauge) -> Self { + pub fn new(metric: Option) -> Self { MeteredFuturesStream { stream: FuturesStream::new(), metric, @@ -108,7 +109,10 @@ impl MeteredFuturesStream { } pub fn push(&mut self, future: F) { - self.metric.inc(); + if let Some(ref metric) = self.metric { + metric.inc(); + } + self.stream.push(future); } @@ -129,7 +133,9 @@ impl Stream for MeteredFuturesStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let result = self.stream.poll_next_unpin(cx); if result.is_ready() { - self.metric.dec(); + if let Some(ref metric) = self.metric { + metric.dec(); + } } result } @@ -137,6 +143,8 @@ impl Stream for MeteredFuturesStream { impl Drop for MeteredFuturesStream { fn drop(&mut self) { - self.metric.sub(self.len() as u64); + if let Some(ref metric) = self.metric { + metric.sub(self.len() as u64); + } } } diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index 03b3ca8d..eb7aa831 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -25,7 +25,7 @@ use crate::{ noise::{self, NoiseSocket}, }, error::{Error, NegotiationError, SubstreamError}, - metrics::{MetricGauge, ScopeGaugeMetric}, + metrics::{MeteredFuturesStream, MetricGauge, ScopeGaugeMetric}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -38,11 +38,7 @@ use crate::{ BandwidthSink, PeerId, }; -use futures::{ - future::BoxFuture, - stream::{FuturesUnordered, StreamExt}, - AsyncRead, AsyncWrite, -}; +use futures::{future::BoxFuture, stream::StreamExt, AsyncRead, AsyncWrite}; use multiaddr::{Multiaddr, Protocol}; use tokio::net::TcpStream; use tokio_util::compat::{ @@ -179,10 +175,11 @@ pub struct TcpConnection { /// Pending substreams. pending_substreams: - FuturesUnordered>>, + MeteredFuturesStream>>, - /// Metrics. - metrics: Option, + /// Metric incremented when the connection starts + /// and decremented when the connection closes. + _active_connections_num: Option, } impl fmt::Debug for TcpConnection { @@ -211,6 +208,15 @@ impl TcpConnection { substream_open_timeout, } = context; + let (pending_substreams_num, _active_connections_num) = if let Some(metrics) = metrics { + ( + Some(metrics.pending_substreams_num), + Some(metrics._active_connections_num), + ) + } else { + (None, None) + }; + Self { protocol_set, connection, @@ -219,9 +225,9 @@ impl TcpConnection { endpoint, bandwidth_sink, next_substream_id, - pending_substreams: FuturesUnordered::new(), substream_open_timeout, - metrics, + pending_substreams: MeteredFuturesStream::new(pending_substreams_num), + _active_connections_num, } } @@ -551,9 +557,6 @@ impl TcpConnection { }), } })); - if let Some(metrics) = &self.metrics { - metrics.pending_substreams_num.inc(); - } Ok(false) } @@ -713,9 +716,6 @@ impl TcpConnection { }), } })); - if let Some(metrics) = &self.metrics { - metrics.pending_substreams_num.inc(); - } Ok(false) } @@ -742,7 +742,8 @@ impl TcpConnection { } } - async fn start_inner(&mut self) -> crate::Result<()> { + /// Start connection event loop. + pub(crate) async fn start(mut self) -> crate::Result<()> { self.protocol_set .report_connection_established(self.peer, self.endpoint.clone()) .await?; @@ -754,11 +755,10 @@ impl TcpConnection { return Ok(()); } }, - substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => { - if let Some(metrics) = &self.metrics { - // This must be decremented and not set because the metric is shared across connections. - metrics.pending_substreams_num.dec(); - } + substream = self.pending_substreams.next(), if !self.pending_substreams.is_empty() => { + let Some(substream) = substream else { + continue; + }; self.handle_negotiated_substream(substream).await; } @@ -770,16 +770,6 @@ impl TcpConnection { } } } - - /// Start connection event loop. - pub(crate) async fn start(mut self) -> crate::Result<()> { - self.start_inner().await.inspect_err(|_| { - if let Some(metrics) = &self.metrics { - // All pending substreams must be decremented because the connection is closing. - metrics.pending_substreams_num.sub(self.pending_substreams.len() as u64); - } - }) - } } #[cfg(test)] diff --git a/src/transport/websocket/connection.rs b/src/transport/websocket/connection.rs index 3e0b1d0a..c1df2032 100644 --- a/src/transport/websocket/connection.rs +++ b/src/transport/websocket/connection.rs @@ -25,7 +25,7 @@ use crate::{ noise::{self, NoiseSocket}, }, error::{Error, NegotiationError, SubstreamError}, - metrics::{MetricGauge, ScopeGaugeMetric}, + metrics::{MeteredFuturesStream, MetricGauge, ScopeGaugeMetric}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -37,7 +37,7 @@ use crate::{ BandwidthSink, PeerId, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, AsyncRead, AsyncWrite, StreamExt}; +use futures::{future::BoxFuture, AsyncRead, AsyncWrite, StreamExt}; use multiaddr::{multihash::Multihash, Multiaddr, Protocol}; use tokio::net::TcpStream; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; @@ -170,10 +170,11 @@ pub(crate) struct WebSocketConnection { /// Pending substreams. pending_substreams: - FuturesUnordered>>, + MeteredFuturesStream>>, - /// Metrics. - metrics: Option, + /// Metric incremented when the connection starts + /// and decremented when the connection closes. + _active_connections_num: Option, } impl WebSocketConnection { @@ -192,6 +193,15 @@ impl WebSocketConnection { control, } = connection; + let (pending_substreams_num, _active_connections_num) = if let Some(metrics) = metrics { + ( + Some(metrics.pending_substreams_num), + Some(metrics._active_connections_num), + ) + } else { + (None, None) + }; + Self { connection_id: endpoint.connection_id(), protocol_set, @@ -201,8 +211,8 @@ impl WebSocketConnection { endpoint, bandwidth_sink, substream_open_timeout, - pending_substreams: FuturesUnordered::new(), - metrics, + pending_substreams: MeteredFuturesStream::new(pending_substreams_num), + _active_connections_num, } } @@ -436,7 +446,8 @@ impl WebSocketConnection { }) } - async fn start_inner(&mut self) -> crate::Result<()> { + /// Start connection event loop. + pub(crate) async fn start(mut self) -> crate::Result<()> { self.protocol_set .report_connection_established(self.peer, self.endpoint.clone()) .await?; @@ -469,9 +480,6 @@ impl WebSocketConnection { }), } })); - if let Some(metrics) = &self.metrics { - metrics.pending_substreams_num.inc(); - } }, Some(Err(error)) => { tracing::debug!( @@ -492,11 +500,10 @@ impl WebSocketConnection { } }, // TODO: move this to a function - substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => { - if let Some(metrics) = &self.metrics { - // This must be decremented and not set because the metric is shared across connections. - metrics.pending_substreams_num.dec(); - } + substream = self.pending_substreams.next(), if !self.pending_substreams.is_empty() => { + let Some(substream) = substream else { + continue; + }; match substream { // TODO: return error to protocol @@ -579,9 +586,6 @@ impl WebSocketConnection { }), } })); - if let Some(metrics) = &self.metrics { - metrics.pending_substreams_num.inc(); - } } Some(ProtocolCommand::ForceClose) => { tracing::debug!( @@ -601,14 +605,4 @@ impl WebSocketConnection { } } } - - /// Start connection event loop. - pub(crate) async fn start(mut self) -> crate::Result<()> { - self.start_inner().await.inspect_err(|_| { - if let Some(metrics) = &self.metrics { - // All pending substreams must be decremented because the connection is closing. - metrics.pending_substreams_num.sub(self.pending_substreams.len() as u64); - } - }) - } } From 125ee19ba2dbb033af28ad67fb0c62f139f4a785 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Dec 2024 14:05:31 +0000 Subject: [PATCH 29/29] Fix docs Signed-off-by: Alexandru Vasile --- src/metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/metrics.rs b/src/metrics.rs index 55a5c6de..559e02a9 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -93,7 +93,7 @@ impl Drop for ScopeGaugeMetric { } } -/// Wrapper around [`FuturesStream`] that provides information to the given metric. +/// Wrapper around `FuturesStream` that provides information to the given metric. #[derive(Default)] pub struct MeteredFuturesStream { stream: FuturesStream,