Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
730 changes: 128 additions & 602 deletions Cargo.lock

Large diffs are not rendered by default.

1,905 changes: 325 additions & 1,580 deletions LICENSE-3rdparty.yml

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions libdd-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ http-body-util = "0.1"
tower-service = "0.3"
cc = "1.1.31"
mime = { version = "0.3.16", optional = true }
multipart = { version = "0.18", optional = true }
multer = { version = "3.1", optional = true }
bytes = { version = "1.4", optional = true }
pin-project = "1"
rand = { version = "0.8", optional = true }
regex = "1.5"
Expand Down Expand Up @@ -71,7 +72,8 @@ httparse = "1.9"
indexmap = "2.11"
maplit = "1.0"
mime = "0.3.16"
multipart = "0.18"
multer = "3.1"
bytes = "1.4"
rand = "0.8"
tempfile = "3.8"
tokio = { version = "1.23", features = ["rt", "macros", "time"] }
Expand All @@ -88,7 +90,7 @@ fips = ["https", "hyper-rustls/fips"]
# Enable reqwest client builder support with file dump debugging
reqwest = ["dep:reqwest", "test-utils"]
# Enable test utilities for use in other crates
test-utils = ["dep:httparse", "dep:rand", "dep:mime", "dep:multipart"]
test-utils = ["dep:httparse", "dep:rand", "dep:mime", "dep:multer", "dep:bytes"]

[lints.rust]
# We run coverage checks in our github actions. These checks are run with
Expand Down
193 changes: 140 additions & 53 deletions libdd-common/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,27 +99,16 @@ pub struct MultipartPart {
pub content: Vec<u8>,
}

