Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "benchmarks"]
path = benchmarks
url = ../itsi-server-benchmarks
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
## [0.2.17] - 2025-05-31
- Enabled vectorized writes in IoSteam
- Replaced all usage of heap-allocated BoxBody with HttpBody enums
- Add 5 threads as default for rack/handler
- Reserve header size ahead of time in rack interface
- Avoid intermediate array allocation when populating Rack env headers.
- Rewrite synchronous thread worker to avoid excessive GVL acquisition
- Revert to default write_ev behaviour for http1
- Switch to service_fn from service struct to avoid one additional pinned future
- Worker pinning accepts ruby workers too
- Fixed ordering incomaptibility in etag forwarding from static file server
- Added embedded benchmark suite

## [0.2.16] - 2025-05-02
- Optimized static error responses
- Optimized rate limit middleware
Expand Down
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
PATH
remote: .
specs:
itsi (0.2.16)
itsi-scheduler (~> 0.2.16)
itsi-server (~> 0.2.16)
itsi (0.2.17)
itsi-scheduler (~> 0.2.17)
itsi-server (~> 0.2.17)

PATH
remote: gems/scheduler
specs:
itsi-scheduler (0.2.16)
itsi-scheduler (0.2.17)
rb_sys (~> 0.9.91)

PATH
remote: gems/server
specs:
itsi-server (0.2.16)
itsi-server (0.2.17)
json (~> 2)
prism (~> 1.4)
rack (>= 1.6)
Expand Down
1 change: 1 addition & 0 deletions benchmarks
Submodule benchmarks added at c99d63
2 changes: 1 addition & 1 deletion crates/itsi_acme/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "itsi_acme"
version = "0.1.0"
authors = [
"wouterkem <wc@pico.net.nz>",
"wouterken <wc@pico.net.nz>",
"dignifiedquire <me@dignifiedquire.com>",
"Florian Uekermann <florian@uekermann.me>",
]
Expand Down
2 changes: 1 addition & 1 deletion crates/itsi_scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "itsi-scheduler"
version = "0.2.16"
version = "0.2.17"
edition = "2021"
authors = ["Wouter Coppieters <wc@pico.net.nz>"]
license = "MIT"
Expand Down
4 changes: 3 additions & 1 deletion crates/itsi_server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "itsi-server"
version = "0.2.16"
version = "0.2.17"
edition = "2021"
authors = ["Wouter Coppieters <wc@pico.net.nz>"]
license = "MIT"
Expand Down Expand Up @@ -90,3 +90,5 @@ argon2 = "0.5.3"
core_affinity = "0.8.3"
memchr = "2.7.4"
quick_cache = "0.6.13"
smallvec = "1.15.0"
futures-util = "0.3.31"
7 changes: 6 additions & 1 deletion crates/itsi_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ fn init(ruby: &Ruby) -> Result<()> {
request.define_method("rack_protocol", method!(ItsiHttpRequest::rack_protocol, 0))?;
request.define_method("host", method!(ItsiHttpRequest::host, 0))?;
request.define_method("headers", method!(ItsiHttpRequest::headers, 0))?;
request.define_method("each_header", method!(ItsiHttpRequest::each_header, 0))?;
request.define_method("uri", method!(ItsiHttpRequest::uri, 0))?;
request.define_method("header", method!(ItsiHttpRequest::header, 1))?;
request.define_method("[]", method!(ItsiHttpRequest::header, 1))?;
Expand All @@ -71,6 +72,7 @@ fn init(ruby: &Ruby) -> Result<()> {
request.define_method("url_encoded?", method!(ItsiHttpRequest::is_url_encoded, 0))?;
request.define_method("multipart?", method!(ItsiHttpRequest::is_multipart, 0))?;
request.define_method("url_params", method!(ItsiHttpRequest::url_params, 0))?;
request.define_method("server_error", method!(ItsiHttpRequest::error, 1))?;

let body_proxy = ruby.get_inner(&ITSI_BODY_PROXY);
body_proxy.define_method("gets", method!(ItsiBodyProxy::gets, 0))?;
Expand All @@ -80,14 +82,17 @@ fn init(ruby: &Ruby) -> Result<()> {

let response = ruby.get_inner(&ITSI_RESPONSE);
response.define_method("[]=", method!(ItsiHttpResponse::add_header, 2))?;
response.define_method(
"reserve_headers",
method!(ItsiHttpResponse::reserve_headers, 1),
)?;
response.define_method("add_header", method!(ItsiHttpResponse::add_header, 2))?;
response.define_method("add_headers", method!(ItsiHttpResponse::add_headers, 1))?;
response.define_method("status=", method!(ItsiHttpResponse::set_status, 1))?;
response.define_method("send_frame", method!(ItsiHttpResponse::send_frame, 1))?;
response.define_method("<<", method!(ItsiHttpResponse::send_frame, 1))?;
response.define_method("write", method!(ItsiHttpResponse::send_frame, 1))?;
response.define_method("read", method!(ItsiHttpResponse::recv_frame, 0))?;
response.define_method("flush", method!(ItsiHttpResponse::flush, 0))?;
response.define_method("closed?", method!(ItsiHttpResponse::is_closed, 0))?;
response.define_method(
"send_and_close",
Expand Down
2 changes: 2 additions & 0 deletions crates/itsi_server/src/ruby_types/itsi_body_proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ pub struct ItsiBodyProxy {
pub enum ItsiBody {
Buffered(BigBytes),
Stream(ItsiBodyProxy),
Empty,
}

impl ItsiBody {
pub fn into_value(&self) -> Option<Value> {
match self {
ItsiBody::Buffered(bytes) => bytes.as_value(),
ItsiBody::Stream(proxy) => Some(proxy.clone().into_value()),
ItsiBody::Empty => None,
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/itsi_server/src/ruby_types/itsi_grpc_call.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use super::itsi_grpc_response_stream::ItsiGrpcResponseStream;
use crate::prelude::*;
use crate::server::http_message_types::{HttpRequest, HttpResponse};
use crate::server::http_message_types::{HttpBody, HttpRequest, HttpResponse};
use crate::server::{byte_frame::ByteFrame, request_job::RequestJob};
use crate::services::itsi_http_service::HttpRequestContext;
use async_compression::futures::bufread::{GzipDecoder, GzipEncoder, ZlibDecoder, ZlibEncoder};
use bytes::Bytes;
use derive_more::Debug;
use futures::{executor::block_on, io::Cursor, AsyncReadExt};
use http::{request::Parts, Response, StatusCode};
use http_body_util::{combinators::BoxBody, BodyExt, Empty};
use http_body_util::BodyExt;
use itsi_error::CLIENT_CONNECTION_CLOSED;
use itsi_rb_helpers::{print_rb_backtrace, HeapValue};
use itsi_tracing::debug;
Expand Down Expand Up @@ -139,15 +139,15 @@ impl ItsiGrpcCall {
{
Err(err) => {
error!("Error occurred: {}", err);
let mut response = Response::new(BoxBody::new(Empty::new()));
let mut response = Response::new(HttpBody::empty());
*response.status_mut() = StatusCode::BAD_REQUEST;
Ok(response)
}
_ => match receiver.recv().await {
Some(first_frame) => Ok(response_stream
.build_response(first_frame, receiver, shutdown_channel)
.await),
None => Ok(Response::new(BoxBody::new(Empty::new()))),
None => Ok(Response::new(HttpBody::empty())),
},
}
}
Expand Down
27 changes: 14 additions & 13 deletions crates/itsi_server/src/ruby_types/itsi_grpc_response_stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::itsi_grpc_call::CompressionAlgorithm;
use crate::prelude::*;
use crate::server::http_message_types::HttpResponse;
use crate::server::http_message_types::{HttpBody, HttpResponse};
use crate::server::size_limited_incoming::SizeLimitedIncoming;
use crate::server::{byte_frame::ByteFrame, serve_strategy::single_mode::RunningPhase};
use bytes::Bytes;
Expand All @@ -11,8 +11,8 @@ use http::{
header::{HeaderName, HeaderValue},
HeaderMap, Response,
};
use http_body_util::{combinators::BoxBody, BodyDataStream, BodyExt, Empty, Full, StreamBody};
use hyper::body::{Frame, Incoming};
use http_body_util::BodyDataStream;
use hyper::body::Incoming;
use magnus::error::Result as MagnusResult;
use nix::unistd::pipe;
use parking_lot::Mutex;
Expand Down Expand Up @@ -161,7 +161,7 @@ impl ItsiGrpcResponseStream {
response_headers,
incoming_reader: Some(pipe_read),
response_sender,
response: Some(Response::new(BoxBody::new(Empty::new()))),
response: Some(Response::new(HttpBody::empty())),
trailer_tx,
trailer_rx: Some(trailer_rx),
})),
Expand Down Expand Up @@ -207,12 +207,12 @@ impl ItsiGrpcResponseStream {
let rx = self.inner.lock().trailer_rx.take().unwrap();
*response.version_mut() = Version::HTTP_2;
*response.headers_mut() = self.inner.lock().response_headers.clone();
*response.body_mut() = if matches!(first_frame, ByteFrame::Empty) {
BoxBody::new(Empty::new())
let body_with_trailers = if matches!(first_frame, ByteFrame::Empty) {
HttpBody::empty()
} else if matches!(first_frame, ByteFrame::End(_)) {
BoxBody::new(Full::new(first_frame.into()))
HttpBody::full(first_frame.into())
} else {
let initial_frame = tokio_stream::once(Ok(Frame::data(Bytes::from(first_frame))));
let initial_frame = tokio_stream::once(Ok(Bytes::from(first_frame)));
let frame_stream = unfold(
(ReceiverStream::new(receiver), shutdown_rx),
|(mut receiver, mut shutdown_rx)| async move {
Expand All @@ -224,7 +224,7 @@ impl ItsiGrpcResponseStream {
maybe_bytes = receiver.next() => {
match maybe_bytes {
Some(ByteFrame::Data(bytes)) | Some(ByteFrame::End(bytes)) => {
return Some((Ok(Frame::data(bytes)), (receiver, shutdown_rx)));
return Some((Ok(bytes), (receiver, shutdown_rx)));
}
_ => {
return None;
Expand All @@ -234,7 +234,7 @@ impl ItsiGrpcResponseStream {
_ = shutdown_rx.changed() => {
match *shutdown_rx.borrow() {
RunningPhase::ShutdownPending => {
warn!("Disconnecting streaming client.");
debug!("Disconnecting streaming client.");
return None;
},
_ => continue,
Expand All @@ -246,15 +246,16 @@ impl ItsiGrpcResponseStream {
);

let combined_stream = initial_frame.chain(frame_stream);
BoxBody::new(StreamBody::new(combined_stream))
HttpBody::stream(combined_stream)
}
.with_trailers(async move {
match rx.await {
Ok(trailers) => Some(Ok(trailers)),
Err(_err) => None,
}
})
.boxed();
});

*response.body_mut() = body_with_trailers;
response
}

Expand Down
Loading
Loading