From a3c7499ba0b42c350a0b4aa79c77e7beca719fca Mon Sep 17 00:00:00 2001 From: Ben Matase Date: Tue, 23 Sep 2025 21:36:19 -0400 Subject: [PATCH] Allow mach to run in non-tls mode --- Cargo.lock | 1 + cli/Cargo.toml | 3 +- cli/src/aim_report.rs | 14 ++- cli/src/args/rpm.rs | 5 ++ cli/src/latency.rs | 2 + cli/src/packet_loss.rs | 1 + cli/src/report.rs | 17 ++-- cli/src/rpm.rs | 46 ++++++++-- cli/src/up_down.rs | 6 +- crates/nq-core/src/client.rs | 95 ++++++++++++++++++++- crates/nq-core/src/connection/http.rs | 48 +++++++++-- crates/nq-core/src/connection/map.rs | 25 +++++- crates/nq-core/src/lib.rs | 1 + crates/nq-latency/src/lib.rs | 17 +++- crates/nq-load-generator/src/lib.rs | 19 ++++- crates/nq-packetloss/src/lib.rs | 3 +- crates/nq-rpm/src/lib.rs | 118 ++++++++++++++++++++++---- crates/nq-tokio-network/src/lib.rs | 16 ++-- 18 files changed, 374 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fbe3931..ad76d84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1179,6 +1179,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "url", ] [[package]] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index b15e86d..4a39928 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -27,4 +27,5 @@ serde_json = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "time", "net", "macros",] } tokio-util = { workspace = true } tracing = { workspace = true } -tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } \ No newline at end of file +tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } +url = { workspace = true } \ No newline at end of file diff --git a/cli/src/aim_report.rs b/cli/src/aim_report.rs index 960973a..36e05c4 100644 --- a/cli/src/aim_report.rs +++ b/cli/src/aim_report.rs @@ -16,6 +16,7 @@ use nq_tokio_network::TokioNetwork; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; use tracing::debug; +use url::Url; use crate::util::{pretty_ms, pretty_secs_to_ms}; @@ -84,13 +85,22 @@ impl CloudflareAimResults { pub async fn upload(&self) -> anyhow::Result<()> { let results = self.clone(); - let origin = self.origin.clone(); + // Always report over HTTPS regardless of --no-tls test mode. If an http origin was + // provided (should not normally happen), rewrite scheme to https for the upload. + let mut origin = self.origin.clone(); + if let Ok(mut parsed) = Url::parse(&origin) { + if parsed.scheme() != "https" { + let _ = parsed.set_scheme("https"); + origin = parsed.to_string(); + } + } let shutdown = CancellationToken::new(); let time = Arc::new(TokioTime::new()); let network = Arc::new(TokioNetwork::new( Arc::clone(&time) as Arc, shutdown, + false, )); let mut headers = HeaderMap::new(); @@ -98,8 +108,8 @@ impl CloudflareAimResults { headers.append("Content-Type", HeaderValue::from_static("application/json")); let body = serde_json::to_string(&results).unwrap(); + // Force a fresh H2 (TLS) connection for AIM upload. let response = Client::default() - .new_connection(nq_core::ConnectionType::H2) .new_connection(nq_core::ConnectionType::H2) .headers(headers) .method("POST") diff --git a/cli/src/args/rpm.rs b/cli/src/args/rpm.rs index f200914..d67ab2b 100644 --- a/cli/src/args/rpm.rs +++ b/cli/src/args/rpm.rs @@ -69,6 +69,10 @@ pub struct RpmArgs { /// https://blog.cloudflare.com/aim-database-for-internet-quality/ #[clap(long)] pub disable_aim_scores: bool, + /// Disable TLS for H1 connections (plain TCP). When set, HTTP/1.1 is used + /// without a TLS handshake; otherwise HTTP/2 is preferred. + #[clap(long = "no-tls")] + pub no_tls: bool, } impl Default for RpmArgs { @@ -86,6 +90,7 @@ impl Default for RpmArgs { interval_duration_ms: 1000, // 1s test_duration_ms: 12_000, // 12s disable_aim_scores: false, + no_tls: false, } } } diff --git a/cli/src/latency.rs b/cli/src/latency.rs index 1bcec60..e4216b6 100644 --- a/cli/src/latency.rs +++ b/cli/src/latency.rs @@ -22,6 +22,7 @@ pub async fn run(url: String, runs: usize) -> anyhow::Result<()> { let result = run_test(&LatencyConfig { url: url.parse()?, runs, + no_tls: false, }) .await?; @@ -48,6 +49,7 @@ pub async fn run_test(config: &LatencyConfig) -> anyhow::Result { let network = Arc::new(TokioNetwork::new( Arc::clone(&time), shutdown.clone(), + config.no_tls, )) as Arc; let rtt = Latency::new(config.clone()); diff --git a/cli/src/packet_loss.rs b/cli/src/packet_loss.rs index fccd837..13d75a8 100644 --- a/cli/src/packet_loss.rs +++ b/cli/src/packet_loss.rs @@ -49,6 +49,7 @@ async fn fetch_turn_server_creds( let network = Arc::new(TokioNetwork::new( Arc::clone(&time) as Arc, shutdown.clone(), + false, )); let host = config diff --git a/cli/src/report.rs b/cli/src/report.rs index d7aba1c..1449304 100644 --- a/cli/src/report.rs +++ b/cli/src/report.rs @@ -55,13 +55,18 @@ struct RpmReport { impl RpmReport { pub fn from_rpm_result(result: &ResponsivenessResult) -> anyhow::Result { + let throughput = result.throughput().context("no throughputs available")?; + let loaded_latency_ms = match result.self_probe_latencies.quantile(0.5).map(pretty_ms) { + Some(v) => v, + None => { + tracing::warn!("no loaded latency measurements; defaulting to 0ms"); + 0.0 + } + }; + Ok(RpmReport { - throughput: result.throughput().context("no throughputs available")?, - loaded_latency_ms: result - .self_probe_latencies - .quantile(0.5) - .map(pretty_ms) - .context("no loaded latency measurements")?, + throughput, + loaded_latency_ms, rpm: result.rpm as usize, }) } diff --git a/cli/src/rpm.rs b/cli/src/rpm.rs index 82ec8e3..3661e3d 100644 --- a/cli/src/rpm.rs +++ b/cli/src/rpm.rs @@ -25,10 +25,11 @@ use crate::util::pretty_secs_to_ms; pub async fn run(cli_config: RpmArgs) -> anyhow::Result<()> { info!("running responsiveness test"); - let rpm_urls = match cli_config.config.clone() { + + let mut rpm_urls = match cli_config.config.clone() { Some(endpoint) => { info!("fetching configuration from {endpoint}"); - let urls = get_rpm_config(endpoint).await?.urls; + let urls = get_rpm_config(endpoint, cli_config.no_tls).await?.urls; info!("retrieved configuration urls: {urls:?}"); urls @@ -45,11 +46,41 @@ pub async fn run(cli_config: RpmArgs) -> anyhow::Result<()> { } }; + // If --no-tls is specified, automatically convert provided HTTPS test URLs to HTTP. + // We only rewrite schemes; host, path, query, fragment remain unchanged. + if cli_config.no_tls { + let downgrade = |orig: &str| -> String { + if let Ok(mut url) = url::Url::parse(orig) { + if url.scheme() == "https" { + // Ignore result of set_scheme (fails only if new scheme invalid length per spec). + let _ = url.set_scheme("http"); + return url.to_string(); + } + } + orig.to_string() + }; + + let original = rpm_urls.clone(); + rpm_urls.small_https_download_url = downgrade(&rpm_urls.small_https_download_url); + rpm_urls.large_https_download_url = downgrade(&rpm_urls.large_https_download_url); + rpm_urls.https_upload_url = downgrade(&rpm_urls.https_upload_url); + info!( + "converted urls for --no-tls: small: {} -> {}, large: {} -> {}, upload: {} -> {}", + original.small_https_download_url, + rpm_urls.small_https_download_url, + original.large_https_download_url, + rpm_urls.large_https_download_url, + original.https_upload_url, + rpm_urls.https_upload_url + ); + } + // first get unloaded RTT measurements info!("determining unloaded latency"); let rtt_result = crate::latency::run_test(&LatencyConfig { url: rpm_urls.small_https_download_url.parse()?, runs: 20, + no_tls: cli_config.no_tls, }) .await?; info!( @@ -74,6 +105,7 @@ pub async fn run(cli_config: RpmArgs) -> anyhow::Result<()> { trimmed_mean_percent: cli_config.trimmed_mean_percent, std_tolerance: cli_config.std_tolerance, max_loaded_connections: cli_config.max_loaded_connections, + no_tls: cli_config.no_tls, }; info!("running download test"); @@ -120,6 +152,7 @@ async fn run_test( let network = Arc::new(TokioNetwork::new( Arc::clone(&time), shutdown.clone().into(), + config.no_tls, )) as Arc; let rpm = Responsiveness::new(config.clone(), download)?; @@ -139,7 +172,7 @@ pub struct RpmServerConfig { urls: RpmUrls, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct RpmUrls { #[serde(alias = "small_download_url")] small_https_download_url: String, @@ -149,16 +182,19 @@ pub struct RpmUrls { https_upload_url: String, } -pub async fn get_rpm_config(config_url: String) -> anyhow::Result { +pub async fn get_rpm_config(config_url: String, no_tls: bool) -> anyhow::Result { let shutdown = CancellationToken::new(); let time = Arc::new(TokioTime::new()); let network = Arc::new(TokioNetwork::new( Arc::clone(&time) as Arc, shutdown.clone(), + no_tls, )); + let conn_type = if no_tls { ConnectionType::H1 } else { ConnectionType::H2 }; let response = Client::default() - .new_connection(ConnectionType::H2) + .plain_http_mode(no_tls) + .new_connection(conn_type) .method("GET") .send( config_url.parse().context("parsing rpm config url")?, diff --git a/cli/src/up_down.rs b/cli/src/up_down.rs index 523ff67..6159fe5 100644 --- a/cli/src/up_down.rs +++ b/cli/src/up_down.rs @@ -20,8 +20,7 @@ use serde_json::json; pub async fn download(args: DownloadArgs) -> anyhow::Result<()> { let shutdown = CancellationToken::new(); let time = Arc::new(TokioTime::new()) as Arc; - let network = - Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone())) as Arc; + let network = Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone(), false)) as Arc; let conn_type = match args.conn_type { ConnType::H1 => ConnectionType::H1, @@ -87,8 +86,7 @@ pub async fn download(args: DownloadArgs) -> anyhow::Result<()> { pub async fn upload(args: UploadArgs) -> anyhow::Result<()> { let shutdown = CancellationToken::new(); let time = Arc::new(TokioTime::new()) as Arc; - let network = - Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone())) as Arc; + let network = Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone(), false)) as Arc; let conn_type = match args.conn_type { ConnType::H1 => ConnectionType::H1, // ConnectionType::H1, diff --git a/crates/nq-core/src/client.rs b/crates/nq-core/src/client.rs index ea32636..c8fd2c4 100644 --- a/crates/nq-core/src/client.rs +++ b/crates/nq-core/src/client.rs @@ -49,6 +49,7 @@ pub struct ThroughputClient { new_connection_type: Option, headers: Option, direction: Direction, + plain_http_mode: bool, } impl ThroughputClient { @@ -59,6 +60,7 @@ impl ThroughputClient { new_connection_type: None, headers: None, direction: Direction::Down, + plain_http_mode: false, } } @@ -69,6 +71,7 @@ impl ThroughputClient { new_connection_type: None, headers: None, direction: Direction::Up(size), + plain_http_mode: false, } } @@ -90,6 +93,14 @@ impl ThroughputClient { self } + /// Enable plain HTTP mode (no TLS) specific header adjustments. + /// + /// In this mode additional headers (Accept, Connection, Content-Length) + /// are injected to better emulate typical plain HTTP/1.1 client + /// behavior. Leaving this disabled preserves the minimal header set used + /// previously for HTTPS tests. + pub fn plain_http_mode(mut self, yes: bool) -> Self { self.plain_http_mode = yes; self } + /// Execute a download or upload request against the given [`Uri`]. #[tracing::instrument(skip(self, network, time, shutdown))] pub fn send( @@ -105,8 +116,11 @@ impl ThroughputClient { headers.insert("User-Agent", HeaderValue::from_static("mach/0.1.0")); } - let host = uri.host().context("uri is missing a host")?.to_string(); - let host_with_port = format!("{}:{}", host, uri.port_u16().unwrap_or(443)); + + let host = uri.host().context("uri is missing a host")?.to_string(); + // Use correct default port based on scheme so HTTPS downloads go to 443. + let default_port = match uri.scheme_str() { Some("https") => 443, _ => 80 }; + let host_with_port = format!("{}:{}", host, uri.port_u16().unwrap_or(default_port)); let method = match self.direction { Direction::Down => "GET", @@ -144,6 +158,31 @@ impl ThroughputClient { tracing::debug!("created request"); *request.headers_mut() = headers.clone(); + // Capture URI parts before mutable borrow + let uri_host = request.uri().host().map(|s| s.to_string()); + let uri_port = request.uri().port_u16(); + let uri_scheme = request.uri().scheme_str().map(|s| s.to_string()); + let h = request.headers_mut(); + if let Some(host_hdr) = uri_host.as_deref() { + let scheme = uri_scheme.as_deref(); + let default_port = match scheme { Some("https") => 443, _ => 80 }; + let need_port = uri_port.is_some() && uri_port.unwrap() != default_port; + let host_value = if need_port { format!("{}:{}", host_hdr, uri_port.unwrap()) } else { host_hdr +.to_string() }; + if let Ok(val) = http::HeaderValue::from_str(&host_value) { h.insert("Host", val); } + } + if self.plain_http_mode { + h.insert("Accept", http::HeaderValue::from_static("*/*")); + h.insert("Connection", http::HeaderValue::from_static("keep-alive")); + if let Direction::Up(size) = self.direction { + if !h.contains_key("Content-Length") { + if let Ok(val) = http::HeaderValue::from_str(&size.to_string()) { + h.insert("Content-Length", val); + } + } + } + } + tokio::spawn( async move { @@ -194,6 +233,8 @@ impl ThroughputClient { } let (parts, incoming) = response_fut.await?.into_parts(); + tracing::info!(status=?parts.status, headers=?parts.headers, "received response headers (upload)"); + info!("upload response parts: {:?}", parts); incoming.boxed() @@ -262,6 +303,7 @@ pub struct Client { new_connection_type: Option, headers: Option, method: Option, + plain_http_mode: bool, } impl Client { @@ -289,6 +331,14 @@ impl Client { self } + /// Enable or disable plain (non-TLS) HTTP mode specific header normalization. + /// + /// When set to `true`, the client will add extra headers such as + /// `Accept: */*`, `Connection: keep-alive`, and (for sized bodies) a + /// `Content-Length`. In TLS mode + /// (`false`) these are omitted to preserve the original HTTPS behavior. + pub fn plain_http_mode(mut self, yes: bool) -> Self { self.plain_http_mode = yes; self } + /// Send a request to the given uri with the given body, timing how long it /// took. #[tracing::instrument(skip(self, body, network, time))] @@ -310,19 +360,58 @@ impl Client { let host = uri.host().context("uri is missing a host")?.to_string(); - let remote_addr = (host.as_str(), uri.port_u16().unwrap_or(443)) + // Choose default port based on scheme (https -> 443, http -> 80, else 443) + let default_port = match uri.scheme_str() { + Some("https") => 443, + Some("http") => 80, + _ => 443, + }; + let remote_addr = (host.as_str(), uri.port_u16().unwrap_or(default_port)) .to_socket_addrs()? .next() .context("could not resolve large download url")?; let method: http::Method = self.method.as_deref().unwrap_or("GET").parse()?; + // Add Content-Length if body reports an exact size and header not already set. + if !headers.contains_key("Content-Length") { + let size_hint = body.size_hint(); + if let Some(exact) = size_hint.exact() { + if let Ok(v) = HeaderValue::from_str(&exact.to_string()) { + headers.insert("Content-Length", v); + } + } + } + let mut request = http::Request::builder() .method(method) .uri(uri) .body(body.boxed())?; *request.headers_mut() = headers.clone(); + // Always ensure Host header is present (required for HTTP/1.1). Only include port + // if it differs from the default for the scheme. + { + let uri_host = request.uri().host().map(|s| s.to_string()); + if let Some(host_hdr) = uri_host.as_deref() { + let uri_port = request.uri().port_u16(); + let scheme = request.uri().scheme_str(); + let default_port = match scheme { Some("https") => 443, Some("http") => 80, _ => 443 }; + let need_port = uri_port.is_some() && uri_port.unwrap() != default_port; + let host_value = if need_port { format!("{}:{}", host_hdr, uri_port.unwrap()) } else { host_hdr.to_string() }; + let h = request.headers_mut(); + if !h.contains_key("Host") { + if let Ok(val) = http::HeaderValue::from_str(&host_value) { h.insert("Host", val); } + } + } + } + + // Plain HTTP (no TLS) mode header normalization + if self.plain_http_mode { + let h = request.headers_mut(); + h.insert("Accept", http::HeaderValue::from_static("*/*")); + h.insert("Connection", http::HeaderValue::from_static("keep-alive")); + } debug!("sending request"); diff --git a/crates/nq-core/src/connection/http.rs b/crates/nq-core/src/connection/http.rs index c7e3420..94d107a 100644 --- a/crates/nq-core/src/connection/http.rs +++ b/crates/nq-core/src/connection/http.rs @@ -77,10 +77,14 @@ pub async fn tls_connection( builder.set_verify_cert_store(store_builder.build())?; builder.set_verify(SslVerifyMode::PEER); + // Offer ALPN protocols. For H2 we also offer http/1.1 as a fallback so that if the + // server declines h2 we can downgrade gracefully instead of sending an h2 preface + // on a connection the server expects to speak HTTP/1.1 (which leads to an immediate RST). + // Wire format: each protocol is prefixed with length byte. let alpn: &[u8] = match conn_type { ConnectionType::H1 => b"\x08http/1.1", - ConnectionType::H2 => b"\x02h2", - ConnectionType::H3 => b"\x02h3", + ConnectionType::H2 => b"\x02h2\x08http/1.1", + ConnectionType::H3 => b"\x02h3", // TODO: QUIC required; placeholder }; builder.set_alpn_protos(alpn)?; @@ -91,8 +95,12 @@ pub async fn tls_connection( .map_err(|e| anyhow::anyhow!("unable to create tls stream: {e}"))?; timing.set_secure(time.now()); - - debug!("created tls connection"); + let negotiated_alpn = ssl_stream + .ssl() + .selected_alpn_protocol() + .map(|p| String::from_utf8_lossy(p).to_string()) + .unwrap_or_else(|| "".to_string()); + debug!(alpn=%negotiated_alpn, "created tls connection"); Ok(ssl_stream) } @@ -139,10 +147,40 @@ pub async fn start_h2_conn( addr: SocketAddr, domain: String, mut timing: ConnectionTiming, - io: impl ByteStream, + io: TlsStream, time: &dyn Time, shutdown: CancellationToken, ) -> anyhow::Result { + // Inspect negotiated ALPN directly from the TLS stream. + let negotiated = io + .ssl() + .selected_alpn_protocol() + .map(|p| String::from_utf8_lossy(p).to_string()); + + match negotiated.as_deref() { + Some("h2") => debug!(alpn="h2", "proceeding with h2 handshake"), + Some("http/1.1") | None | Some("") => { + debug!(alpn=?negotiated, "ALPN not h2; falling back to HTTP/1.1 on existing TLS session"); + let (send_request, connection) = http1::handshake(TokioIo::new(io)).await?; + timing.set_application(time.now()); + tokio::spawn( + async move { + select! { + Err(e) = connection => { error!(error=%e, "error running h1(fallback) connection"); } + _ = shutdown.cancelled() => { debug!("shutting down h1(fallback) connection"); } + } + info!("connection finished"); + } + .in_current_span(), + ); + info!(?timing, "established fallback h1 connection"); + return Ok(EstablishedConnection::new(timing, SendRequest::H1 { dispatch: send_request })); + } + Some(other) => { + debug!(alpn=other, "unexpected ALPN; attempting h2 anyway"); + } + } + let (dispatch, connection) = http2::handshake(TokioExecutor, TokioIo::new(io)).await?; timing.set_application(time.now()); diff --git a/crates/nq-core/src/connection/map.rs b/crates/nq-core/src/connection/map.rs index d970c58..87e4f45 100644 --- a/crates/nq-core/src/connection/map.rs +++ b/crates/nq-core/src/connection/map.rs @@ -21,9 +21,20 @@ use crate::util::ByteStream; use crate::{ConnectionTiming, ConnectionType, ResponseFuture, Time}; /// Creates and holds [`EstablishedConnection`]s in a VecDeque. -#[derive(Default, Debug)] +#[derive(Debug)] pub struct ConnectionManager { connections: RwLock>>>, + no_tls: bool, +} + +impl Default for ConnectionManager { + fn default() -> Self { Self { connections: RwLock::new(VecDeque::new()), no_tls: false } } +} + +impl ConnectionManager { + /// Create a new `ConnectionManager` with the provided `no_tls` flag controlling + /// whether H1 connections skip the TLS handshake. + pub fn new(no_tls: bool) -> Self { Self { connections: RwLock::new(VecDeque::new()), no_tls } } } impl ConnectionManager { @@ -41,8 +52,16 @@ impl ConnectionManager { ) -> Result>> { let connection = match conn_type { ConnectionType::H1 => { - let stream = tls_connection(conn_type, &domain, &mut timing, io, time).await?; - start_h1_conn(domain, timing, stream, time, shutdown).await? + if self.no_tls { + info!("Starting plain H1 (no TLS) connection to {}", domain); + // Mark secure time immediately since no TLS handshake occurs. + timing.set_secure(time.now()); + start_h1_conn(domain, timing, io, time, shutdown).await? + } else { + info!("Starting TLS H1 connection to {}", domain); + let stream = tls_connection(conn_type, &domain, &mut timing, io, time).await?; + start_h1_conn(domain, timing, stream, time, shutdown).await? + } } ConnectionType::H2 => { let stream = tls_connection(conn_type, &domain, &mut timing, io, time).await?; diff --git a/crates/nq-core/src/lib.rs b/crates/nq-core/src/lib.rs index a0493b1..549e1ee 100644 --- a/crates/nq-core/src/lib.rs +++ b/crates/nq-core/src/lib.rs @@ -26,5 +26,6 @@ pub use crate::{ util::{oneshot_result, OneshotResult, ResponseFuture}, }; + pub use anyhow::Error; pub use anyhow::Result; diff --git a/crates/nq-latency/src/lib.rs b/crates/nq-latency/src/lib.rs index 8d1f570..b3f2401 100644 --- a/crates/nq-latency/src/lib.rs +++ b/crates/nq-latency/src/lib.rs @@ -17,6 +17,9 @@ use url::Url; pub struct LatencyConfig { pub url: Url, pub runs: usize, + /// Disable TLS for H1 connections (plain TCP). Used when higher level + /// command passes the `--no-tls` flag. + pub no_tls: bool, } impl Default for LatencyConfig { @@ -26,6 +29,7 @@ impl Default for LatencyConfig { .parse() .unwrap(), runs: 20, + no_tls: false, } } } @@ -61,7 +65,14 @@ impl Latency { let host = url .host_str() .context("small download url must have a domain")?; - let host_with_port = format!("{}:{}", host, url.port_or_known_default().unwrap_or(443)); + // Use explicit default ports based on scheme: 443 for HTTPS, 80 for HTTP, else fallback to 443. + let default_port = match url.scheme() { + "https" => 443, + "http" => 80, + _ => 443, + }; + let port = url.port().unwrap_or(default_port); + let host_with_port = format!("{}:{}", host, port); let conn_start = time.now(); @@ -71,8 +82,10 @@ impl Latency { .context("unable to resolve host")?; let time_lookup = time.now(); + // Use H1 only when TLS is disabled; otherwise prefer H2. + let conn_type = if self.config.no_tls { ConnectionType::H1 } else { ConnectionType::H2 }; let connection = network - .new_connection(conn_start, addrs[0], host.to_string(), ConnectionType::H1) + .new_connection(conn_start, addrs[0], host.to_string(), conn_type) .await .context("unable to create new connection")?; { diff --git a/crates/nq-load-generator/src/lib.rs b/crates/nq-load-generator/src/lib.rs index 6f80d2a..3092e13 100644 --- a/crates/nq-load-generator/src/lib.rs +++ b/crates/nq-load-generator/src/lib.rs @@ -24,6 +24,7 @@ pub struct LoadConfig { pub download_url: url::Url, pub upload_url: url::Url, pub upload_size: usize, + pub no_tls: bool, } pub struct LoadGenerator { @@ -62,8 +63,8 @@ impl LoadGenerator { let (tx, rx) = oneshot_result(); let client = match direction { - Direction::Down => ThroughputClient::download(), - Direction::Up(size) => ThroughputClient::upload(size), + Direction::Down => ThroughputClient::download().plain_http_mode(self.config.no_tls), + Direction::Up(size) => ThroughputClient::upload(size).plain_http_mode(self.config.no_tls), }; let response_fut = client @@ -115,6 +116,20 @@ impl LoadGenerator { .map(|c| c.connection.clone()) } + /// Select a random finished (idle) connection that has completed its load but + /// is still kept alive so it can be re-used for a self probe in non-multiplexed + /// protocols (e.g. HTTP/1.1) to avoid incurring connection setup latency. + pub fn random_finished_connection(&self) -> Option>> { + let finished: Vec<_> = self + .loads + .iter() + .filter(|l| l.finished_at.is_some()) + .collect(); + finished + .choose(&mut rand::thread_rng()) + .map(|c| c.connection.clone()) + } + pub fn push(&mut self, loaded_connection: LoadedConnection) { self.loads.push(loaded_connection); } diff --git a/crates/nq-packetloss/src/lib.rs b/crates/nq-packetloss/src/lib.rs index 6756dd2..f6bd4d6 100644 --- a/crates/nq-packetloss/src/lib.rs +++ b/crates/nq-packetloss/src/lib.rs @@ -69,6 +69,7 @@ impl PacketLossConfig { download_url: self.download_url.clone(), upload_url: self.upload_url.clone(), upload_size: 4_000_000_000, // 4 GB + no_tls: false, } } } @@ -104,7 +105,7 @@ impl PacketLoss { // Start generating load on the network in both directions let time = Arc::new(TokioTime::new()) as Arc; let network = - Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone())) as Arc; + Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone(), false)) as Arc; self.new_load_generating_connection( packet_event_tx.clone(), diff --git a/crates/nq-rpm/src/lib.rs b/crates/nq-rpm/src/lib.rs index eb40792..8cd7f57 100644 --- a/crates/nq-rpm/src/lib.rs +++ b/crates/nq-rpm/src/lib.rs @@ -13,8 +13,9 @@ use std::{ use humansize::{format_size, DECIMAL}; use nq_core::{ client::{wait_for_finish, Direction, ThroughputClient}, - ConnectionType, Network, Time, Timestamp, + ConnectionType, Network, Time, Timestamp, EstablishedConnection, }; +use tokio::sync::RwLock; use nq_load_generator::{LoadConfig, LoadGenerator, LoadedConnection}; use nq_stats::{instant_minus_intervals, TimeSeries}; use tokio::{select, sync::mpsc}; @@ -33,6 +34,7 @@ pub struct ResponsivenessConfig { pub trimmed_mean_percent: f64, pub std_tolerance: f64, pub max_loaded_connections: usize, + pub no_tls: bool, } impl ResponsivenessConfig { @@ -41,7 +43,11 @@ impl ResponsivenessConfig { headers: HashMap::default(), download_url: self.large_download_url.clone(), upload_url: self.upload_url.clone(), - upload_size: 4_000_000_000, // 4 GB + // Preserve original large upload size for HTTPS path to keep behavior identical. + // When running in --no-tls mode we override this later when constructing the + // Responsiveness instance for uploads. + upload_size: 4_000_000_000, // 4 GB (legacy default) + no_tls: self.no_tls, } } } @@ -64,6 +70,7 @@ impl Default for ResponsivenessConfig { trimmed_mean_percent: 0.95, std_tolerance: 0.05, max_loaded_connections: 16, + no_tls: false, } } } @@ -81,10 +88,17 @@ pub struct Responsiveness { direction: Direction, rpm: f64, capacity: f64, + // Dedicated warm connection for self probes in non-multiplexed (H1) mode to avoid + // paying connection setup on every probe and inflating loaded latency. + self_probe_connection: Option>>, } impl Responsiveness { + fn connection_type(&self) -> ConnectionType { + if self.config.no_tls { ConnectionType::H1 } else { ConnectionType::H2 } + } pub fn new(config: ResponsivenessConfig, download: bool) -> anyhow::Result { + let no_tls = config.no_tls; // capture before move let load_generator = LoadGenerator::new(config.load_config())?; Ok(Self { @@ -99,11 +113,17 @@ impl Responsiveness { rpm_saturated: false, direction: if download { Direction::Down + } else if no_tls { + // In no-tls (plain HTTP) mode, use reduced synthetic upload size to avoid + // extremely long single-connection uploads and exercise self-probe behavior. + Direction::Up(std::cmp::min(16u64 * 1024 * 1024, usize::MAX as u64) as usize) } else { - Direction::Up(std::cmp::min(32u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize) + // HTTPS path: preserve original behavior (large upload). 4GB is set in load config. + Direction::Up(4_000_000_000usize.min(usize::MAX)) }, rpm: 0.0, capacity: 0.0, + self_probe_connection: None, }) } } @@ -126,6 +146,29 @@ impl Responsiveness { let env = Env { time, network }; self.start = env.time.now(); + if self.config.no_tls { + // Establish a dedicated warm self-probe connection (H1) so subsequent self probes + // measure in-connection latency instead of including connect time. This is only + // enabled for plain HTTP (--no-tls) mode. HTTPS code path remains unchanged from + // original implementation. + if self.self_probe_connection.is_none() { + if let Ok(inflight) = ThroughputClient::download().plain_http_mode(true) + .new_connection(ConnectionType::H1) + .send( + self.config.small_download_url.as_str().parse()?, + Arc::clone(&env.network), + Arc::clone(&env.time), + shutdown.clone(), + )? + .await + { + // Wait for the tiny body to finish to ensure connection is idle. + let _ = wait_for_finish(inflight.events).await; + self.self_probe_connection = Some(inflight.connection); + } + } + } + info!("running responsiveness test: {:?}", self.config); let mut interval = None; @@ -403,7 +446,7 @@ impl Responsiveness { ) -> anyhow::Result<()> { let oneshot_res = self.load_generator.new_loaded_connection( self.direction, - ConnectionType::H2, + self.connection_type(), Arc::clone(&env.network), Arc::clone(&env.time), shutdown, @@ -439,7 +482,7 @@ impl Responsiveness { shutdown: CancellationToken, ) -> anyhow::Result<()> { let inflight_body_fut = ThroughputClient::download() - .new_connection(ConnectionType::H2) + .new_connection(self.connection_type()) .send( self.config.small_download_url.as_str().parse()?, Arc::clone(&env.network), @@ -500,21 +543,60 @@ impl Responsiveness { env: &Env, shutdown: CancellationToken, ) -> anyhow::Result { - // The test client should uniformly and randomly select from the active - // load-generating connections on which to send self probes. - let Some(connection) = self.load_generator.random_connection() else { - return Ok(false); + let inflight_body_fut = if self.config.no_tls { + // Plain HTTP path (no multiplexing). Reuse warm dedicated or a finished load, else new. + if let Some(conn) = self.self_probe_connection.clone() { + ThroughputClient::download().plain_http_mode(true) + .with_connection(conn) + .send( + self.config.small_download_url.as_str().parse()?, + Arc::clone(&env.network), + Arc::clone(&env.time), + shutdown.clone(), + )? + } else if let Some(conn) = self.load_generator.random_finished_connection() { + ThroughputClient::download().plain_http_mode(true) + .with_connection(conn) + .send( + self.config.small_download_url.as_str().parse()?, + Arc::clone(&env.network), + Arc::clone(&env.time), + shutdown.clone(), + )? + } else { + ThroughputClient::download().plain_http_mode(true) + .new_connection(self.connection_type()) + .send( + self.config.small_download_url.as_str().parse()?, + Arc::clone(&env.network), + Arc::clone(&env.time), + shutdown, + )? + } + } else { + // HTTPS path with H2 multiplexing enabled: attempt to reuse an ongoing load connection. + if let Some(conn) = self.load_generator.random_connection() { + ThroughputClient::download().plain_http_mode(false) + .with_connection(conn) + .send( + self.config.small_download_url.as_str().parse()?, + Arc::clone(&env.network), + Arc::clone(&env.time), + shutdown.clone(), + )? + } else { + // If no ongoing load yet, create a new one of the configured type (H2). + ThroughputClient::download().plain_http_mode(false) + .new_connection(self.connection_type()) + .send( + self.config.small_download_url.as_str().parse()?, + Arc::clone(&env.network), + Arc::clone(&env.time), + shutdown, + )? + } }; - let inflight_body_fut = ThroughputClient::download() - .with_connection(connection) - .send( - self.config.small_download_url.as_str().parse()?, - Arc::clone(&env.network), - Arc::clone(&env.time), - shutdown, - )?; - tokio::spawn(report_err( event_tx.clone(), async move { diff --git a/crates/nq-tokio-network/src/lib.rs b/crates/nq-tokio-network/src/lib.rs index eb9568e..fbee9b7 100644 --- a/crates/nq-tokio-network/src/lib.rs +++ b/crates/nq-tokio-network/src/lib.rs @@ -23,10 +23,8 @@ pub struct TokioNetwork { } impl TokioNetwork { - pub fn new(time: Arc, shutdown: CancellationToken) -> Self { - Self { - inner: TokioNetworkInner::new(time, shutdown), - } + pub fn new(time: Arc, shutdown: CancellationToken, no_tls: bool) -> Self { + Self { inner: TokioNetworkInner::new(time, shutdown, no_tls) } } } @@ -123,8 +121,8 @@ pub struct TokioNetworkInner { } impl TokioNetworkInner { - pub fn new(time: Arc, shutdown: CancellationToken) -> Self { - let connections: Arc = Default::default(); + pub fn new(time: Arc, shutdown: CancellationToken, no_tls: bool) -> Self { + let connections: Arc = Arc::new(ConnectionManager::new(no_tls)); tokio::spawn({ let connections = Arc::clone(&connections); @@ -137,11 +135,7 @@ impl TokioNetworkInner { } }); - Self { - connections, - time, - shutdown, - } + Self { connections, time, shutdown } } async fn new_connection(