diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml
new file mode 100644
index 0000000..d837262
--- /dev/null
+++ b/.github/workflows/bench.yml
@@ -0,0 +1,53 @@
+name: Bench
+
+on:
+  push:
+    branches: [ "main" ]
+  pull_request:
+    branches: [ "main" ]
+
+jobs:
+  bench:
+    runs-on: ubuntu-latest
+
+    strategy:
+      matrix:
+        rust: [stable, nightly]
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Install Rust
+        uses: dtolnay/rust-toolchain@master
+        with:
+          toolchain: ${{ matrix.rust }}
+          components: rustfmt
+
+      - name: Cache cargo dependencies
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cargo/bin/
+            ~/.cargo/registry/index/
+            ~/.cargo/registry/cache/
+            ~/.cargo/git/db/
+            target/
+          key: ${{ runner.os }}-cargo-${{ matrix.rust }}-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: |
+            ${{ runner.os }}-cargo-${{ matrix.rust }}-
+            ${{ runner.os }}-cargo-
+
+      - name: Build
+        run: cargo build --release
+
+      - name: Run benchmarks for mill-io
+        run: cargo bench -p mill-io
+
+      - name: Run benchmarks for mill-net
+        run: cargo bench -p mill-net
+
+      - name: Upload benchmark results
+        uses: actions/upload-artifact@v4
+        with:
+          name: benchmark-results-${{ matrix.rust }}
+          path: target/criterion
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
index 86a3c89..09db920 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,7 +4,7 @@ resolver = "2"
 
 [workspace.package]
 version = "2.0.1"
-authors = ["Citadel-tech", "Mohamed Emad "]
+authors = ["Citadel-tech", "Mohamed Emad"]
 license = "Apache-2.0"
 edition = "2021"
 rust-version = "1.70"
diff --git a/mill-io/Cargo.toml b/mill-io/Cargo.toml
index 2cee2e9..0514363 100644
--- a/mill-io/Cargo.toml
+++ b/mill-io/Cargo.toml
@@ -22,6 +22,7 @@ parking_lot.workspace = true
 serde = { version = "1.0.219", features = ["derive"] }
 serde_json = "1.0.141"
 mill-net = { path = "../mill-net" }
+criterion.workspace = true
 
 [features]
 default = []
@@ -29,4 +30,8 @@ unstable-mpmc = []
 unstable = ["unstable-mpmc"]
 
 [target.'cfg(target_os = "linux")'.dev-dependencies]
-inotify = "0.10"
\ No newline at end of file
+inotify = "0.10"
+
+[[bench]]
+name = "comprehensive"
+harness = false
diff --git a/mill-io/benches/comprehensive.rs b/mill-io/benches/comprehensive.rs
new file mode 100644
index 0000000..e4c9f77
--- /dev/null
+++ b/mill-io/benches/comprehensive.rs
@@ -0,0 +1,335 @@
+use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
+use mill_io::{EventHandler, EventLoop, ObjectPool, TaskPriority};
+use mio::{
+    event::Event,
+    net::{TcpListener, TcpStream},
+    Interest, Token,
+};
+use std::{
+    net::SocketAddr,
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+        Arc, Barrier,
+    },
+    thread,
+    time::{Duration, Instant},
+};
+
+fn bench_threadpool_latency(c: &mut Criterion) {
+    let mut group = c.benchmark_group("threadpool_latency");
+
+    for pool_size in [1, 4, 8] {
+        group.bench_with_input(
+            BenchmarkId::from_parameter(pool_size),
+            &pool_size,
+            |b, &size| {
+                let event_loop = EventLoop::new(size, 1024, 100).unwrap();
+
+                b.iter(|| {
+                    let done = Arc::new(AtomicBool::new(false));
+                    let d = done.clone();
+                    let start = Instant::now();
+
+                    event_loop.spawn_compute(move || {
+                        d.store(true, Ordering::Release);
+                    });
+
+                    while !done.load(Ordering::Acquire) {
+                        thread::yield_now();
+                    }
+
+                    black_box(start.elapsed());
+                });
+            },
+        );
+    }
+    group.finish();
+}
+
+fn bench_compute_priority_ordering(c: &mut Criterion) {
+    let mut group = c.benchmark_group("compute_priority");
+
+    group.bench_function("priority_enforcement", |b| {
+        b.iter(|| {
+            let event_loop = EventLoop::new(1, 1024, 100).unwrap(); // Single worker thread
+            let order = Arc::new(std::sync::Mutex::new(Vec::new()));
+            let barrier = Arc::new(Barrier::new(2));
+
+            let b1 = barrier.clone();
+            event_loop.spawn_compute_with_priority(
+                move || {
+                    b1.wait();
+                    thread::sleep(Duration::from_millis(10));
+                },
+                TaskPriority::Low,
+            );
+
+            barrier.wait();
+            let priorities = [
+                TaskPriority::Low,
+                TaskPriority::High,
+                TaskPriority::Critical,
+                TaskPriority::Normal,
+            ];
+
+            // A `Barrier` would deadlock here: with a single worker thread,
+            // each task would block the only worker while waiting for the
+            // others. Count completions instead and let the main thread spin
+            // until all four tasks have run.
+            let completion = Arc::new(AtomicUsize::new(0));
+
+            for (i, &priority) in priorities.iter().enumerate() {
+                let o = order.clone();
+                let c = completion.clone();
+                event_loop.spawn_compute_with_priority(
+                    move || {
+                        o.lock().unwrap().push((i, priority));
+                        c.fetch_add(1, Ordering::Release);
+                    },
+                    priority,
+                );
+            }
+
+            while completion.load(Ordering::Acquire) < priorities.len() {
+                thread::yield_now();
+            }
+        });
+    });
+
+    group.finish();
+}
+
+fn bench_compute_pool_metrics_overhead(c: &mut Criterion) {
+    let mut group = c.benchmark_group("compute_metrics");
+
+    group.bench_function("metrics_collection", |b| {
+        let event_loop = EventLoop::default();
+
+        b.iter(|| {
+            for _ in 0..1000 {
+                event_loop.spawn_compute(|| {
+                    black_box(2 + 2);
+                });
+            }
+
+            let metrics = event_loop.get_compute_metrics();
+            black_box(metrics.tasks_submitted());
+            black_box(metrics.queue_depth_high());
+        });
+    });
+
+    group.finish();
+}
+
+fn bench_object_pool_allocation(c: &mut Criterion) {
+    let mut group = c.benchmark_group("object_pool");
+    group.throughput(Throughput::Elements(1000));
+
+    group.bench_function("with_pool_sized", |b| {
+        let pool = ObjectPool::new(1000, || vec![0u8; 8192]); // Match iteration count
+
+        b.iter(|| {
+            for _ in 0..1000 {
+                let obj = pool.acquire();
+                black_box(obj.as_ref());
+            }
+        });
+    });
+
+    group.bench_function("with_pool_small", |b| {
+        let pool = ObjectPool::new(100, || vec![0u8; 8192]);
+
+        b.iter(|| {
+            for _ in 0..1000 {
+                let obj = pool.acquire();
+                black_box(obj.as_ref());
+            }
+        });
+    });
+
+    group.bench_function("without_pool", |b| {
+        b.iter(|| {
+            for _ in 0..1000 {
+                let obj = vec![0u8; 8192];
+                black_box(&obj);
+            }
+        });
+    });
+
+    group.bench_function("with_pool_reuse", |b| {
+        let pool = ObjectPool::new(10, || vec![0u8; 8192]);
+
+        b.iter(|| {
+            for _ in 0..1000 {
+                let mut obj = pool.acquire();
+                obj.as_mut()[0] = 42;
+                black_box(obj.as_ref());
+            }
+        });
+    });
+
+    group.finish();
+}
+
+fn bench_object_pool_contention(c: &mut Criterion) {
+    let mut group = c.benchmark_group("object_pool_contention");
+
+    for thread_count in [1, 2, 4, 8] {
+        group.bench_with_input(
+            BenchmarkId::from_parameter(thread_count),
+            &thread_count,
+            |b, &threads| {
+                let pool = Arc::new(ObjectPool::new(50, || vec![0u8; 8192]));
+
+                b.iter(|| {
+                    let barrier = Arc::new(Barrier::new(threads + 1));
+                    let mut handles = vec![];
+
+                    for _ in 0..threads {
+                        let p = pool.clone();
+                        let b = barrier.clone();
+
+                        handles.push(thread::spawn(move || {
+                            b.wait();
+                            for _ in 0..500 {
+                                let obj = p.acquire();
+                                black_box(obj.as_ref());
+                            }
+                        }));
+                    }
+
+                    barrier.wait();
+                    for h in handles {
+                        h.join().unwrap();
+                    }
+                });
+            },
+        );
+    }
+
+    group.finish();
+}
+
+fn bench_event_dispatch_rate(c: &mut Criterion) {
+    let mut group = c.benchmark_group("event_dispatch");
+
+    group.bench_function("tcp_events", |b| {
+        b.iter(|| {
+            let event_loop = Arc::new(EventLoop::default());
+            let counter = Arc::new(AtomicUsize::new(0));
+
+            // Create listener
+            let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
+            let mut listener = TcpListener::bind(addr).unwrap();
+            let listener_addr = listener.local_addr().unwrap();
+
+            struct CountHandler {
+                counter: Arc<AtomicUsize>,
+            }
+
+            impl EventHandler for CountHandler {
+                fn handle_event(&self, _: &Event) {
+                    self.counter.fetch_add(1, Ordering::Relaxed);
+                }
+            }
+
+            event_loop
+                .register(
+                    &mut listener,
+                    Token(1),
+                    Interest::READABLE,
+                    CountHandler {
+                        counter: counter.clone(),
+                    },
+                )
+                .unwrap();
+
+            let el = event_loop.clone();
+            let handle = thread::spawn(move || {
+                let _ = el.run();
+            });
+
+            // Generate events
+            for _ in 0..100 {
+                let _ = TcpStream::connect(listener_addr);
+            }
+
+            event_loop.stop();
+            let _ = handle.join();
+
+            black_box(counter.load(Ordering::Relaxed));
+        });
+    });
+
+    group.finish();
+}
+
+fn bench_registration_deregistration(c: &mut Criterion) {
+    let mut group = c.benchmark_group("registration");
+
+    group.bench_function("register_deregister_cycle", |b| {
+        let event_loop = EventLoop::default();
+
+        struct NoOpHandler;
+        impl EventHandler for NoOpHandler {
+            fn handle_event(&self, _: &Event) {}
+        }
+
+        b.iter(|| {
+            let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
+            let mut listener = TcpListener::bind(addr).unwrap();
+
+            event_loop
+                .register(&mut listener, Token(1), Interest::READABLE, NoOpHandler)
+                .unwrap();
+
+            event_loop.deregister(&mut listener, Token(1)).unwrap();
+        });
+    });
+
+    group.finish();
+}
+
+fn bench_stress_max_throughput(c: &mut Criterion) {
+    let mut group = c.benchmark_group("stress_test");
+    group.measurement_time(Duration::from_secs(10));
+    group.sample_size(10);
+
+    group.bench_function("rapid_task_submission", |b| {
+        let event_loop = EventLoop::new(4, 1024, 100).unwrap();
+
+        b.iter(|| {
+            for _ in 0..1000 {
+                event_loop.spawn_compute(|| {
+                    black_box(42);
+                });
+            }
+        });
+    });
+
+    group.finish();
+}
+
+criterion_group!(threadpool_benches, bench_threadpool_latency,);
+
+criterion_group!(
+    compute_benches,
+    bench_compute_priority_ordering,
+    bench_compute_pool_metrics_overhead,
+);
+
+criterion_group!(
+    pool_benches,
+    bench_object_pool_allocation,
+    bench_object_pool_contention,
+);
+
+criterion_group!(
+    reactor_benches,
+    bench_event_dispatch_rate,
+    bench_registration_deregistration,
+);
+
+criterion_group!(stress_benches, bench_stress_max_throughput,);
+
+criterion_main!(
+    threadpool_benches,
+    compute_benches,
+    pool_benches,
+    reactor_benches,
+    stress_benches,
+);
diff --git a/mill-net/Cargo.toml b/mill-net/Cargo.toml
index b366f21..2ee16fc 100644
--- a/mill-net/Cargo.toml
+++ b/mill-net/Cargo.toml
@@ -22,10 +22,10 @@ parking_lot.workspace = true
 [dev-dependencies]
 serde = { version = "1.0.219", features = ["derive"] }
 serde_json = "1.0.141"
-
-[features]
-default = ["tcp"]
-tcp = []
+criterion = { workspace = true }
+tokio = { version = "1", features = ["full"] }
+async-std = { version = "1", features = ["attributes"] }
+smol = "2"
 [[example]]
 name = "tcp_echo_server"
 required-features = ["tcp"]
@@ -38,3 +38,11 @@
 [[example]]
 name = "tcp_jsonrpc_server"
 required-features = ["tcp"]
+
+[[bench]]
+name = "tcp_benchmarks"
+harness = false
+
+[[bench]]
+name = "comparison"
+harness = false
diff --git a/mill-net/benches/comparison.rs b/mill-net/benches/comparison.rs
new file mode 100644
index 0000000..661a6c9
--- /dev/null
+++ b/mill-net/benches/comparison.rs
@@ -0,0 +1,475 @@
+use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
+use std::{
+    io::{Read, Write},
+    net::TcpStream,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+    thread,
+    time::Duration,
+};
+
+mod mill_io_impl {
+    use super::*;
+    use mill_io::{error::Result, EventLoop};
+    use mill_net::tcp::{
+        traits::{ConnectionId, NetworkHandler},
+        ServerContext, TcpServer, TcpServerConfig,
+    };
+    use mio::Token;
+
+    #[derive(Clone)]
+    pub struct EchoHandler {
+        pub bytes_received: Arc<AtomicU64>,
+    }
+
+    impl NetworkHandler for EchoHandler {
+        fn on_data(&self, ctx: &ServerContext, conn_id: ConnectionId, data: &[u8]) -> Result<()> {
+            self.bytes_received
+                .fetch_add(data.len() as u64, Ordering::Relaxed);
+            ctx.send_to(conn_id, data)?;
+            Ok(())
+        }
+    }
+
+    pub struct ServerHandle {
+        event_loop: Arc<EventLoop>,
+        handle: Option<thread::JoinHandle<()>>,
+    }
+
+    impl ServerHandle {
+        pub fn new(event_loop: Arc<EventLoop>) -> Self {
+            let el = event_loop.clone();
+            let handle = thread::spawn(move || {
+                let _ = el.run();
+            });
+            Self {
+                event_loop,
+                handle: Some(handle),
+            }
+        }
+    }
+
+    impl Drop for ServerHandle {
+        fn drop(&mut self) {
+            self.event_loop.stop();
+            if let Some(handle) = self.handle.take() {
+                let _ = handle.join();
+            }
+        }
+    }
+
+    /// Thread pool dispatch (default mode).
+    pub fn setup_server() -> (String, ServerHandle, Arc<AtomicU64>) {
+        let event_loop = Arc::new(EventLoop::default());
+        setup_server_with_event_loop(event_loop)
+    }
+
+    /// Direct dispatch (low-latency mode).
+    pub fn setup_server_low_latency() -> (String, ServerHandle, Arc<AtomicU64>) {
+        let event_loop = Arc::new(EventLoop::new_low_latency(1024, 10).unwrap());
+        setup_server_with_event_loop(event_loop)
+    }
+
+    fn setup_server_with_event_loop(
+        event_loop: Arc<EventLoop>,
+    ) -> (String, ServerHandle, Arc<AtomicU64>) {
+        let bytes_received = Arc::new(AtomicU64::new(0));
+
+        let handler = EchoHandler {
+            bytes_received: bytes_received.clone(),
+        };
+
+        let config = TcpServerConfig::builder()
+            .address("127.0.0.1:0".parse().unwrap())
+            .build();
+
+        let server = Arc::new(TcpServer::new(config, handler).unwrap());
+        let addr = server.local_addr().unwrap().to_string();
+
+        server.start(&event_loop, Token(1)).unwrap();
+        let handle = ServerHandle::new(event_loop);
+
+        thread::sleep(Duration::from_millis(50));
+
+        (addr, handle, bytes_received)
+    }
+}
+
+mod tokio_impl {
+    use super::*;
+    use tokio::{
+        io::{AsyncReadExt, AsyncWriteExt},
+        net::TcpListener,
+        runtime::Runtime,
+        sync::oneshot,
+    };
+
+    pub struct ServerHandle {
+        _runtime: Runtime,
+        shutdown: Option<oneshot::Sender<()>>,
+    }
+
+    impl Drop for ServerHandle {
+        fn drop(&mut self) {
+            if let Some(tx) = self.shutdown.take() {
+                let _ = tx.send(());
+            }
+        }
+    }
+
+    pub fn setup_server() -> (String, ServerHandle, Arc<AtomicU64>) {
+        let runtime = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(4)
+            .enable_all()
+            .build()
+            .unwrap();
+
+        let bytes_received = Arc::new(AtomicU64::new(0));
+        let bytes_clone = bytes_received.clone();
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+
+        let addr = runtime.block_on(async {
+            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+            let addr = listener.local_addr().unwrap().to_string();
+
+            tokio::spawn(async move {
+                tokio::select! {
+                    _ = async {
+                        loop {
+                            if let Ok((mut socket, _)) = listener.accept().await {
+                                let bytes = bytes_clone.clone();
+                                tokio::spawn(async move {
+                                    let mut buf = vec![0u8; 8192];
+                                    loop {
+                                        match socket.read(&mut buf).await {
+                                            Ok(0) => break,
+                                            Ok(n) => {
+                                                bytes.fetch_add(n as u64, Ordering::Relaxed);
+                                                if socket.write_all(&buf[..n]).await.is_err() {
+                                                    break;
+                                                }
+                                            }
+                                            Err(_) => break,
+                                        }
+                                    }
+                                });
+                            }
+                        }
+                    } => {}
+                    _ = shutdown_rx => {}
+                }
+            });
+
+            addr
+        });
+
+        thread::sleep(Duration::from_millis(50));
+
+        (
+            addr,
+            ServerHandle {
+                _runtime: runtime,
+                shutdown: Some(shutdown_tx),
+            },
+            bytes_received,
+        )
+    }
+}
+
+mod async_std_impl {
+    use super::*;
+    use async_std::{
+        io::{ReadExt, WriteExt},
+        net::TcpListener,
+        task,
+    };
+
+    pub struct ServerHandle {
+        _marker: (),
+    }
+
+    pub fn setup_server() -> (String, ServerHandle, Arc<AtomicU64>) {
+        let bytes_received = Arc::new(AtomicU64::new(0));
+        let bytes_clone = bytes_received.clone();
+
+        let (tx, rx) = std::sync::mpsc::channel();
+
+        task::spawn(async move {
+            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+            let addr = listener.local_addr().unwrap().to_string();
+            tx.send(addr).unwrap();
+
+            loop {
+                if let Ok((mut socket, _)) = listener.accept().await {
+                    let bytes = bytes_clone.clone();
+                    task::spawn(async move {
+                        let mut buf = vec![0u8; 8192];
+                        loop {
+                            match socket.read(&mut buf).await {
+                                Ok(0) => break,
+                                Ok(n) => {
+                                    bytes.fetch_add(n as u64, Ordering::Relaxed);
+                                    if socket.write_all(&buf[..n]).await.is_err() {
+                                        break;
+                                    }
+                                }
+                                Err(_) => break,
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
+        let addr = rx.recv_timeout(Duration::from_secs(2)).unwrap();
+        thread::sleep(Duration::from_millis(50));
+
+        (addr, ServerHandle { _marker: () }, bytes_received)
+    }
+}
+
+mod smol_impl {
+    use super::*;
+    use smol::{
+        io::{AsyncReadExt, AsyncWriteExt},
+        net::TcpListener,
+    };
+
+    pub struct ServerHandle {
+        _marker: (),
+    }
+
+    pub fn setup_server() -> (String, ServerHandle, Arc<AtomicU64>) {
+        let bytes_received = Arc::new(AtomicU64::new(0));
+        let bytes_clone = bytes_received.clone();
+
+        let (tx, rx) = std::sync::mpsc::channel();
+
+        std::thread::spawn(move || {
+            smol::block_on(async {
+                let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+                let addr = listener.local_addr().unwrap().to_string();
+                tx.send(addr).unwrap();
+
+                loop {
+                    if let Ok((mut socket, _)) = listener.accept().await {
+                        let bytes = bytes_clone.clone();
+                        smol::spawn(async move {
+                            let mut buf = vec![0u8; 8192];
+                            loop {
+                                match socket.read(&mut buf).await {
+                                    Ok(0) => break,
+                                    Ok(n) => {
+                                        bytes.fetch_add(n as u64, Ordering::Relaxed);
+                                        if socket.write_all(&buf[..n]).await.is_err() {
+                                            break;
+                                        }
+                                    }
+                                    Err(_) => break,
+                                }
+                            }
+                        })
+                        .detach();
+                    }
+                }
+            });
+        });
+
+        let addr = rx.recv_timeout(Duration::from_secs(2)).unwrap();
+        thread::sleep(Duration::from_millis(50));
+
+        (addr, ServerHandle { _marker: () }, bytes_received)
+    }
+}
+
+fn bench_echo_throughput(c: &mut Criterion) {
+    let mut group = c.benchmark_group("echo_throughput");
+
+    for msg_size in [128, 1024, 4096] {
+        group.throughput(Throughput::Bytes((msg_size * 100) as u64));
+
+        // Mill-IO (default mode - thread pool dispatch)
+        group.bench_with_input(
+            BenchmarkId::new("mill-io", msg_size),
+            &msg_size,
+            |b, &size| {
+                let (addr, _handle, _) = mill_io_impl::setup_server();
+                let mut stream = TcpStream::connect(&addr).unwrap();
+                stream.set_nodelay(true).unwrap();
+                let data = vec![42u8; size];
+                let mut response = vec![0u8; size];
+
+                b.iter(|| {
+                    for _ in 0..100 {
+                        stream.write_all(&data).unwrap();
+                        stream.read_exact(&mut response).unwrap();
+                    }
+                    black_box(&response);
+                });
+            },
+        );
+
+        // Mill-IO (low-latency mode - direct dispatch)
+        group.bench_with_input(
+            BenchmarkId::new("mill-io-fast", msg_size),
+            &msg_size,
+            |b, &size| {
+                let (addr, _handle, _) = mill_io_impl::setup_server_low_latency();
+                let mut stream = TcpStream::connect(&addr).unwrap();
+                stream.set_nodelay(true).unwrap();
+                let data = vec![42u8; size];
+                let mut response = vec![0u8; size];
+
+                b.iter(|| {
+                    for _ in 0..100 {
+                        stream.write_all(&data).unwrap();
+                        stream.read_exact(&mut response).unwrap();
+                    }
+                    black_box(&response);
+                });
+            },
+        );
+
+        // Tokio
+        group.bench_with_input(
+            BenchmarkId::new("tokio", msg_size),
+            &msg_size,
+            |b, &size| {
+                let (addr, _handle, _) = tokio_impl::setup_server();
+                let mut stream = TcpStream::connect(&addr).unwrap();
+                stream.set_nodelay(true).unwrap();
+                let data = vec![42u8; size];
+                let mut response = vec![0u8; size];
+
+                b.iter(|| {
+                    for _ in 0..100 {
+                        stream.write_all(&data).unwrap();
+                        stream.read_exact(&mut response).unwrap();
+                    }
+                    black_box(&response);
+                });
+            },
+        );
+
+        // async-std
+        group.bench_with_input(
+            BenchmarkId::new("async-std", msg_size),
+            &msg_size,
+            |b, &size| {
+                let (addr, _handle, _) = async_std_impl::setup_server();
+                let mut stream = TcpStream::connect(&addr).unwrap();
+                stream.set_nodelay(true).unwrap();
+                let data = vec![42u8; size];
+                let mut response = vec![0u8; size];
+
+                b.iter(|| {
+                    for _ in 0..100 {
+                        stream.write_all(&data).unwrap();
+                        stream.read_exact(&mut response).unwrap();
+                    }
+                    black_box(&response);
+                });
+            },
+        );
+
+        // smol
+        group.bench_with_input(BenchmarkId::new("smol", msg_size), &msg_size, |b, &size| {
+            let (addr, _handle, _) = smol_impl::setup_server();
+            let mut stream = TcpStream::connect(&addr).unwrap();
+            stream.set_nodelay(true).unwrap();
+            let data = vec![42u8; size];
+            let mut response = vec![0u8; size];
+
+            b.iter(|| {
+                for _ in 0..100 {
+                    stream.write_all(&data).unwrap();
+                    stream.read_exact(&mut response).unwrap();
+                }
+                black_box(&response);
+            });
+        });
+    }
+
+    group.finish();
+}
+
+fn bench_latency(c: &mut Criterion) {
+    let mut group = c.benchmark_group("ping_pong_latency");
+
+    // Mill-IO (default mode)
+    group.bench_function("mill-io", |b| {
+        let (addr, _handle, _) = mill_io_impl::setup_server();
+        let mut stream = TcpStream::connect(&addr).unwrap();
+        stream.set_nodelay(true).unwrap();
+
+        b.iter(|| {
+            stream.write_all(b"ping").unwrap();
+            let mut buf = [0u8; 4];
+            stream.read_exact(&mut buf).unwrap();
+            black_box(&buf);
+        });
+    });
+
+    // Mill-IO (low-latency mode)
+    group.bench_function("mill-io-fast", |b| {
+        let (addr, _handle, _) = mill_io_impl::setup_server_low_latency();
+        let mut stream = TcpStream::connect(&addr).unwrap();
+        stream.set_nodelay(true).unwrap();
+
+        b.iter(|| {
+            stream.write_all(b"ping").unwrap();
+            let mut buf = [0u8; 4];
+            stream.read_exact(&mut buf).unwrap();
+            black_box(&buf);
+        });
+    });
+
+    // Tokio
+    group.bench_function("tokio", |b| {
+        let (addr, _handle, _) = tokio_impl::setup_server();
+        let mut stream = TcpStream::connect(&addr).unwrap();
+        stream.set_nodelay(true).unwrap();
+
+        b.iter(|| {
+            stream.write_all(b"ping").unwrap();
+            let mut buf = [0u8; 4];
+            stream.read_exact(&mut buf).unwrap();
+            black_box(&buf);
+        });
+    });
+
+    // async-std
+    group.bench_function("async-std", |b| {
+        let (addr, _handle, _) = async_std_impl::setup_server();
+        let mut stream = TcpStream::connect(&addr).unwrap();
+        stream.set_nodelay(true).unwrap();
+
+        b.iter(|| {
+            stream.write_all(b"ping").unwrap();
+            let mut buf = [0u8; 4];
+            stream.read_exact(&mut buf).unwrap();
+            black_box(&buf);
+        });
+    });
+
+    // smol
+    group.bench_function("smol", |b| {
+        let (addr, _handle, _) = smol_impl::setup_server();
+        let mut stream = TcpStream::connect(&addr).unwrap();
+        stream.set_nodelay(true).unwrap();
+
+        b.iter(|| {
+            stream.write_all(b"ping").unwrap();
+            let mut buf = [0u8; 4];
+            stream.read_exact(&mut buf).unwrap();
+            black_box(&buf);
+        });
+    });
+
+    group.finish();
+}
+
+criterion_group!(benches, bench_echo_throughput, bench_latency);
+criterion_main!(benches);
diff --git a/mill-net/benches/tcp_benchmarks.rs b/mill-net/benches/tcp_benchmarks.rs
new file mode 100644
index 0000000..199f8e4
--- /dev/null
+++ b/mill-net/benches/tcp_benchmarks.rs
@@ -0,0 +1,261 @@
+use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
+use mill_io::{error::Result, EventLoop, TaskPriority};
+use mill_net::tcp::{
+    traits::{ConnectionId, NetworkHandler},
+    ServerContext, TcpServer, TcpServerConfig,
+};
+use mio::Token;
+use std::{
+    io::{Read, Write},
+    net::TcpStream,
+    sync::{
+        atomic::{AtomicU64, AtomicUsize, Ordering},
+        Arc,
+    },
+    thread,
+    time::Duration,
+};
+
+#[derive(Clone)]
+struct BenchHandler {
+    bytes_received: Arc<AtomicU64>,
+    connections: Arc<AtomicUsize>,
+}
+
+impl NetworkHandler for BenchHandler {
+    fn on_connect(&self, _: &ServerContext, _: ConnectionId) -> Result<()> {
+        self.connections.fetch_add(1, Ordering::Relaxed);
+        Ok(())
+    }
+
+    fn on_data(&self, ctx: &ServerContext, conn_id: ConnectionId, data: &[u8]) -> Result<()> {
+        self.bytes_received
+            .fetch_add(data.len() as u64, Ordering::Relaxed);
+        ctx.send_to(conn_id, data)?;
+        Ok(())
+    }
+
+    fn on_disconnect(&self, _: &ServerContext, _: ConnectionId) -> Result<()> {
+        Ok(())
+    }
+}
+
+struct ServerHandle {
+    event_loop: Arc<EventLoop>,
+    handle: Option<thread::JoinHandle<()>>,
+}
+
+impl ServerHandle {
+    fn new(event_loop: Arc<EventLoop>) -> Self {
+        let el = event_loop.clone();
+        let handle = thread::spawn(move || {
+            let _ = el.run();
+        });
+        Self {
+            event_loop,
+            handle: Some(handle),
+        }
+    }
+}
+
+impl Drop for ServerHandle {
+    fn drop(&mut self) {
+        self.event_loop.stop();
+        if let Some(handle) = self.handle.take() {
+            let _ = handle.join();
+        }
+    }
+}
+
+fn bench_tcp_echo_throughput(c: &mut Criterion) {
+    let mut group = c.benchmark_group("tcp_echo");
+
+    for msg_size in [128, 1024, 4096] {
+        group.throughput(Throughput::Bytes((msg_size * 100) as u64));
+
+        group.bench_with_input(
+            BenchmarkId::new("message_size", msg_size),
+            &msg_size,
+            |b, &size| {
+                let event_loop = Arc::new(EventLoop::default());
+                let handler = BenchHandler {
+                    bytes_received: Arc::new(AtomicU64::new(0)),
+                    connections: Arc::new(AtomicUsize::new(0)),
+                };
+                let config = TcpServerConfig::builder()
+                    .address("127.0.0.1:0".parse().unwrap())
+                    .build();
+                let server = Arc::new(TcpServer::new(config, handler.clone()).unwrap());
+                let addr = server.local_addr().unwrap();
+
+                server.clone().start(&event_loop, Token(1)).unwrap();
+                // Use ServerHandle so the event loop is stopped and joined when
+                // this benchmark finishes, instead of leaking the run thread.
+                let _server_handle = ServerHandle::new(event_loop);
+
+                thread::sleep(Duration::from_millis(10));
+
+                let mut stream = TcpStream::connect(addr).unwrap();
+                stream.set_nodelay(true).unwrap();
+
+                let data = vec![42u8; size];
+                let mut response = vec![0u8; size];
+
+                b.iter(|| {
+                    for _ in 0..100 {
+                        stream.write_all(&data).unwrap();
+                        stream.read_exact(&mut response).unwrap();
+                    }
+                    black_box(&response);
+                });
+            },
+        );
+    }
+    group.finish();
+}
+
+fn bench_tcp_concurrent_connections(c: &mut Criterion) {
+    let mut group = c.benchmark_group("tcp_concurrent");
+    group.sample_size(10);
+    group.measurement_time(Duration::from_secs(15));
+
+    for conn_count in [10, 50] {
+        group.bench_with_input(
+            BenchmarkId::from_parameter(conn_count),
+            &conn_count,
+            |b, &count| {
+                // SETUP - Create the server once
+                let event_loop = Arc::new(EventLoop::default());
+                let handler = BenchHandler {
+                    bytes_received: Arc::new(AtomicU64::new(0)),
+                    connections: Arc::new(AtomicUsize::new(0)),
+                };
+                let config = TcpServerConfig::builder()
+                    .address("127.0.0.1:0".parse().unwrap())
+                    .max_connections(count * 10)
+                    .build();
+                let server = Arc::new(TcpServer::new(config, handler).unwrap());
+                let addr = server.local_addr().unwrap();
+
+                server.clone().start(&event_loop, Token(1)).unwrap();
+                let _server_handle = ServerHandle::new(event_loop);
+
+                thread::sleep(Duration::from_millis(50));
+
+                // MEASUREMENT - No barriers; measure how the server copes with
+                // `count` client threads connecting and echoing concurrently.
+                b.iter(|| {
+                    let handles: Vec<_> = (0..count)
+                        .map(|_| {
+                            let addr_copy = addr;
+                            thread::spawn(move || {
+                                if let Ok(mut stream) = TcpStream::connect(addr_copy) {
+                                    let _ = stream.set_nodelay(true);
+                                    let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
+                                    let _ = stream.set_write_timeout(Some(Duration::from_secs(2)));
+
+                                    for _ in 0..3 {
+                                        if stream.write_all(b"test").is_err() {
+                                            break;
+                                        }
+                                        let mut buf = [0u8; 4];
+                                        if stream.read_exact(&mut buf).is_err() {
+                                            break;
+                                        }
+                                    }
+                                }
+                            })
+                        })
+                        .collect();
+
+                    for h in handles {
+                        let _ = h.join();
+                    }
+                });
+            },
+        );
+    }
+    group.finish();
+}
+
+fn bench_mixed_io_compute(c: &mut Criterion) {
+    let mut group = c.benchmark_group("mixed_io_compute");
+    group.sample_size(20);
+
+    // SETUP
+    let event_loop = Arc::new(EventLoop::default());
+
+    #[derive(Clone)]
+    struct ComputeHandler {
+        event_loop: Arc<EventLoop>,
+        processed: Arc<AtomicUsize>,
+    }
+
+    impl NetworkHandler for ComputeHandler {
+        fn on_data(&self, _ctx: &ServerContext, _conn_id: ConnectionId, data: &[u8]) -> Result<()> {
+            let data_len = data.len();
+            let processed = self.processed.clone();
+            self.event_loop.spawn_compute_with_priority(
+                move || {
+                    let mut hash: u64 = 0;
+                    for i in 0..data_len {
+                        hash = hash.wrapping_mul(31).wrapping_add(i as u64);
+                    }
+                    black_box(hash);
+                    processed.fetch_add(1, Ordering::Relaxed);
+                },
+                TaskPriority::High,
+            );
+            Ok(())
+        }
+    }
+
+    let handler = ComputeHandler {
+        event_loop: event_loop.clone(),
+        processed: Arc::new(AtomicUsize::new(0)),
+    };
+    let config = TcpServerConfig::builder()
+        .address("127.0.0.1:0".parse().unwrap())
+        .build();
+    let server = Arc::new(TcpServer::new(config, handler).unwrap());
+    let addr = server.local_addr().unwrap();
+
+    server.clone().start(&event_loop, Token(1)).unwrap();
+
+    let el = event_loop.clone();
+    let server_handle = thread::spawn(move || {
+        let _ = el.run();
+    });
+
+    thread::sleep(Duration::from_millis(50));
+
+    let mut stream = TcpStream::connect(addr).unwrap();
+    stream.set_nodelay(true).unwrap();
+    stream
+        .set_write_timeout(Some(Duration::from_secs(5)))
+        .unwrap();
+
+    group.bench_function("io_with_compute_offload", |b| {
+        b.iter(|| {
+            for _ in 0..20 {
+                let _ = stream.write_all(b"compute this data");
+            }
+        });
+    });
+
+    // CLEANUP
+    event_loop.stop();
+    let _ = server_handle.join();
+
+    group.finish();
+}
+
+criterion_group!(
+    tcp_benches,
+    bench_tcp_echo_throughput,
+    bench_tcp_concurrent_connections,
+);
+
+criterion_group!(mixed_benches, bench_mixed_io_compute);
+
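+// `criterion_main!` below wires both groups into the generated harness. CI
+// runs them via `cargo bench -p mill-net` (see .github/workflows/bench.yml);
+// this file alone can be run with `cargo bench -p mill-net --bench tcp_benchmarks`.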
+criterion_main!(tcp_benches, mixed_benches);