From 3ca1c3210aaa2a5ace9eadc19890e2d7825391ed Mon Sep 17 00:00:00 2001 From: Fisher Darling Date: Thu, 23 Oct 2025 20:17:52 +0200 Subject: [PATCH 1/8] Use workspace dependencies, 2024 edition, resolver = 3 --- Cargo.lock | 99 +++++++------------------- Cargo.toml | 10 ++- crates/nq-core/Cargo.toml | 8 +-- crates/nq-core/src/body/upload_body.rs | 2 +- crates/nq-latency/Cargo.toml | 3 +- crates/nq-load-generator/Cargo.toml | 3 +- crates/nq-packetloss/Cargo.toml | 5 +- crates/nq-rpm/Cargo.toml | 5 +- crates/nq-stats/Cargo.toml | 3 +- 9 files changed, 43 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fbe3931..7e20189 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -500,7 +500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" dependencies = [ "generic-array", - "rand_core 0.6.4", + "rand_core", "subtle", "zeroize", ] @@ -512,7 +512,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ "generic-array", - "rand_core 0.6.4", + "rand_core", "typenum", ] @@ -649,7 +649,7 @@ dependencies = [ "hkdf", "pem-rfc7468", "pkcs8", - "rand_core 0.6.4", + "rand_core", "sec1", "subtle", "zeroize", @@ -667,7 +667,7 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0b50bfb653653f9ca9095b427bed08ab8d75a137839d9ad64eb11810d5b6393" dependencies = [ - "rand_core 0.6.4", + "rand_core", "subtle", ] @@ -887,7 +887,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" dependencies = [ "ff", - "rand_core 0.6.4", + "rand_core", "subtle", ] @@ -1076,7 +1076,7 @@ dependencies = [ "bytes", "log", "portable-atomic", - "rand 0.8.5", + "rand", "rtcp", "rtp", "thiserror", @@ -1276,7 +1276,7 @@ dependencies = [ "hyper", "hyper-util", "pin-project-lite", - "rand 0.9.0", + "rand", "rustls-native-certs", "tokio", "tokio-boring", @@ -1306,7 +1306,7 @@ dependencies = [ "http", "nq-core", "nq-stats", - "rand 0.8.5", + "rand", "serde", "tokio", "tokio-util", @@ -1636,19 +1636,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - -[[package]] -name = "rand" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" -dependencies = [ - "rand_chacha 0.9.0", - "rand_core 0.9.0", - "zerocopy", + "rand_chacha", + "rand_core", ] [[package]] @@ -1658,17 +1647,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_chacha" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" -dependencies = [ - "ppv-lite86", - "rand_core 0.9.0", + "rand_core", ] [[package]] @@ -1680,16 +1659,6 @@ dependencies = [ "getrandom 0.2.15", ] -[[package]] -name = "rand_core" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff" -dependencies = [ - "getrandom 0.3.1", - "zerocopy", -] - [[package]] name = "rcgen" version = "0.13.2" @@ -1801,7 +1770,7 @@ dependencies = [ "bytes", "memchr", "portable-atomic", - "rand 0.8.5", + "rand", "serde", "thiserror", "webrtc-util", @@ -1917,7 +1886,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02a526161f474ae94b966ba622379d939a8fe46c930eebbadb73e339622599d5" dependencies = [ - "rand 0.8.5", + "rand", "substring", "thiserror", "url", @@ -2051,7 +2020,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ "digest", - "rand_core 0.6.4", + "rand_core", ] [[package]] @@ -2114,7 +2083,7 @@ dependencies = [ "crc", "lazy_static", "md-5", - "rand 0.8.5", + "rand", "ring", "subtle", "thiserror", @@ -2370,7 +2339,7 @@ dependencies = [ "log", "md-5", "portable-atomic", - "rand 0.8.5", + "rand", "ring", "stun", "thiserror", @@ -2509,7 +2478,7 @@ dependencies = [ "lazy_static", "log", "portable-atomic", - "rand 0.8.5", + "rand", "rcgen", "regex", "ring", @@ -2573,8 +2542,8 @@ dependencies = [ "p256", "p384", "portable-atomic", - "rand 0.8.5", - "rand_core 0.6.4", + "rand", + "rand_core", "rcgen", "ring", "rustls", @@ -2601,7 +2570,7 @@ dependencies = [ "crc", "log", "portable-atomic", - "rand 0.8.5", + "rand", "serde", "serde_json", "stun", @@ -2636,7 +2605,7 @@ checksum = "e153be16b8650021ad3e9e49ab6e5fa9fb7f6d1c23c213fd8bbd1a1135a4c704" dependencies = [ "byteorder", "bytes", - "rand 0.8.5", + "rand", "rtp", "thiserror", ] @@ -2653,7 +2622,7 @@ dependencies = [ "crc", "log", "portable-atomic", - "rand 0.8.5", + "rand", "thiserror", "tokio", "webrtc-util", @@ -2697,7 +2666,7 @@ dependencies = [ "log", "nix", "portable-atomic", - "rand 0.8.5", + "rand", "thiserror", "tokio", "winapi", @@ -2880,7 +2849,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7e468321c81fb07fa7f4c636c3972b9100f0346e5b6a9f2bd0603a52f7ed277" dependencies = [ "curve25519-dalek", - "rand_core 0.6.4", + "rand_core", "serde", "zeroize", ] @@ -2912,26 +2881,6 @@ dependencies = [ "time", ] -[[package]] -name = "zerocopy" -version = "0.8.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa91407dacce3a68c56de03abe2760159582b846c6a4acd2f456618087f12713" -dependencies = [ - "zerocopy-derive", -] - -[[package]] -name = "zerocopy-derive" -version = "0.8.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06718a168365cad3d5ff0bb133aad346959a2074bd4a85c121255a11304a8626" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "zeroize" version = "1.8.1" diff --git a/Cargo.toml b/Cargo.toml index a5ce135..0a67925 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [workspace] members = ["crates/*", "cli"] -resolver = "2" +resolver = "3" [workspace.package] version = "3.3.0" -repository = "https://github.com/cloudflare/foundations" -edition = "2021" +repository = "https://github.com/cloudflare/networkquality-rs" +edition = "2024" authors = [ "Fisher Darling ", "Lina Baquero ", @@ -29,14 +29,18 @@ nq-tokio-network = { path = "./crates/nq-tokio-network" } anyhow = "1.0" async-trait = { version = "0.1" } boring = "4.11.0" +bytes = "1.6.0" clap = "4.3" clap-verbosity-flag = "2.1" http = "1.0" http-body-util = "0.1.2" +humansize = "2.1.3" hyper = "1.0" hyper-util = "0.1" pin-project-lite = "0.2" rand = "0.8" +rustls = { version = "0.23.10", default-features = false, features = ["std", "ring"] } +rustls-native-certs = "0.7.0" serde = "1.0" serde_json = { version = "1.0", features = ["preserve_order"] } tracing = "0.1" diff --git a/crates/nq-core/Cargo.toml b/crates/nq-core/Cargo.toml index 19e0ad7..82b7f1b 100644 --- a/crates/nq-core/Cargo.toml +++ b/crates/nq-core/Cargo.toml @@ -10,15 +10,15 @@ readme = "../README.md" [dependencies] anyhow = { workspace = true } boring = { workspace = true } +bytes = { workspace = true } http = { workspace = true } http-body-util = { workspace = true } hyper = { workspace = true, features = ["client", "http1", "http2"] } hyper-util = { workspace = true, features = ["tokio"] } pin-project-lite = { workspace = true } -rustls-native-certs = "0.7.0" +rand = { workspace = true } +rustls-native-certs = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "time", "net", "test-util"] } tokio-boring = { workspace = true } tokio-util = { workspace = true } -tracing = { workspace = true } -rand = "0.9.0-beta.1" -bytes = "1.6.0" \ No newline at end of file +tracing = { workspace = true } \ No newline at end of file diff --git a/crates/nq-core/src/body/upload_body.rs b/crates/nq-core/src/body/upload_body.rs index 0a2320e..88594b5 100644 --- a/crates/nq-core/src/body/upload_body.rs +++ b/crates/nq-core/src/body/upload_body.rs @@ -26,7 +26,7 @@ impl UploadBody { pub fn new(size: usize) -> Self { const CHUNK_SIZE: usize = 256 * 1024; // 256 KB - let mut rng = StdRng::from_os_rng(); + let mut rng = StdRng::from_entropy(); let chunk_size = std::cmp::min(CHUNK_SIZE, size); let mut chunk = vec![0u8; chunk_size]; rng.fill(&mut chunk[..]); diff --git a/crates/nq-latency/Cargo.toml b/crates/nq-latency/Cargo.toml index 18fbf8b..618a89d 100644 --- a/crates/nq-latency/Cargo.toml +++ b/crates/nq-latency/Cargo.toml @@ -6,7 +6,6 @@ repository = { workspace = true } authors = { workspace = true } license = { workspace = true } readme = "../README.md" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] nq-core = { workspace = true } @@ -17,4 +16,4 @@ http = { workspace = true } http-body-util = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } -url = { workspace = true, features = ["serde"] } \ No newline at end of file +url = { workspace = true, features = ["serde"] } diff --git a/crates/nq-load-generator/Cargo.toml b/crates/nq-load-generator/Cargo.toml index ac17c04..b7ee85b 100644 --- a/crates/nq-load-generator/Cargo.toml +++ b/crates/nq-load-generator/Cargo.toml @@ -6,7 +6,6 @@ repository = { workspace = true } authors = { workspace = true } license = { workspace = true } readme = "../README.md" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] nq-core = { workspace = true } @@ -19,4 +18,4 @@ serde = { workspace = true, features = ["derive"] } tokio = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } -url = { workspace = true, features = ["serde"] } \ No newline at end of file +url = { workspace = true, features = ["serde"] } diff --git a/crates/nq-packetloss/Cargo.toml b/crates/nq-packetloss/Cargo.toml index eba413c..f4f509e 100644 --- a/crates/nq-packetloss/Cargo.toml +++ b/crates/nq-packetloss/Cargo.toml @@ -6,7 +6,6 @@ repository = { workspace = true } authors = { workspace = true } license = { workspace = true } readme = "../README.md" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] nq-core = { workspace = true } @@ -17,11 +16,11 @@ anyhow = { workspace = true, features = ["backtrace"] } http = { workspace = true } http-body-util = { workspace = true } hyper = { workspace = true } -rustls = { version = "0.23.10", default-features = false, features = ["std", "ring"] } +rustls = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } url = { workspace = true, features = ["serde"] } -webrtc = { workspace = true } \ No newline at end of file +webrtc = { workspace = true } diff --git a/crates/nq-rpm/Cargo.toml b/crates/nq-rpm/Cargo.toml index 06993a5..f12f1da 100644 --- a/crates/nq-rpm/Cargo.toml +++ b/crates/nq-rpm/Cargo.toml @@ -6,7 +6,6 @@ repository = { workspace = true } authors = { workspace = true } license = { workspace = true } readme = "../README.md" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] nq-core = { workspace = true } @@ -14,8 +13,8 @@ nq-load-generator = { workspace = true } nq-stats = { workspace = true } anyhow = { workspace = true, features = ["backtrace"] } -humansize = "*" +humansize = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } -url = { workspace = true, features = ["serde"] } \ No newline at end of file +url = { workspace = true, features = ["serde"] } diff --git a/crates/nq-stats/Cargo.toml b/crates/nq-stats/Cargo.toml index e6b7a2f..a19a282 100644 --- a/crates/nq-stats/Cargo.toml +++ b/crates/nq-stats/Cargo.toml @@ -6,8 +6,7 @@ repository = { workspace = true } authors = { workspace = true } license = { workspace = true } readme = "../README.md" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] nq-core = { workspace = true } -tracing = { workspace = true } \ No newline at end of file +tracing = { workspace = true } From 3fb792980d4cf67d4bf4ed0f4353888996d87422 Mon Sep 17 00:00:00 2001 From: Fisher Darling Date: Thu, 23 Oct 2025 20:23:56 +0200 Subject: [PATCH 2/8] run cargo fmt --- cli/src/latency.rs | 6 ++---- cli/src/packet_loss.rs | 2 +- crates/nq-core/src/body/mod.rs | 4 ++-- crates/nq-core/src/client.rs | 7 ++++--- crates/nq-core/src/connection/http.rs | 4 ++-- crates/nq-core/src/connection/map.rs | 2 +- crates/nq-core/src/lib.rs | 2 +- crates/nq-core/src/network/mod.rs | 2 +- crates/nq-load-generator/src/lib.rs | 6 +++--- crates/nq-packetloss/src/lib.rs | 21 ++++++------------- .../nq-packetloss/src/webrtc_data_channel.rs | 9 ++++---- crates/nq-rpm/src/lib.rs | 12 +++++------ crates/nq-stats/src/counter.rs | 4 +++- crates/nq-stats/src/lib.rs | 2 +- crates/nq-tokio-network/src/lib.rs | 6 +++--- 15 files changed, 40 insertions(+), 49 deletions(-) diff --git a/cli/src/latency.rs b/cli/src/latency.rs index 1bcec60..144e9ee 100644 --- a/cli/src/latency.rs +++ b/cli/src/latency.rs @@ -45,10 +45,8 @@ pub async fn run(url: String, runs: usize) -> anyhow::Result<()> { pub async fn run_test(config: &LatencyConfig) -> anyhow::Result { let shutdown = CancellationToken::new(); let time = Arc::new(TokioTime::new()) as Arc; - let network = Arc::new(TokioNetwork::new( - Arc::clone(&time), - shutdown.clone(), - )) as Arc; + let network = + Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone())) as Arc; let rtt = Latency::new(config.clone()); let results = rtt.run_test(network, time, shutdown).await?; diff --git a/cli/src/packet_loss.rs b/cli/src/packet_loss.rs index fccd837..d8b1bf2 100644 --- a/cli/src/packet_loss.rs +++ b/cli/src/packet_loss.rs @@ -80,4 +80,4 @@ async fn fetch_turn_server_creds( let creds = serde_json::from_slice(&response.into_body().collect().await?.to_bytes()) .context("parsing json creds from turn server url")?; Ok(creds) -} \ No newline at end of file +} diff --git a/crates/nq-core/src/body/mod.rs b/crates/nq-core/src/body/mod.rs index d351cbd..e85e4d4 100644 --- a/crates/nq-core/src/body/mod.rs +++ b/crates/nq-core/src/body/mod.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use tokio::sync::RwLock; use http::{HeaderMap, HeaderValue}; -use http_body_util::{combinators::BoxBody, Empty}; +use http_body_util::{Empty, combinators::BoxBody}; use hyper::body::Bytes; use tokio::sync::mpsc; @@ -21,7 +21,7 @@ pub fn empty() -> Empty { Empty::new() } -use crate::{connection::ConnectionTiming, EstablishedConnection, Timestamp}; +use crate::{EstablishedConnection, Timestamp, connection::ConnectionTiming}; pub use self::{ counting_body::{BodyEvent, CountingBody}, diff --git a/crates/nq-core/src/client.rs b/crates/nq-core/src/client.rs index ea32636..2e5c385 100644 --- a/crates/nq-core/src/client.rs +++ b/crates/nq-core/src/client.rs @@ -16,11 +16,12 @@ use hyper::body::{Body, Bytes, Incoming}; use tokio::select; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, Instrument}; +use tracing::{Instrument, debug, error, info}; use crate::{ - body::{empty, BodyEvent, CountingBody, InflightBody, NqBody, UploadBody}, - oneshot_result, ConnectionType, EstablishedConnection, Network, OneshotResult, Time, Timestamp, + ConnectionType, EstablishedConnection, Network, OneshotResult, Time, Timestamp, + body::{BodyEvent, CountingBody, InflightBody, NqBody, UploadBody, empty}, + oneshot_result, }; /// The default user agent for networkquality requests diff --git a/crates/nq-core/src/connection/http.rs b/crates/nq-core/src/connection/http.rs index c7e3420..e466f9d 100644 --- a/crates/nq-core/src/connection/http.rs +++ b/crates/nq-core/src/connection/http.rs @@ -7,15 +7,15 @@ use std::net::SocketAddr; use std::pin::Pin; use boring::ssl::{SslConnector, SslMethod, SslVerifyMode}; -use boring::x509::store::X509StoreBuilder; use boring::x509::X509; +use boring::x509::store::X509StoreBuilder; use http::{Request, Response}; use hyper::body::Incoming; use hyper::client::conn::{http1, http2}; use hyper_util::rt::TokioIo; use tokio::select; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, Instrument}; +use tracing::{Instrument, debug, error, info}; use crate::body::NqBody; use crate::util::ByteStream; diff --git a/crates/nq-core/src/connection/map.rs b/crates/nq-core/src/connection/map.rs index d970c58..a845938 100644 --- a/crates/nq-core/src/connection/map.rs +++ b/crates/nq-core/src/connection/map.rs @@ -15,7 +15,7 @@ use tokio_util::sync::CancellationToken; use tracing::info; use crate::connection::http::{ - start_h1_conn, start_h2_conn, tls_connection, EstablishedConnection, + EstablishedConnection, start_h1_conn, start_h2_conn, tls_connection, }; use crate::util::ByteStream; use crate::{ConnectionTiming, ConnectionType, ResponseFuture, Time}; diff --git a/crates/nq-core/src/lib.rs b/crates/nq-core/src/lib.rs index a0493b1..40935e3 100644 --- a/crates/nq-core/src/lib.rs +++ b/crates/nq-core/src/lib.rs @@ -23,7 +23,7 @@ pub use crate::{ network::Network, time::{Time, Timestamp, TokioTime}, upgraded::ConnectUpgraded, - util::{oneshot_result, OneshotResult, ResponseFuture}, + util::{OneshotResult, ResponseFuture, oneshot_result}, }; pub use anyhow::Error; diff --git a/crates/nq-core/src/network/mod.rs b/crates/nq-core/src/network/mod.rs index 74ab4be..051c4ee 100644 --- a/crates/nq-core/src/network/mod.rs +++ b/crates/nq-core/src/network/mod.rs @@ -1,7 +1,7 @@ // Copyright (c) 2023-2024 Cloudflare, Inc. // Licensed under the BSD-3-Clause license found in the LICENSE file or at https://opensource.org/licenses/BSD-3-Clause -use crate::{body::NqBody, ConnectionType, EstablishedConnection, OneshotResult, Timestamp}; +use crate::{ConnectionType, EstablishedConnection, OneshotResult, Timestamp, body::NqBody}; use http::Response; use hyper::body::Incoming; use std::net::SocketAddr; diff --git a/crates/nq-load-generator/src/lib.rs b/crates/nq-load-generator/src/lib.rs index 6f80d2a..a487142 100644 --- a/crates/nq-load-generator/src/lib.rs +++ b/crates/nq-load-generator/src/lib.rs @@ -7,14 +7,14 @@ use anyhow::Context; use http::{HeaderMap, HeaderName, HeaderValue}; use nq_core::client::{Direction, ThroughputClient}; use nq_core::{ - oneshot_result, BodyEvent, ConnectionType, EstablishedConnection, Network, OneshotResult, Time, - Timestamp, + BodyEvent, ConnectionType, EstablishedConnection, Network, OneshotResult, Time, Timestamp, + oneshot_result, }; use nq_stats::CounterSeries; use rand::seq::SliceRandom; use serde::Deserialize; -use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::RwLock; +use tokio::sync::mpsc::UnboundedReceiver; use tokio_util::sync::CancellationToken; use tracing::Instrument; diff --git a/crates/nq-packetloss/src/lib.rs b/crates/nq-packetloss/src/lib.rs index 6756dd2..1ee1334 100644 --- a/crates/nq-packetloss/src/lib.rs +++ b/crates/nq-packetloss/src/lib.rs @@ -9,15 +9,12 @@ mod webrtc_data_channel; -use nq_core::{ - client::Direction, - ConnectionType, Network, Time, TokioTime, -}; +use nq_core::{ConnectionType, Network, Time, TokioTime, client::Direction}; use nq_load_generator::{LoadConfig, LoadGenerator, LoadedConnection}; use nq_tokio_network::TokioNetwork; use serde::{Deserialize, Serialize}; use std::{cmp::min, collections::HashMap, fmt::Display, sync::Arc, time::Duration}; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{RwLock, mpsc}; use tokio_util::sync::CancellationToken; use tracing::Instrument; use url::Url; @@ -55,9 +52,7 @@ impl Default for PacketLossConfig { download_url: "https://h3.speed.cloudflare.com/__down?bytes=10000000000" .parse() .unwrap(), - upload_url: "https://h3.speed.cloudflare.com/__up" - .parse() - .unwrap(), + upload_url: "https://h3.speed.cloudflare.com/__up".parse().unwrap(), } } } @@ -339,7 +334,7 @@ pub struct TurnServerCreds { #[cfg(test)] mod tests { use crate::{ - webrtc_data_channel::tests::TestTurnServer, PacketLoss, PacketLossConfig, PacketLossResult, + PacketLoss, PacketLossConfig, PacketLossResult, webrtc_data_channel::tests::TestTurnServer, }; use std::time::Duration; use tokio_util::sync::CancellationToken; @@ -363,9 +358,7 @@ mod tests { download_url: "https://h3.speed.cloudflare.com/__down?bytes=10000000000" .parse() .unwrap(), - upload_url: "https://h3.speed.cloudflare.com/__up" - .parse() - .unwrap(), + upload_url: "https://h3.speed.cloudflare.com/__up".parse().unwrap(), }; let packet_loss = PacketLoss::new_with_config(config)?; let packet_loss_result = packet_loss @@ -471,9 +464,7 @@ mod tests { download_url: "https://h3.speed.cloudflare.com/__down?bytes=10000000000" .parse() .unwrap(), - upload_url: "https://h3.speed.cloudflare.com/__up" - .parse() - .unwrap(), + upload_url: "https://h3.speed.cloudflare.com/__up".parse().unwrap(), }; let packet_loss = PacketLoss::new_with_config(config)?; diff --git a/crates/nq-packetloss/src/webrtc_data_channel.rs b/crates/nq-packetloss/src/webrtc_data_channel.rs index 55d5a8d..56a1019 100644 --- a/crates/nq-packetloss/src/webrtc_data_channel.rs +++ b/crates/nq-packetloss/src/webrtc_data_channel.rs @@ -6,13 +6,14 @@ use std::sync::{Arc, Weak}; use tokio::sync::mpsc::UnboundedSender; use webrtc::{ api::APIBuilder, - data_channel::{data_channel_init::RTCDataChannelInit, RTCDataChannel}, + data_channel::{RTCDataChannel, data_channel_init::RTCDataChannelInit}, ice_transport::{ ice_candidate::RTCIceCandidate, ice_protocol::RTCIceProtocol, ice_server::RTCIceServer, }, peer_connection::{ - configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, - policy::ice_transport_policy::RTCIceTransportPolicy, RTCPeerConnection, + RTCPeerConnection, configuration::RTCConfiguration, + peer_connection_state::RTCPeerConnectionState, + policy::ice_transport_policy::RTCIceTransportPolicy, }, }; @@ -270,8 +271,8 @@ pub(crate) mod tests { }; use crate::{ - webrtc_data_channel::{DataChannelEvent, WebRTCDataChannel}, TurnServerCreds, + webrtc_data_channel::{DataChannelEvent, WebRTCDataChannel}, }; // Canned password for the test TURN server diff --git a/crates/nq-rpm/src/lib.rs b/crates/nq-rpm/src/lib.rs index eb40792..6ed65c2 100644 --- a/crates/nq-rpm/src/lib.rs +++ b/crates/nq-rpm/src/lib.rs @@ -10,16 +10,16 @@ use std::{ time::Duration, }; -use humansize::{format_size, DECIMAL}; +use humansize::{DECIMAL, format_size}; use nq_core::{ - client::{wait_for_finish, Direction, ThroughputClient}, ConnectionType, Network, Time, Timestamp, + client::{Direction, ThroughputClient, wait_for_finish}, }; use nq_load_generator::{LoadConfig, LoadGenerator, LoadedConnection}; -use nq_stats::{instant_minus_intervals, TimeSeries}; +use nq_stats::{TimeSeries, instant_minus_intervals}; use tokio::{select, sync::mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, Instrument}; +use tracing::{Instrument, debug, error, info}; use url::Url; #[derive(Debug, Clone)] @@ -55,9 +55,7 @@ impl Default for ResponsivenessConfig { small_download_url: "https://h3.speed.cloudflare.com/__down?bytes=10" .parse() .unwrap(), - upload_url: "https://h3.speed.cloudflare.com/__up" - .parse() - .unwrap(), + upload_url: "https://h3.speed.cloudflare.com/__up".parse().unwrap(), moving_average_distance: 4, interval_duration: Duration::from_millis(1000), test_duration: Duration::from_secs(20), diff --git a/crates/nq-stats/src/counter.rs b/crates/nq-stats/src/counter.rs index f293b45..61bdee7 100644 --- a/crates/nq-stats/src/counter.rs +++ b/crates/nq-stats/src/counter.rs @@ -73,7 +73,9 @@ impl CounterSeries { .copied() .zip(self.samples.get(end_idx.saturating_sub(1)).copied())?; - debug!("sample interval: from={from:?}, to={to:?}, start_idx={start_idx}, end_idx={end_idx}, start={start:?}, end={end:?}"); + debug!( + "sample interval: from={from:?}, to={to:?}, start_idx={start_idx}, end_idx={end_idx}, start={start:?}, end={end:?}" + ); Some(SampleRange { start, end }) } diff --git a/crates/nq-stats/src/lib.rs b/crates/nq-stats/src/lib.rs index 679ea59..ab16854 100644 --- a/crates/nq-stats/src/lib.rs +++ b/crates/nq-stats/src/lib.rs @@ -153,7 +153,7 @@ mod tests { use nq_core::Timestamp; - use crate::{instant_minus_intervals, TimeSeries}; + use crate::{TimeSeries, instant_minus_intervals}; fn avg_first_n(n: f64) -> f64 { (n + 1.0) / 2.0 diff --git a/crates/nq-tokio-network/src/lib.rs b/crates/nq-tokio-network/src/lib.rs index eb9568e..bc4128e 100644 --- a/crates/nq-tokio-network/src/lib.rs +++ b/crates/nq-tokio-network/src/lib.rs @@ -9,13 +9,13 @@ use tokio::sync::RwLock; use http::{Request, Response}; use hyper::body::Incoming; use nq_core::{ - oneshot_result, ConnectionManager, ConnectionTiming, ConnectionType, EstablishedConnection, - Network, NqBody, OneshotResult, ResponseFuture, Time, Timestamp, + ConnectionManager, ConnectionTiming, ConnectionType, EstablishedConnection, Network, NqBody, + OneshotResult, ResponseFuture, Time, Timestamp, oneshot_result, }; use tokio::net::TcpStream; use tokio_util::sync::CancellationToken; -use tracing::{error, info, Instrument}; +use tracing::{Instrument, error, info}; #[derive(Debug, Clone)] pub struct TokioNetwork { From 1aa1611511b1b4552704359c3d26fd0ffc259be4 Mon Sep 17 00:00:00 2001 From: Fisher Darling Date: Fri, 24 Oct 2025 09:42:53 +0200 Subject: [PATCH 3/8] Ability to disable TLS for HTTP/1.1 connections --- cli/src/args/mod.rs | 13 ++++++++++++ cli/src/packet_loss.rs | 2 +- cli/src/up_down.rs | 17 +++------------- crates/nq-core/src/connection/http.rs | 6 +++++- crates/nq-core/src/connection/map.rs | 10 ++++++--- crates/nq-core/src/connection/mod.rs | 29 +++++++++++++++++++++++++-- crates/nq-latency/src/lib.rs | 3 ++- 7 files changed, 58 insertions(+), 22 deletions(-) diff --git a/cli/src/args/mod.rs b/cli/src/args/mod.rs index 29e0f98..76a5763 100644 --- a/cli/src/args/mod.rs +++ b/cli/src/args/mod.rs @@ -6,6 +6,7 @@ pub(crate) mod rpm; pub(crate) mod up_down; use clap::{Parser, Subcommand, ValueEnum}; +use nq_core::ConnectionType; use packet_loss::PacketLossArgs; use crate::args::rpm::RpmArgs; @@ -77,7 +78,19 @@ pub enum Command { /// Describes which underlying transport a connection uses. #[derive(Debug, Clone, ValueEnum)] pub enum ConnType { + H1ClearText, H1, H2, H3, } + +impl From for ConnectionType { + fn from(conn_type: ConnType) -> Self { + match conn_type { + ConnType::H1ClearText => ConnectionType::H1 { use_tls: false }, + ConnType::H1 => ConnectionType::H1 { use_tls: true }, + ConnType::H2 => ConnectionType::H2, + ConnType::H3 => ConnectionType::H3, + } + } +} diff --git a/cli/src/packet_loss.rs b/cli/src/packet_loss.rs index d8b1bf2..ce3eb81 100644 --- a/cli/src/packet_loss.rs +++ b/cli/src/packet_loss.rs @@ -59,7 +59,7 @@ async fn fetch_turn_server_creds( headers.append(hyper::header::HOST, HeaderValue::from_str(host)?); let response = Client::default() - .new_connection(ConnectionType::H1) + .new_connection(ConnectionType::h1()) .method("GET") .headers(headers) .send( diff --git a/cli/src/up_down.rs b/cli/src/up_down.rs index 523ff67..1a53d4d 100644 --- a/cli/src/up_down.rs +++ b/cli/src/up_down.rs @@ -5,13 +5,12 @@ use std::sync::Arc; use anyhow::Context; use nq_core::client::{wait_for_finish, ThroughputClient}; -use nq_core::{ConnectionType, Network, Time, TokioTime}; +use nq_core::{Network, Time, TokioTime}; use nq_tokio_network::TokioNetwork; use tokio_util::sync::CancellationToken; use tracing::info; use crate::args::up_down::{DownloadArgs, UploadArgs}; -use crate::args::ConnType; use crate::util::pretty_secs; use serde_json::json; @@ -23,12 +22,7 @@ pub async fn download(args: DownloadArgs) -> anyhow::Result<()> { let network = Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone())) as Arc; - let conn_type = match args.conn_type { - ConnType::H1 => ConnectionType::H1, - ConnType::H2 => ConnectionType::H2, - ConnType::H3 => unimplemented!("H3 is not yet implemented"), // ConnectionType::H3, - }; - + let conn_type = args.conn_type.into(); info!("downloading: {}", args.url); let inflight_body = ThroughputClient::download() @@ -90,12 +84,7 @@ pub async fn upload(args: UploadArgs) -> anyhow::Result<()> { let network = Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone())) as Arc; - let conn_type = match args.conn_type { - ConnType::H1 => ConnectionType::H1, // ConnectionType::H1, - ConnType::H2 => ConnectionType::H2, - ConnType::H3 => unimplemented!("H3 is not yet implemented"), // ConnectionType::H3, - }; - + let conn_type = args.conn_type.into(); let bytes = args.bytes.unwrap_or(10_000_000); println!("{}\n", args.url); diff --git a/crates/nq-core/src/connection/http.rs b/crates/nq-core/src/connection/http.rs index e466f9d..80cfe1b 100644 --- a/crates/nq-core/src/connection/http.rs +++ b/crates/nq-core/src/connection/http.rs @@ -6,6 +6,7 @@ use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; +use anyhow::bail; use boring::ssl::{SslConnector, SslMethod, SslVerifyMode}; use boring::x509::X509; use boring::x509::store::X509StoreBuilder; @@ -78,7 +79,10 @@ pub async fn tls_connection( builder.set_verify(SslVerifyMode::PEER); let alpn: &[u8] = match conn_type { - ConnectionType::H1 => b"\x08http/1.1", + ConnectionType::H1 { use_tls: false } => { + bail!("cannot create tls connection if `use_tls: false`") + } + ConnectionType::H1 { use_tls: true } => b"\x08http/1.1", ConnectionType::H2 => b"\x02h2", ConnectionType::H3 => b"\x02h3", }; diff --git a/crates/nq-core/src/connection/map.rs b/crates/nq-core/src/connection/map.rs index a845938..038581d 100644 --- a/crates/nq-core/src/connection/map.rs +++ b/crates/nq-core/src/connection/map.rs @@ -40,9 +40,13 @@ impl ConnectionManager { shutdown: CancellationToken, ) -> Result>> { let connection = match conn_type { - ConnectionType::H1 => { - let stream = tls_connection(conn_type, &domain, &mut timing, io, time).await?; - start_h1_conn(domain, timing, stream, time, shutdown).await? + ConnectionType::H1 { use_tls } => { + if use_tls { + let stream = tls_connection(conn_type, &domain, &mut timing, io, time).await?; + start_h1_conn(domain, timing, stream, time, shutdown).await? + } else { + start_h1_conn(domain, timing, io, time, shutdown).await? + } } ConnectionType::H2 => { let stream = tls_connection(conn_type, &domain, &mut timing, io, time).await?; diff --git a/crates/nq-core/src/connection/mod.rs b/crates/nq-core/src/connection/mod.rs index fd7ffa6..ce31ae3 100644 --- a/crates/nq-core/src/connection/mod.rs +++ b/crates/nq-core/src/connection/mod.rs @@ -14,14 +14,39 @@ pub use self::map::ConnectionManager; /// The L7 type of a connection. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ConnectionType { - /// Create an HTTP/1.1 connection. - H1, + /// Create an HTTP/1.1 connection. To disable tls, set `use_tls: false`. + H1 { + /// enable tls for this HTTP/1.1 connection. + use_tls: bool, + }, /// Create an HTTP/2 connection. H2, /// Create an HTTP/3 connection. H3, } +impl ConnectionType { + /// Creates an HTTP/1.1 connection type with TLS disabled. + pub fn h1_clear_text() -> ConnectionType { + ConnectionType::H1 { use_tls: false } + } + + /// Creates an HTTP/1.1 connection type. + pub fn h1() -> ConnectionType { + ConnectionType::H1 { use_tls: true } + } + + /// Creates an HTTP/2 connection type. + pub fn h2() -> ConnectionType { + ConnectionType::H2 + } + + /// Creates an HTTP/3 connection type. + pub fn h3() -> ConnectionType { + ConnectionType::H3 + } +} + /// Timing stats for the establishment of a connection. All durations /// are calculated from the start of the connection. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] diff --git a/crates/nq-latency/src/lib.rs b/crates/nq-latency/src/lib.rs index 8d1f570..7a51cab 100644 --- a/crates/nq-latency/src/lib.rs +++ b/crates/nq-latency/src/lib.rs @@ -71,8 +71,9 @@ impl Latency { .context("unable to resolve host")?; let time_lookup = time.now(); + let conn_type = ConnectionType::H1 { use_tls: true }; let connection = network - .new_connection(conn_start, addrs[0], host.to_string(), ConnectionType::H1) + .new_connection(conn_start, addrs[0], host.to_string(), conn_type) .await .context("unable to create new connection")?; { From 40a2c905f0d2a309d2504e3d07012a5582b74821 Mon Sep 17 00:00:00 2001 From: Fisher Darling Date: Fri, 24 Oct 2025 09:56:46 +0200 Subject: [PATCH 4/8] feat: 'mach saturate' command to determine maximum network capacity --- cli/Cargo.toml | 2 +- cli/src/aim_report.rs | 8 +- cli/src/args/mod.rs | 11 +- cli/src/args/saturate.rs | 86 ++++++++++++++ cli/src/main.rs | 4 +- cli/src/rpm.rs | 8 +- cli/src/saturate.rs | 154 ++++++++++++++++++++++++++ cli/src/up_down.rs | 15 ++- crates/nq-core/src/client.rs | 23 +++- crates/nq-core/src/connection/http.rs | 20 +++- crates/nq-rpm/src/lib.rs | 11 +- 11 files changed, 318 insertions(+), 24 deletions(-) create mode 100644 cli/src/args/saturate.rs create mode 100644 cli/src/saturate.rs diff --git a/cli/Cargo.toml b/cli/Cargo.toml index b15e86d..e04be99 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -27,4 +27,4 @@ serde_json = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "time", "net", "macros",] } tokio-util = { workspace = true } tracing = { workspace = true } -tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } \ No newline at end of file +tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } diff --git a/cli/src/aim_report.rs b/cli/src/aim_report.rs index 960973a..fa8105a 100644 --- a/cli/src/aim_report.rs +++ b/cli/src/aim_report.rs @@ -132,9 +132,9 @@ impl CloudflareAimResults { #[serde(rename_all = "camelCase")] pub struct BpsMeasurement { /// The total number of bytes. - bytes: usize, + pub(crate) bytes: usize, /// The bits per second of the transfer. - bps: usize, + pub(crate) bps: usize, } impl BpsMeasurement { @@ -154,7 +154,7 @@ impl BpsMeasurement { } /// Use the test duration and network capacity to create a synthetic bps result. - fn from_rpm_result(rpm_result: &ResponsivenessResult) -> BpsMeasurement { + pub(crate) fn from_rpm_result(rpm_result: &ResponsivenessResult) -> BpsMeasurement { let throughput = rpm_result.throughput().unwrap_or(0) as f64; let bytes = throughput * rpm_result.duration.as_secs_f64(); @@ -190,7 +190,7 @@ impl Default for PacketLossMeasurement { /// https://developers.cloudflare.com/speed/aim/ #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -#[allow(missing_docs)] +#[allow(missing_docs, dead_code)] pub enum AimScore { Streaming { points: usize, diff --git a/cli/src/args/mod.rs b/cli/src/args/mod.rs index 76a5763..ddd5f5e 100644 --- a/cli/src/args/mod.rs +++ b/cli/src/args/mod.rs @@ -3,6 +3,7 @@ pub(crate) mod packet_loss; pub(crate) mod rpm; +pub(crate) mod saturate; pub(crate) mod up_down; use clap::{Parser, Subcommand, ValueEnum}; @@ -10,7 +11,9 @@ use nq_core::ConnectionType; use packet_loss::PacketLossArgs; use crate::args::rpm::RpmArgs; -use crate::args::up_down::DownloadArgs; +use crate::args::saturate::SaturateArgs; +// use crate::args::saturate::SaturateArgs; +use crate::args::up_down::{DownloadArgs, UploadArgs}; /// mach runs multiple different network performance tests. The main focus of /// mach and this tool is to implement the IETF draft: "Responsiveness under @@ -40,7 +43,7 @@ pub enum Command { Download(DownloadArgs), /// Upload data (POST) to an endpoint, reporting latency measurements and total /// throughput. - // Upload(UploadArgs), + Upload(UploadArgs), /// Determine the Round-Trip-Time (RTT), or latency, of a link using the /// time it takes to establish a TCP connection. /// @@ -58,6 +61,10 @@ pub enum Command { }, /// Send UDP packets to a TURN server, reporting lost packets. PacketLoss(PacketLossArgs), + /// Saturate the network in some direction and report maximum goodput. The + /// direction, `up`, `down`, `both` must be specified. By default, the + /// command runs for 20s. + Saturate(SaturateArgs), } // todo(fisher): figure out proxy chaining. Preparsing args or using the -- sentinal? diff --git a/cli/src/args/saturate.rs b/cli/src/args/saturate.rs new file mode 100644 index 0000000..908f692 --- /dev/null +++ b/cli/src/args/saturate.rs @@ -0,0 +1,86 @@ +// Copyright (c) 2023-2024 Cloudflare, Inc. +// Licensed under the BSD-3-Clause license found in the LICENSE file or at https://opensource.org/licenses/BSD-3-Clause + +//! Arguments for running a network saturation test. + +use crate::args::ConnType; + +/// Download data (GET) from an endpoint, reporting latency measurements and +/// total throughput. +#[derive(Debug, clap::Args)] +pub struct SaturateArgs { + /// The connection type to use. + #[clap(short = 't', long, default_value = "h1-clear-text")] + pub(crate) conn_type: ConnType, + /// Which direction to saturate: `up`, `down` or `both`. + #[clap(subcommand)] + pub(crate) direction: Direction, + /// The duration in seconds to saturate the network for. Defaults + /// to 20s. + #[clap(long, default_value = "20")] + pub(crate) duration: u64, +} + +#[derive(Debug, Clone, clap::Subcommand)] +pub enum Direction { + /// Saturate the download (ingress) side of the network. + Down { + /// The URL to upload data to. + #[clap( + short, + long, + default_value = "http://speed.cloudflare.com/__down?bytes=1000000000" + )] + download_url: String, + }, + /// Saturate the upload (egress) side of the network. + Up { + /// The URL to upload data to. + #[clap(short, long, default_value = "http://speed.cloudflare.com/__up")] + upload_url: String, + }, + /// Saturate both the download (ingress) and upload (egress) side of the network. + Both { + /// The URL to download data from. + #[clap( + short, + long, + default_value = "http://speed.cloudflare.com/__down?bytes=1000000000" + )] + download_url: String, + /// The URL to upload data to. + #[clap(short, long, default_value = "http://speed.cloudflare.com/__up")] + upload_url: String, + }, +} + +impl Default for Direction { + fn default() -> Self { + Self::Down { + download_url: "http://speed.cloudflare.com/__down?bytes=1000000000".into(), + } + } +} + +pub(crate) struct Urls { + pub(crate) upload: String, + pub(crate) download: String, +} + +impl Direction { + pub(crate) fn urls(&self) -> Urls { + match self { + Direction::Up { upload_url: url } | Direction::Down { download_url: url } => Urls { + upload: url.clone(), + download: url.clone(), + }, + Direction::Both { + download_url, + upload_url, + } => Urls { + download: download_url.clone(), + upload: upload_url.clone(), + }, + } + } +} diff --git a/cli/src/main.rs b/cli/src/main.rs index 8a15aa3..039c258 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -7,6 +7,7 @@ mod latency; mod packet_loss; mod report; mod rpm; +mod saturate; mod up_down; mod util; @@ -30,9 +31,10 @@ async fn main() -> anyhow::Result<()> { match command { Command::Rpm(config) => rpm::run(config).await?, Command::Download(config) => up_down::download(config).await?, - // Command::Upload(config) => up_down::upload(config).await?, + Command::Upload(config) => up_down::upload(config).await?, Command::Rtt { url, runs } => latency::run(url, runs).await?, Command::PacketLoss(config) => packet_loss::run(config).await?, + Command::Saturate(config) => saturate::run(config).await?, } Ok(()) diff --git a/cli/src/rpm.rs b/cli/src/rpm.rs index f8aaa4c..0864a19 100644 --- a/cli/src/rpm.rs +++ b/cli/src/rpm.rs @@ -74,6 +74,8 @@ pub async fn run(cli_config: RpmArgs) -> anyhow::Result<()> { trimmed_mean_percent: cli_config.trimmed_mean_percent, std_tolerance: cli_config.std_tolerance, max_loaded_connections: cli_config.max_loaded_connections, + conn_type: ConnectionType::H2, + determine_load_only: false, }; info!("running download test"); @@ -121,10 +123,8 @@ async fn run_test( ) -> anyhow::Result { let shutdown = CancellationToken::new(); let time = Arc::new(TokioTime::new()) as Arc; - let network = Arc::new(TokioNetwork::new( - Arc::clone(&time), - shutdown.clone().into(), - )) as Arc; + let network = + Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone())) as Arc; let rpm = Responsiveness::new(config.clone(), download)?; let result = rpm.run_test(network, time, shutdown.clone()).await?; diff --git a/cli/src/saturate.rs b/cli/src/saturate.rs new file mode 100644 index 0000000..34309f3 --- /dev/null +++ b/cli/src/saturate.rs @@ -0,0 +1,154 @@ +// Copyright (c) 2023-2024 Cloudflare, Inc. +// Licensed under the BSD-3-Clause license found in the LICENSE file or at https://opensource.org/licenses/BSD-3-Clause + +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use nq_core::{Network, Time, TokioTime}; +use nq_rpm::{Responsiveness, ResponsivenessConfig, ResponsivenessResult}; +use nq_tokio_network::TokioNetwork; +use serde_json::json; +use tokio::join; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info}; + +use crate::args::saturate::{Direction, SaturateArgs, Urls}; +use crate::util::pretty_secs; + +/// Run a network saturation test. This is effectively an RPM test +/// with both TLS disabled AND no RPM measurements. +pub async fn run(cli_config: SaturateArgs) -> anyhow::Result<()> { + debug!("running network saturation test with arguments {cli_config:?}"); + + let Urls { upload, download } = cli_config.direction.urls(); + + let config = ResponsivenessConfig { + large_download_url: download.parse().context("parsing download url")?, + small_download_url: download.parse().context("parsing download url")?, + upload_url: upload.parse().context("parsing upload url")?, + determine_load_only: true, + conn_type: cli_config.conn_type.into(), + test_duration: Duration::from_secs(cli_config.duration), + ..Default::default() + }; + + let results = run_test(&config, &cli_config.direction) + .await + .context("running staturation test")?; + + let json = results.as_json(); + println!("{:#}", json); + + Ok(()) +} + +struct SaturateResults { + download_result: Option, + upload_result: Option, +} + +impl SaturateResults { + fn as_json(&self) -> serde_json::Value { + fn build_json(res: &ResponsivenessResult) -> serde_json::Value { + let dur = pretty_secs(res.duration.as_secs_f64()); + let mut capacity = res.capacity as u64; + if capacity == 0 { + capacity = res + .average_goodput_series + .quantile(0.90) + .unwrap_or_default() as u64; + } + + json!({ "capacity": capacity, "dur": dur }) + } + + json!({ + "download": self.download_result.as_ref().map(build_json), + "upload": self.upload_result.as_ref().map(build_json) + }) + } +} + +async fn run_test( + config: &ResponsivenessConfig, + direction: &Direction, +) -> anyhow::Result { + match direction { + Direction::Down { .. } => { + info!( + download_url = %config.large_download_url, + "running download (ingress) network saturation test", + ); + + let download_result = run_single_test(config, true) + .await + .context("running download saturation test")?; + + Ok(SaturateResults { + download_result: Some(download_result), + upload_result: None, + }) + } + Direction::Up { .. } => { + info!( + upload_url = %config.upload_url, + "running upload (egress) network saturation test", + ); + + let upload_result = run_single_test(config, false) + .await + .context("running upload saturation test")?; + + Ok(SaturateResults { + download_result: None, + upload_result: Some(upload_result), + }) + } + Direction::Both { .. } => { + info!( + download_url = %config.large_download_url, + upload_url = %config.upload_url, + "running parallel download/upload (ingress/egress) network saturation test" + ); + + let download_result_fut = run_single_test(config, true); + let upload_result_fut = run_single_test(config, false); + + let (download_res, upload_res) = join!(download_result_fut, upload_result_fut); + let download_result = download_res + .inspect_err(|err| error!(%err, "error running download test")) + .ok(); + let upload_result = upload_res + .context("running upload saturation test") + .inspect_err(|err| error!(%err, "error running download test")) + .ok(); + + Ok(SaturateResults { + download_result, + upload_result, + }) + } + } +} + +async fn run_single_test( + config: &ResponsivenessConfig, + download: bool, +) -> anyhow::Result { + let shutdown = CancellationToken::new(); + let time = Arc::new(TokioTime::new()) as Arc; + let network = + Arc::new(TokioNetwork::new(Arc::clone(&time), shutdown.clone())) as Arc; + + let rpm = Responsiveness::new(config.clone(), download)?; + let result = rpm.run_test(network, time, shutdown.clone()).await?; + + debug!("shutting down rpm test"); + let _ = tokio::time::timeout(tokio::time::Duration::from_secs(1), async { + shutdown.cancel(); + }) + .await; + + Ok(result) +} diff --git a/cli/src/up_down.rs b/cli/src/up_down.rs index 1a53d4d..d360670 100644 --- a/cli/src/up_down.rs +++ b/cli/src/up_down.rs @@ -39,6 +39,8 @@ pub async fn download(args: DownloadArgs) -> anyhow::Result<()> { .timing .context("expected inflight body to have connection timing data")?; + info!("headers: {:?}", inflight_body.headers); + let body_start = time.now(); let finished_result = wait_for_finish(inflight_body.events).await?; let finished = time.now(); @@ -74,7 +76,7 @@ pub async fn download(args: DownloadArgs) -> anyhow::Result<()> { Ok(()) } -/// Run a download test. +/// Run an upload test. // todo(fisher): investigate body completion events. Moving to Socket stats is // likely the best option. #[allow(dead_code)] @@ -86,8 +88,7 @@ pub async fn upload(args: UploadArgs) -> anyhow::Result<()> { let conn_type = args.conn_type.into(); let bytes = args.bytes.unwrap_or(10_000_000); - - println!("{}\n", args.url); + info!("uploading {bytes} bytes to: {}", args.url); let inflight_body = ThroughputClient::upload(bytes) .new_connection(conn_type) @@ -96,13 +97,17 @@ pub async fn upload(args: UploadArgs) -> anyhow::Result<()> { Arc::clone(&network), Arc::clone(&time), shutdown.clone(), - )? - .await?; + ) + .context("sending upload POST")? + .await + .context("waiting for upload POST response")?; let timing = inflight_body .timing .context("expected inflight body to have connection timing data")?; + info!("headers: {:?}", inflight_body.headers); + let body_start = time.now(); let finished_result = wait_for_finish(inflight_body.events).await?; let finished = time.now(); diff --git a/crates/nq-core/src/client.rs b/crates/nq-core/src/client.rs index 2e5c385..e85f7b8 100644 --- a/crates/nq-core/src/client.rs +++ b/crates/nq-core/src/client.rs @@ -107,7 +107,24 @@ impl ThroughputClient { } let host = uri.host().context("uri is missing a host")?.to_string(); - let host_with_port = format!("{}:{}", host, uri.port_u16().unwrap_or(443)); + debug!( + "host: {}, uri: {}, uri.scheme: {:?}", + host, + uri, + uri.scheme_str() + ); + let host_with_port = format!( + "{}:{}", + host, + uri.port_u16().unwrap_or_else(|| { + if matches!(uri.scheme_str(), Some("http") | None) { + 80 + } else { + 443 + } + }) + ); + debug!("host with port: {host_with_port}"); let method = match self.direction { Direction::Down => "GET", @@ -142,9 +159,8 @@ impl ThroughputClient { .uri(uri) .body(body)?; - tracing::debug!("created request"); - *request.headers_mut() = headers.clone(); + tracing::debug!("created request: {request:?}"); tokio::spawn( async move { @@ -201,6 +217,7 @@ impl ThroughputClient { } Direction::Down => { let (parts, incoming) = response_fut.await?.into_parts(); + info!("download response parts: {:?}", parts); let (counting_body, events) = CountingBody::new( incoming, diff --git a/crates/nq-core/src/connection/http.rs b/crates/nq-core/src/connection/http.rs index 80cfe1b..9131be0 100644 --- a/crates/nq-core/src/connection/http.rs +++ b/crates/nq-core/src/connection/http.rs @@ -10,7 +10,8 @@ use anyhow::bail; use boring::ssl::{SslConnector, SslMethod, SslVerifyMode}; use boring::x509::X509; use boring::x509::store::X509StoreBuilder; -use http::{Request, Response}; +use http::header::HOST; +use http::{HeaderValue, Request, Response}; use hyper::body::Incoming; use hyper::client::conn::{http1, http2}; use hyper_util::rt::TokioIo; @@ -188,8 +189,11 @@ pub enum SendRequest { impl SendRequest { fn send_request( &mut self, - req: Request, + mut req: Request, ) -> Pin>> + Send>> { + // inject the host header it it's missing and this is an HTTP/1.1 req. + self.insert_host_if_missing(&mut req); + match self { SendRequest::H1 { dispatch: send_request, @@ -199,6 +203,18 @@ impl SendRequest { } => Box::pin(send_request.send_request(req)), } } + + fn insert_host_if_missing(&mut self, req: &mut Request) { + if !matches!(self, SendRequest::H1 { .. }) && !req.headers().contains_key(HOST) { + return; + } + + let Some(Ok(host)) = req.uri().host().map(HeaderValue::from_str) else { + return; + }; + + req.headers_mut().insert(HOST, host); + } } #[derive(Clone)] diff --git a/crates/nq-rpm/src/lib.rs b/crates/nq-rpm/src/lib.rs index 6ed65c2..b26e360 100644 --- a/crates/nq-rpm/src/lib.rs +++ b/crates/nq-rpm/src/lib.rs @@ -33,6 +33,8 @@ pub struct ResponsivenessConfig { pub trimmed_mean_percent: f64, pub std_tolerance: f64, pub max_loaded_connections: usize, + pub conn_type: ConnectionType, + pub determine_load_only: bool, } impl ResponsivenessConfig { @@ -62,6 +64,8 @@ impl Default for ResponsivenessConfig { trimmed_mean_percent: 0.95, std_tolerance: 0.05, max_loaded_connections: 16, + conn_type: ConnectionType::H2, + determine_load_only: false, } } } @@ -135,7 +139,10 @@ impl Responsiveness { let (event_tx, mut event_rx) = mpsc::channel(1024); self.new_load_generating_connection(event_tx.clone(), &env, shutdown.clone())?; - self.send_foreign_probe(event_tx.clone(), &env, shutdown.clone())?; + + if !self.config.determine_load_only { + self.send_foreign_probe(event_tx.clone(), &env, shutdown.clone())?; + } loop { select! { @@ -401,7 +408,7 @@ impl Responsiveness { ) -> anyhow::Result<()> { let oneshot_res = self.load_generator.new_loaded_connection( self.direction, - ConnectionType::H2, + self.config.conn_type, Arc::clone(&env.network), Arc::clone(&env.time), shutdown, From b81381eed85c42bcbd7ef748f952270b293d86f4 Mon Sep 17 00:00:00 2001 From: Fisher Darling Date: Thu, 6 Nov 2025 18:27:02 +0100 Subject: [PATCH 5/8] split ThroughputClient::send into smaller functions --- crates/nq-core/Cargo.toml | 4 +- crates/nq-core/src/client.rs | 214 +++++++++++++++++++++-------------- 2 files changed, 128 insertions(+), 90 deletions(-) diff --git a/crates/nq-core/Cargo.toml b/crates/nq-core/Cargo.toml index 82b7f1b..256b78a 100644 --- a/crates/nq-core/Cargo.toml +++ b/crates/nq-core/Cargo.toml @@ -18,7 +18,7 @@ hyper-util = { workspace = true, features = ["tokio"] } pin-project-lite = { workspace = true } rand = { workspace = true } rustls-native-certs = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread", "time", "net", "test-util"] } +tokio = { workspace = true, features = ["rt-multi-thread", "time", "net", "test-util", "macros"] } tokio-boring = { workspace = true } tokio-util = { workspace = true } -tracing = { workspace = true } \ No newline at end of file +tracing = { workspace = true } diff --git a/crates/nq-core/src/client.rs b/crates/nq-core/src/client.rs index e85f7b8..d3a0c7d 100644 --- a/crates/nq-core/src/client.rs +++ b/crates/nq-core/src/client.rs @@ -92,15 +92,15 @@ impl ThroughputClient { } /// Execute a download or upload request against the given [`Uri`]. - #[tracing::instrument(skip(self, network, time, shutdown))] + // #[tracing::instrument(skip(self, network, time, shutdown))] pub fn send( - self, + mut self, uri: Uri, network: Arc, time: Arc, shutdown: CancellationToken, ) -> anyhow::Result> { - let mut headers = self.headers.unwrap_or_default(); + let mut headers = self.headers.take().unwrap_or_default(); if !headers.contains_key("User-Agent") { headers.insert("User-Agent", HeaderValue::from_static("mach/0.1.0")); @@ -166,25 +166,9 @@ impl ThroughputClient { async move { let start = time.now(); - let connection = if let Some(connection) = self.connection { - connection - } else if let Some(conn_type) = self.new_connection_type { - info!("creating new connection to {host_with_port}"); - - let addrs = network - .resolve(host_with_port) - .await - .context("unable to resolve host")?; - - debug!("addrs: {addrs:?}"); - - network - .new_connection(start, addrs[0], host, conn_type) - .await - .context("creating new connection")? - } else { - todo!() - }; + let connection = self + .get_or_create_connection(&network, host, host_with_port, start) + .await?; let conn_timing = { let conn = connection.read().await; @@ -194,72 +178,20 @@ impl ThroughputClient { debug!("connection used"); let response_fut = network.send_request(connection.clone(), request); - let mut response_body = match self.direction { - Direction::Up(_) => { - debug!("sending upload events"); - if tx - .send(Ok(InflightBody { - connection: connection.clone(), - timing: Some(conn_timing), - events: events.expect("events were set above"), - start, - headers, - })) - .is_err() - { - error!("error sending upload events"); - } - - let (parts, incoming) = response_fut.await?.into_parts(); - info!("upload response parts: {:?}", parts); - - incoming.boxed() - } - Direction::Down => { - let (parts, incoming) = response_fut.await?.into_parts(); - info!("download response parts: {:?}", parts); - - let (counting_body, events) = CountingBody::new( - incoming, - Duration::from_millis(100), - Arc::clone(&time), - ); - - debug!("sending download events"); - if tx - .send(Ok(InflightBody { - connection: connection.clone(), - timing: Some(conn_timing), - start, - events, - headers: parts.headers, - })) - .is_err() - { - error!("error sending download events"); - } - - counting_body.boxed() - } - }; - - tokio::spawn( - async move { - // Consume the response body and keep the connection alive. Stop if we hit an error. - info!("waiting for response body"); - - loop { - select! { - Some(res) = response_body.frame() => if let Err(e) = res { - error!("body closing: {e}"); - break; - }, - _ = shutdown.cancelled() => break, - } - } - } - .in_current_span(), - ); + let response_body = self + .create_response_body( + time, + headers, + tx, + events, + start, + connection, + conn_timing, + response_fut, + ) + .await?; + + tokio::spawn(consume_body(shutdown, response_body).in_current_span()); Ok::<_, anyhow::Error>(()) } @@ -268,6 +200,112 @@ impl ThroughputClient { Ok(rx) } + + async fn create_response_body( + &self, + time: Arc, + headers: HeaderMap, + tx: tokio::sync::oneshot::Sender>, + events: Option>, + start: Timestamp, + connection: Arc>, + conn_timing: crate::ConnectionTiming, + response_fut: OneshotResult>, + ) -> Result, anyhow::Error> { + let response_body = match self.direction { + Direction::Up(_) => { + debug!("sending upload events"); + if tx + .send(Ok(InflightBody { + connection: connection.clone(), + timing: Some(conn_timing), + events: events.expect("events were set above"), + start, + headers, + })) + .is_err() + { + error!("error sending upload events"); + } + + let (parts, incoming) = response_fut.await?.into_parts(); + info!("upload response parts: {:?}", parts); + + incoming.boxed() + } + Direction::Down => { + let (parts, incoming) = response_fut.await?.into_parts(); + info!("download response parts: {:?}", parts); + + let (counting_body, events) = + CountingBody::new(incoming, Duration::from_millis(100), Arc::clone(&time)); + + debug!("sending download events"); + if tx + .send(Ok(InflightBody { + connection: connection.clone(), + timing: Some(conn_timing), + start, + events, + headers: parts.headers, + })) + .is_err() + { + error!("error sending download events"); + } + + counting_body.boxed() + } + }; + Ok(response_body) + } + + async fn get_or_create_connection( + &mut self, + network: &Arc, + host: String, + host_with_port: String, + start: Timestamp, + ) -> Result>, anyhow::Error> { + let connection = if let Some(connection) = self.connection.take() { + connection + } else if let Some(conn_type) = self.new_connection_type { + info!("creating new connection to {host_with_port}"); + + let addrs = network + .resolve(host_with_port) + .await + .context("unable to resolve host")?; + + debug!("addrs: {addrs:?}"); + + network + .new_connection(start, addrs[0], host, conn_type) + .await + .context("creating new connection")? + } else { + todo!() + }; + + Ok(connection) + } +} + +async fn consume_body( + shutdown: CancellationToken, + response_body: http_body_util::combinators::BoxBody, +) -> impl Future { + // Consume the response body and keep the connection alive. Stop if we hit an error. + info!("waiting for response body"); + loop { + select! { + Some(res) = response_body.frame() => if let Err(e) = res { + error!("body closing: {e}"); + break; + }, + _ = shutdown.cancelled() => break, + } + } } /// A [`Client`] is a simple client which sends a request and returns a response. From e4bcd988bdd071f80d148d56da9c317d23494813 Mon Sep 17 00:00:00 2001 From: Fisher Darling Date: Thu, 6 Nov 2025 18:41:31 +0100 Subject: [PATCH 6/8] cleanup logging and add error logging around send_request --- cli/src/up_down.rs | 2 - crates/nq-core/src/client.rs | 104 ++++++++++++++++---------- crates/nq-core/src/connection/http.rs | 2 +- crates/nq-tokio-network/src/lib.rs | 10 +-- 4 files changed, 71 insertions(+), 47 deletions(-) diff --git a/cli/src/up_down.rs b/cli/src/up_down.rs index d360670..8d84140 100644 --- a/cli/src/up_down.rs +++ b/cli/src/up_down.rs @@ -106,8 +106,6 @@ pub async fn upload(args: UploadArgs) -> anyhow::Result<()> { .timing .context("expected inflight body to have connection timing data")?; - info!("headers: {:?}", inflight_body.headers); - let body_start = time.now(); let finished_result = wait_for_finish(inflight_body.events).await?; let finished = time.now(); diff --git a/crates/nq-core/src/client.rs b/crates/nq-core/src/client.rs index d3a0c7d..c5d5d2b 100644 --- a/crates/nq-core/src/client.rs +++ b/crates/nq-core/src/client.rs @@ -16,7 +16,7 @@ use hyper::body::{Body, Bytes, Incoming}; use tokio::select; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -use tracing::{Instrument, debug, error, info}; +use tracing::{Instrument, debug, error, info, trace}; use crate::{ ConnectionType, EstablishedConnection, Network, OneshotResult, Time, Timestamp, @@ -107,12 +107,6 @@ impl ThroughputClient { } let host = uri.host().context("uri is missing a host")?.to_string(); - debug!( - "host: {}, uri: {}, uri.scheme: {:?}", - host, - uri, - uri.scheme_str() - ); let host_with_port = format!( "{}:{}", host, @@ -124,7 +118,7 @@ impl ThroughputClient { } }) ); - debug!("host with port: {host_with_port}"); + debug!("host: {host_with_port}"); let method = match self.direction { Direction::Down => "GET", @@ -136,7 +130,7 @@ impl ThroughputClient { let body: NqBody = match self.direction { Direction::Up(size) => { - tracing::debug!("tracking upload body"); + tracing::trace!("tracking upload body"); let dummy_body = UploadBody::new(size); let (body, events_rx) = @@ -164,36 +158,22 @@ impl ThroughputClient { tokio::spawn( async move { - let start = time.now(); - - let connection = self - .get_or_create_connection(&network, host, host_with_port, start) - .await?; - - let conn_timing = { - let conn = connection.read().await; - conn.timing() - }; - - debug!("connection used"); - let response_fut = network.send_request(connection.clone(), request); - - let response_body = self - .create_response_body( + if let Err(error) = self + .send_request( + network, time, + shutdown, headers, + host, + host_with_port, tx, events, - start, - connection, - conn_timing, - response_fut, + request, ) - .await?; - - tokio::spawn(consume_body(shutdown, response_body).in_current_span()); - - Ok::<_, anyhow::Error>(()) + .await + { + error!("error sending ThroughputClient request: {error:#}"); + } } .in_current_span(), ); @@ -201,6 +181,51 @@ impl ThroughputClient { Ok(rx) } + #[allow(clippy::too_many_arguments)] + async fn send_request( + mut self, + network: Arc, + time: Arc, + shutdown: CancellationToken, + headers: HeaderMap, + host: String, + host_with_port: String, + tx: tokio::sync::oneshot::Sender>, + events: Option>, + request: http::Request>, + ) -> Result, anyhow::Error> { + let start = time.now(); + let connection = self + .get_or_create_connection(&network, host, host_with_port, start) + .await?; + let conn_timing = { + let conn = connection.read().await; + conn.timing() + }; + + debug!("sending request"); + let response_fut = network.send_request(connection.clone(), request); + + let response_body = self + .create_response_body( + time, + headers, + tx, + events, + start, + connection, + conn_timing, + response_fut, + ) + .await + .context("creating response body")?; + + tokio::spawn(consume_body(shutdown, response_body).in_current_span()); + + Ok(Ok::<_, anyhow::Error>(())) + } + + #[allow(clippy::too_many_arguments)] async fn create_response_body( &self, time: Arc, @@ -214,7 +239,7 @@ impl ThroughputClient { ) -> Result, anyhow::Error> { let response_body = match self.direction { Direction::Up(_) => { - debug!("sending upload events"); + trace!("sending upload events"); if tx .send(Ok(InflightBody { connection: connection.clone(), @@ -228,7 +253,10 @@ impl ThroughputClient { error!("error sending upload events"); } - let (parts, incoming) = response_fut.await?.into_parts(); + let (parts, incoming) = response_fut + .await + .context("waiting for response")? + .into_parts(); info!("upload response parts: {:?}", parts); incoming.boxed() @@ -293,8 +321,8 @@ impl ThroughputClient { async fn consume_body( shutdown: CancellationToken, - response_body: http_body_util::combinators::BoxBody, -) -> impl Future { + mut response_body: http_body_util::combinators::BoxBody, +) { // Consume the response body and keep the connection alive. Stop if we hit an error. info!("waiting for response body"); loop { diff --git a/crates/nq-core/src/connection/http.rs b/crates/nq-core/src/connection/http.rs index 9131be0..ef70bef 100644 --- a/crates/nq-core/src/connection/http.rs +++ b/crates/nq-core/src/connection/http.rs @@ -117,7 +117,7 @@ pub async fn start_h1_conn( async move { select! { Err(e) = connection => { - error!(error=%e, "error running h1 connection"); + debug!(error=%e, "error running h1 connection"); } _ = shutdown.cancelled() => { debug!("shutting down h1 connection"); diff --git a/crates/nq-tokio-network/src/lib.rs b/crates/nq-tokio-network/src/lib.rs index bc4128e..72ccf66 100644 --- a/crates/nq-tokio-network/src/lib.rs +++ b/crates/nq-tokio-network/src/lib.rs @@ -15,7 +15,7 @@ use nq_core::{ use tokio::net::TcpStream; use tokio_util::sync::CancellationToken; -use tracing::{Instrument, error, info}; +use tracing::{Instrument, debug, error, info, trace}; #[derive(Debug, Clone)] pub struct TokioNetwork { @@ -87,8 +87,7 @@ impl Network for TokioNetwork { let inner = self.inner.clone(); tokio::spawn( async move { - info!("sending request"); - + trace!("sending request"); let response_result = match inner.send_request(connection, request).await { Ok(fut) => fut.await, Err(error) => { @@ -105,7 +104,7 @@ impl Network for TokioNetwork { } }; - info!("sending response future"); + trace!("sending response future"); let _ = tx.send(Ok(response)); } .in_current_span(), @@ -174,13 +173,12 @@ impl TokioNetworkInner { Ok(connection) } - #[tracing::instrument(skip(self, request), fields(uri=%request.uri()))] async fn send_request( &self, connection: Arc>, request: http::Request, ) -> anyhow::Result { - info!("sending request"); + debug!("sending request"); let mut conn = connection.write().await; let response_fut = conn From 2b375ad320ced1746307b55b114ee237e3444d25 Mon Sep 17 00:00:00 2001 From: Fisher Darling Date: Mon, 17 Nov 2025 15:20:36 +0100 Subject: [PATCH 7/8] fix body upload counting --- crates/nq-core/src/body/counting_body.rs | 42 +++++++++++++++++++++--- crates/nq-core/src/body/upload_body.rs | 10 ++++-- crates/nq-core/src/client.rs | 18 +++++++--- 3 files changed, 59 insertions(+), 11 deletions(-) diff --git a/crates/nq-core/src/body/counting_body.rs b/crates/nq-core/src/body/counting_body.rs index 4c47700..f881e58 100644 --- a/crates/nq-core/src/body/counting_body.rs +++ b/crates/nq-core/src/body/counting_body.rs @@ -98,6 +98,8 @@ where } trace!("polling frame"); + let size_hint_before = this.inner.size_hint(); + match this.inner.poll_frame(cx) { Poll::Ready(Some(Ok(frame))) => { if let Some(data) = frame.data_ref() { @@ -123,25 +125,55 @@ where let _ = this.events_tx.send(event); } + // Check if the body signals it's at the end after sending this + // frame. This handles cases where hyper won't poll the body + // again after the last frame. We use the size hint from before + // polling: if it shows exact remaining bytes equal to what we + // just sent, then this was the last frame. + // + // This does only work if the size of the body is known. E.g. + // this probably doesn't work for streamed bodies + // + // This fix works for RPM and saturation tests, so it should be + // sufficient for now. Ideally, we'd use TCP socket stats to + // calculate total throughput, but that's for later. + let frame_size = frame.data_ref().map(|d| d.len()).unwrap_or(0); + let was_last_frame = size_hint_before.upper() == Some(frame_size as u64) + && size_hint_before.lower() == frame_size as u64; + + if was_last_frame && !*this.sent_finished { + debug!( + total = *this.total, + "detected last frame via size_hint, sending finished event" + ); + let _ = this.events_tx.send(BodyEvent::ByteCount { + at: now, + total: *this.total, + }); + let _ = this.events_tx.send(BodyEvent::Finished { at: now }); + *this.sent_finished = true; + } + Poll::Ready(Some(Ok(frame))) } // Stream finished, send the last count Poll::Ready(None) => { let now = this.time.now(); - - debug!("body finished"); let event = BodyEvent::ByteCount { at: now, total: *this.total, }; - debug!(?event, "sending event"); + debug!( + ?event, + total = *this.total, + "sending final byte count event" + ); let _ = this.events_tx.send(event); if !*this.sent_finished { - debug!(at=?now, "sending finished"); + debug!(at=?now, "sending finished event"); let _ = this.events_tx.send(BodyEvent::Finished { at: now }); - *this.sent_finished = true; } else { debug!("already sent finish"); } diff --git a/crates/nq-core/src/body/upload_body.rs b/crates/nq-core/src/body/upload_body.rs index 88594b5..638c54b 100644 --- a/crates/nq-core/src/body/upload_body.rs +++ b/crates/nq-core/src/body/upload_body.rs @@ -20,6 +20,7 @@ pub struct UploadBody { remaining: usize, chunk: Bytes, rng: StdRng, + finished: bool, } impl UploadBody { @@ -35,6 +36,7 @@ impl UploadBody { remaining: size, chunk: Bytes::from(chunk), rng, + finished: false, } } } @@ -54,7 +56,10 @@ impl Body for UploadBody { ); Poll::Ready(match self.remaining { - 0 => None, + 0 => { + self.finished = true; + None + } remaining if remaining > self.chunk.len() => { self.remaining -= self.chunk.len(); // Use BytesMut for in-place modifications @@ -67,13 +72,14 @@ impl Body for UploadBody { } remaining => { self.remaining = 0; + self.finished = true; Some(Ok(Frame::data(self.chunk.slice(..remaining)))) } }) } fn is_end_stream(&self) -> bool { - self.remaining == 0 + self.finished } fn size_hint(&self) -> SizeHint { diff --git a/crates/nq-core/src/client.rs b/crates/nq-core/src/client.rs index c5d5d2b..33c471a 100644 --- a/crates/nq-core/src/client.rs +++ b/crates/nq-core/src/client.rs @@ -172,7 +172,7 @@ impl ThroughputClient { ) .await { - error!("error sending ThroughputClient request: {error:#}"); + debug!("error sending ThroughputClient request: {error:#}"); } } .in_current_span(), @@ -327,9 +327,19 @@ async fn consume_body( info!("waiting for response body"); loop { select! { - Some(res) = response_body.frame() => if let Err(e) = res { - error!("body closing: {e}"); - break; + res = response_body.frame() => match res { + Some(Ok(_)) => { + // Continue consuming frames + }, + Some(Err(e)) => { + error!("body closing: {e}"); + break; + }, + None => { + // Body finished successfully + debug!("response body finished"); + break; + } }, _ = shutdown.cancelled() => break, } From f1b688ee7fe6a78e45a6858c636b567b7acd2190 Mon Sep 17 00:00:00 2001 From: Fisher Darling Date: Mon, 17 Nov 2025 17:48:04 +0100 Subject: [PATCH 8/8] Bump mach-cli version to 0.2.0 --- Cargo.lock | 2 +- cli/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e20189..fec28ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1159,7 +1159,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "mach-cli" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "clap", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index e04be99..a9fa990 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mach-cli" -version = "0.1.0" +version = "0.2.0" authors = ["Fisher Darling "] edition = "2021"