/// Parse an HTTP request from raw bytes
///
/// If the Content-Type header indicates multipart/form-data, the multipart body will be
/// automatically parsed and available in the `multipart_parts` field.
///
/// # Arguments
/// * `data` - Raw HTTP request bytes including headers and body
///
/// # Returns
/// A parsed `HttpRequest` or an error if parsing fails
///
/// # Example
/// ```no_run
/// use libdd_common::test_utils::parse_http_request;
///
/// let request_bytes = b"POST /v1/input HTTP/1.1\r\nHost: example.com\r\n\r\nbody";
/// let request = parse_http_request(request_bytes).unwrap();
/// assert_eq!(request.method, "POST");
/// assert_eq!(request.path, "/v1/input");
/// ```
pub fn parse_http_request(data: &[u8]) -> anyhow::Result<HttpRequest> {
/// Parsed HTTP request components without multipart parsing.
/// This is the shared result from `parse_http_request_headers`.
///
/// Both the async and the sync public parsers build their `HttpRequest` from these fields
/// and add the multipart parts on top.
struct ParsedRequestParts {
/// Request method taken from the parsed request line (e.g. `"POST"`).
method: String,
/// Request path taken from the parsed request line (e.g. `"/v1/input"`).
path: String,
/// Request headers keyed by the lowercased header name.
headers: HashMap<String, String>,
/// Owned copy of the input bytes from offset `headers_len` onward (`data[headers_len..]`).
body: Vec<u8>,
}

fn parse_http_request_headers(data: &[u8]) -> anyhow::Result<ParsedRequestParts> {
let mut header_buf = [httparse::EMPTY_HEADER; 64];
let mut req = httparse::Request::new(&mut header_buf);

Expand All @@ -131,7 +120,6 @@ pub fn parse_http_request(data: &[u8]) -> anyhow::Result<HttpRequest> {
let method = req.method.context("No method found")?.to_string();
let path = req.path.context("No path found")?.to_string();

// Convert headers to HashMap with lowercase keys
let mut headers = HashMap::new();
for header in req.headers {
let key = header.name.to_lowercase();
Expand All @@ -141,51 +129,131 @@ pub fn parse_http_request(data: &[u8]) -> anyhow::Result<HttpRequest> {

let body = data[headers_len..].to_vec();

// Auto-parse multipart if Content-Type indicates multipart/form-data
let multipart_parts = match headers.get("content-type") {
Some(ct) if ct.contains("multipart/form-data") => parse_multipart(ct, &body)?,
_ => Vec::new(),
};

Ok(HttpRequest {
Ok(ParsedRequestParts {
method,
path,
headers,
body,
multipart_parts,
})
}

/// Parse multipart form data from Content-Type header and body
///
/// Extracts the boundary from the Content-Type header and parses the multipart body.
fn parse_multipart(content_type: &str, body: &[u8]) -> anyhow::Result<Vec<MultipartPart>> {
use multipart::server::Multipart;
use std::io::Cursor;

// Extract boundary from Content-Type header
fn extract_multipart_boundary(content_type: &str) -> anyhow::Result<String> {
let mime: mime::Mime = content_type
.parse()
.map_err(|e| anyhow::anyhow!("Failed to parse Content-Type as MIME type: {}", e))?;

let boundary = mime
.get_param(mime::BOUNDARY)
.context("No boundary parameter found in Content-Type")?
.as_str();
.to_string();

// Parse multipart body
let cursor = Cursor::new(body);
let mut multipart = Multipart::with_body(cursor, boundary);
let mut parts = Vec::new();
Ok(boundary)
}

while let Some(mut field) = multipart.read_entry()? {
let headers = &field.headers;
let name = headers.name.to_string();
let filename = headers.filename.clone();
let content_type = headers.content_type.as_ref().map(|ct| ct.to_string());
/// Parse an HTTP request from raw bytes (async version).
///
/// If the Content-Type header indicates multipart/form-data, the multipart body will be
/// automatically parsed and available in the `multipart_parts` field.
///
/// Use this function in async contexts (e.g., `#[tokio::test]`). For synchronous contexts,
/// use [`parse_http_request_sync`] instead.
///
/// # Arguments
/// * `data` - Raw HTTP request bytes including headers and body
///
/// # Returns
/// A parsed `HttpRequest` or an error if parsing fails
///
/// # Example
/// ```no_run
/// use libdd_common::test_utils::parse_http_request;
///
/// # async fn example() -> anyhow::Result<()> {
/// let request_bytes = b"POST /v1/input HTTP/1.1\r\nHost: example.com\r\n\r\nbody";
/// let request = parse_http_request(request_bytes).await?;
/// assert_eq!(request.method, "POST");
/// assert_eq!(request.path, "/v1/input");
/// # Ok(())
/// # }
/// ```
pub async fn parse_http_request(data: &[u8]) -> anyhow::Result<HttpRequest> {
let parts = parse_http_request_headers(data)?;

let mut content = Vec::new();
std::io::Read::read_to_end(&mut field.data, &mut content)?;
// Auto-parse multipart if Content-Type indicates multipart/form-data
let multipart_parts = match parts.headers.get("content-type") {
Some(ct) if ct.contains("multipart/form-data") => {
let boundary = extract_multipart_boundary(ct)?;
parse_multipart(boundary, parts.body.clone()).await?
}
_ => Vec::new(),
};

Ok(HttpRequest {
method: parts.method,
path: parts.path,
headers: parts.headers,
body: parts.body,
multipart_parts,
})
}

/// Parse an HTTP request from raw bytes (sync version).
///
/// If the Content-Type header indicates multipart/form-data, the multipart body will be
/// automatically parsed and available in the `multipart_parts` field.
///
/// **Note:** This function uses `futures::executor::block_on` internally for multipart parsing.
/// In async contexts (e.g., `#[tokio::test]`), prefer [`parse_http_request`] to avoid blocking
/// the async runtime.
///
/// # Arguments
/// * `data` - Raw HTTP request bytes including headers and body
///
/// # Returns
/// A parsed `HttpRequest` or an error if parsing fails
///
/// # Example
/// ```no_run
/// use libdd_common::test_utils::parse_http_request_sync;
///
/// let request_bytes = b"POST /v1/input HTTP/1.1\r\nHost: example.com\r\n\r\nbody";
/// let request = parse_http_request_sync(request_bytes).unwrap();
/// assert_eq!(request.method, "POST");
/// assert_eq!(request.path, "/v1/input");
/// ```
pub fn parse_http_request_sync(data: &[u8]) -> anyhow::Result<HttpRequest> {
let parts = parse_http_request_headers(data)?;

// Auto-parse multipart if Content-Type indicates multipart/form-data
let multipart_parts = match parts.headers.get("content-type") {
Some(ct) if ct.contains("multipart/form-data") => {
let boundary = extract_multipart_boundary(ct)?;
futures::executor::block_on(parse_multipart(boundary, parts.body.clone()))?
}
_ => Vec::new(),
};

Ok(HttpRequest {
method: parts.method,
path: parts.path,
headers: parts.headers,
body: parts.body,
multipart_parts,
})
}

async fn parse_multipart(boundary: String, body: Vec<u8>) -> anyhow::Result<Vec<MultipartPart>> {
use futures_util::stream::once;

let stream = once(async move { Ok::<_, std::io::Error>(bytes::Bytes::from(body)) });
let mut multipart = multer::Multipart::new(stream, boundary);
let mut parts = Vec::new();

while let Some(field) = multipart.next_field().await? {
let name = field.name().unwrap_or_default().to_string();
let filename = field.file_name().map(|s| s.to_string());
let content_type = field.content_type().map(|m| m.to_string());
let content = field.bytes().await?.to_vec();

parts.push(MultipartPart {
name,
Expand Down Expand Up @@ -232,7 +300,7 @@ mod tests {
#[test]
fn test_parse_http_request_basic() {
let request = b"POST /v1/input HTTP/1.1\r\nHost: example.com\r\nContent-Type: application/json\r\n\r\n{\"test\":true}";
let parsed = parse_http_request(request).unwrap();
let parsed = parse_http_request_sync(request).unwrap();

assert_eq!(parsed.method, "POST");
assert_eq!(parsed.path, "/v1/input");
Expand All @@ -249,7 +317,7 @@ mod tests {
fn test_parse_http_request_with_custom_headers() {
let request =
b"GET /test HTTP/1.1\r\nX-Custom-Header: value\r\nAnother-Header: 123\r\n\r\n";
let parsed = parse_http_request(request).unwrap();
let parsed = parse_http_request_sync(request).unwrap();

assert_eq!(parsed.method, "GET");
assert_eq!(parsed.path, "/test");
Expand All @@ -270,7 +338,26 @@ mod tests {
let mut request_bytes = request.into_bytes();
request_bytes.extend_from_slice(body);

let parsed = parse_http_request(&request_bytes).unwrap();
let parsed = parse_http_request_sync(&request_bytes).unwrap();

assert_eq!(parsed.method, "POST");
assert_eq!(parsed.multipart_parts.len(), 1);
assert_eq!(parsed.multipart_parts[0].name, "field");
assert_eq!(parsed.multipart_parts[0].content, b"value");
}

#[tokio::test]
async fn test_parse_http_request_async_with_multipart() {
let content_type = "multipart/form-data; boundary=----WebKitFormBoundary";
let body = b"------WebKitFormBoundary\r\nContent-Disposition: form-data; name=\"field\"\r\n\r\nvalue\r\n------WebKitFormBoundary--";
let request = format!(
"POST /v1/input HTTP/1.1\r\nHost: example.com\r\nContent-Type: {}\r\n\r\n",
content_type
);
let mut request_bytes = request.into_bytes();
request_bytes.extend_from_slice(body);

let parsed = parse_http_request(&request_bytes).await.unwrap();

assert_eq!(parsed.method, "POST");
assert_eq!(parsed.multipart_parts.len(), 1);
Expand Down
4 changes: 3 additions & 1 deletion libdd-common/tests/reqwest_builder_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ mod tests {
let captured = std::fs::read(&*file_path).expect("should read dump file");

// Parse and validate
let request = parse_http_request(&captured).expect("should parse captured request");
let request = parse_http_request(&captured)
.await
.expect("should parse captured request");

assert_eq!(request.method, "POST");
assert_eq!(request.path, "/");
Expand Down
3 changes: 1 addition & 2 deletions libdd-profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ bench = false
[features]
default = []
cxx = ["dep:cxx", "dep:cxx-build"]
test-utils = ["dep:multipart"]
test-utils = ["libdd-common/test-utils"]

[[bench]]
name = "main"
Expand All @@ -45,7 +45,6 @@ libdd-alloc = { version = "1.0.0", path = "../libdd-alloc" }
libdd-common = { version = "1.1.0", path = "../libdd-common", default-features = false, features = ["reqwest", "test-utils"] }
libdd-profiling-protobuf = { version = "1.0.0", path = "../libdd-profiling-protobuf", features = ["prost_impls"] }
mime = "0.3.16"
multipart = { version = "0.18", optional = true }
parking_lot = { version = "0.12", default-features = false }
prost = "0.14.1"
rand = "0.8"
Expand Down
8 changes: 4 additions & 4 deletions libdd-profiling/tests/exporter_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

mod common;

use libdd_common::test_utils::parse_http_request;
use libdd_common::test_utils::{parse_http_request, parse_http_request_sync};
use libdd_profiling::exporter::config;
use libdd_profiling::exporter::{File, ProfileExporter};
use libdd_profiling::internal::EncodedProfile;
Expand Down Expand Up @@ -187,7 +187,7 @@ async fn read_and_capture_request<S>(
}
}

if let Ok(req) = parse_http_request(&buffer) {
if let Ok(req) = parse_http_request(&buffer).await {
received_requests.lock().unwrap().push(ReceivedRequest {
method: req.method,
path: req.path,
Expand Down Expand Up @@ -277,7 +277,7 @@ async fn export_full_profile(
RequestSource::File(path) => {
// No sleep needed - send_blocking() waits for file to be synced
let request_bytes = std::fs::read(&path)?;
let req = parse_http_request(&request_bytes)?;
let req = parse_http_request(&request_bytes).await?;
Ok(ReceivedRequest {
method: req.method,
path: req.path,
Expand Down Expand Up @@ -317,7 +317,7 @@ fn validate_full_export(req: &ReceivedRequest, expected_path: &str) -> anyhow::R
http_request_bytes.extend_from_slice(b"\r\n");
http_request_bytes.extend_from_slice(&req.body);

let parsed_req = parse_http_request(&http_request_bytes)?;
let parsed_req = parse_http_request_sync(&http_request_bytes)?;
let parts = &parsed_req.multipart_parts;

// Find event JSON
Expand Down
12 changes: 6 additions & 6 deletions libdd-profiling/tests/file_exporter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

mod common;

use libdd_common::test_utils::{create_temp_file_path, parse_http_request, TempFileGuard};
use libdd_common::test_utils::{create_temp_file_path, parse_http_request_sync, TempFileGuard};
use libdd_profiling::exporter::ProfileExporter;
use libdd_profiling::internal::EncodedProfile;

Expand Down Expand Up @@ -72,7 +72,7 @@ mod tests {
let request_bytes = std::fs::read(&file_path).expect("read dump file");

// Parse HTTP request
let request = parse_http_request(&request_bytes).expect("parse HTTP request");
let request = parse_http_request_sync(&request_bytes).expect("parse HTTP request");

// Validate request line
assert_eq!(request.method, "POST");
Expand Down Expand Up @@ -182,7 +182,7 @@ mod tests {
let request_bytes = std::fs::read(&file_path).expect("read dump file");

// Parse and validate
let request = parse_http_request(&request_bytes).expect("parse HTTP request");
let request = parse_http_request_sync(&request_bytes).expect("parse HTTP request");
let event_part = request
.multipart_parts
.iter()
Expand Down Expand Up @@ -231,7 +231,7 @@ mod tests {
let request_bytes = std::fs::read(&file_path).expect("read dump file");

// Parse and validate
let request = parse_http_request(&request_bytes).expect("parse HTTP request");
let request = parse_http_request_sync(&request_bytes).expect("parse HTTP request");
let event_part = request
.multipart_parts
.iter()
Expand Down Expand Up @@ -287,7 +287,7 @@ mod tests {
let request_bytes = std::fs::read(&file_path).expect("read dump file");

// Parse and validate
let request = parse_http_request(&request_bytes).expect("parse HTTP request");
let request = parse_http_request_sync(&request_bytes).expect("parse HTTP request");
let event_part = request
.multipart_parts
.iter()
Expand Down Expand Up @@ -327,7 +327,7 @@ mod tests {
let request_bytes = std::fs::read(&file_path).expect("read dump file");

// Parse HTTP request
let request = parse_http_request(&request_bytes).expect("parse HTTP request");
let request = parse_http_request_sync(&request_bytes).expect("parse HTTP request");

// Validate headers - API key should be present
assert_eq!(request.headers.get("dd-api-key").unwrap(), api_key);
Expand Down
Loading