From 9a39015d948ecc5ff80e39b70fffb2042508d526 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sun, 7 Dec 2025 16:00:02 +0100 Subject: [PATCH 1/2] HTTP/2: add per-stream idle and lifetime timeouts to core multiplexer Expose configuration via H2Config, throw H2StreamTimeoutException on expiry and keep the connection alive Extend test coverage and add an example client demonstrating timed-out and successful streams --- .../core5/http2/H2StreamTimeoutException.java | 87 +++++ .../impl/nio/AbstractH2StreamMultiplexer.java | 78 ++++- .../hc/core5/http2/impl/nio/H2Stream.java | 52 +++ .../H2StreamTimeoutClientExample.java | 300 ++++++++++++++++++ .../nio/TestAbstractH2StreamMultiplexer.java | 95 ++++++ 5 files changed, 610 insertions(+), 2 deletions(-) create mode 100644 httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java create mode 100644 httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java new file mode 100644 index 000000000..2f74ceb45 --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java @@ -0,0 +1,87 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2; + +import java.net.SocketTimeoutException; + +import org.apache.hc.core5.util.Timeout; + +/** + * {@link java.net.SocketTimeoutException} raised by the HTTP/2 stream + * multiplexer when a per-stream timeout elapses. + *

+ * This exception is used for timeouts that are scoped to a single HTTP/2 + * stream rather than the underlying TCP connection, for example: + *

+ * + *

+ * The {@link #isIdleTimeout()} flag can be used to distinguish whether + * the timeout was triggered by idleness or by the overall stream lifetime. + * The affected stream id and the timeout value are exposed via + * {@link #getStreamId()} and {@link #getTimeout()} respectively. + *

+ * + * @since 5.4 + */ +public class H2StreamTimeoutException extends SocketTimeoutException { + + private static final long serialVersionUID = 1L; + + private final int streamId; + private final Timeout timeout; + private final boolean idleTimeout; + + public H2StreamTimeoutException(final String message, final int streamId, final Timeout timeout, final boolean idleTimeout) { + super(message); + this.streamId = streamId; + this.timeout = timeout; + this.idleTimeout = idleTimeout; + } + + public int getStreamId() { + return streamId; + } + + public Timeout getTimeout() { + return timeout; + } + + /** + * Indicates whether this timeout was triggered by idle time (no activity) + * rather than by stream lifetime. + * + * @return {@code true} if this is an idle timeout, {@code false} if it is a lifetime timeout. + */ + public boolean isIdleTimeout() { + return idleTimeout; + } + +} diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 05af4e4c3..cc8833d04 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -74,6 +74,7 @@ import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; import org.apache.hc.core5.http2.config.H2Setting; @@ -438,6 +439,10 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio for (;;) { final RawFrame frame = inputBuffer.read(src, ioSession); if (frame != null) { + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } + if (streamListener != null) { streamListener.onFrameInput(this, frame.getStreamId(), frame); } @@ -454,6 +459,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio break; } } + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } } } @@ -531,6 +539,11 @@ public final void onOutput() throws HttpException, IOException { } } } + + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } + if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) { int liveStreams = 0; for (final Iterator it = streams.iterator(); it.hasNext(); ) { @@ -642,6 +655,7 @@ private void executeRequest(final RequestExecutionCommand requestExecutionComman requestExecutionCommand.getExchangeHandler(), requestExecutionCommand.getPushHandlerFactory(), requestExecutionCommand.getContext())); + initializeStreamTimeouts(stream); if (streamListener != null) { final int initInputWindow = stream.getInputWindow().get(); @@ -760,10 +774,12 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio final H2StreamChannel channel = createChannel(streamId); if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { stream = streams.createActive(channel, incomingRequest(channel)); + initializeStreamTimeouts(stream); streams.resetIfExceedsMaxConcurrentLimit(stream, localConfig.getMaxConcurrentStreams()); } else { channel.localReset(H2Error.REFUSED_STREAM); stream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE); + initializeStreamTimeouts(stream); } } else if (stream.isLocalClosed() && stream.isRemoteClosed()) { throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed"); @@ -954,6 +970,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio channel.localReset(H2Error.REFUSED_STREAM); promisedStream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE); } + initializeStreamTimeouts(promisedStream); try { consumePushPromiseFrame(frame, payload, promisedStream); } catch (final H2StreamResetException ex) { @@ -1359,8 +1376,17 @@ H2StreamChannel createChannel(final int streamId) { return new H2StreamChannelImpl(streamId, initInputWinSize, initOutputWinSize); } - H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) throws H2ConnectionException { - return streams.createActive(channel, streamHandler); + private void initializeStreamTimeouts(final H2Stream stream) { + final Timeout socketTimeout = ioSession.getSocketTimeout(); + if (socketTimeout != null && socketTimeout.isEnabled()) { + stream.setIdleTimeout(socketTimeout); + } + } + + H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) { + final H2Stream stream = streams.createActive(channel, streamHandler); + initializeStreamTimeouts(stream); + return stream; } private void recordPriorityFromHeaders(final int streamId, final List headers) { @@ -1463,6 +1489,7 @@ public void push(final List
headers, final AsyncPushProducer pushProduce final int promisedStreamId = streams.generateStreamId(); final H2StreamChannel channel = createChannel(promisedStreamId); final H2Stream stream = streams.createReserved(channel, outgoingPushPromise(channel, pushProducer)); + initializeStreamTimeouts(stream); commitPushPromise(id, promisedStreamId, headers); stream.markRemoteClosed(); @@ -1579,4 +1606,51 @@ public String toString() { } + private void checkStreamTimeouts(final long nowNanos) throws IOException { + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + if (!stream.isActive()) { + continue; + } + + final Timeout idleTimeout = stream.getIdleTimeout(); + final Timeout lifetimeTimeout = stream.getLifetimeTimeout(); + if ((idleTimeout == null || !idleTimeout.isEnabled()) + && (lifetimeTimeout == null || !lifetimeTimeout.isEnabled())) { + continue; + } + + final long created = stream.getCreatedNanos(); + final long last = stream.getLastActivityNanos(); + + if (idleTimeout != null && idleTimeout.isEnabled()) { + final long idleNanos = idleTimeout.toNanoseconds(); + if (idleNanos > 0 && nowNanos - last > idleNanos) { + final int streamId = stream.getId(); + final H2StreamTimeoutException ex = new H2StreamTimeoutException( + "HTTP/2 stream idle timeout (" + idleTimeout + ")", + streamId, + idleTimeout, + true); + stream.localReset(ex, H2Error.CANCEL); + // Once reset due to idle timeout, we do not care about lifetime anymore + continue; + } + } + + if (lifetimeTimeout != null && lifetimeTimeout.isEnabled()) { + final long lifeNanos = lifetimeTimeout.toNanoseconds(); + if (lifeNanos > 0 && nowNanos - created > lifeNanos) { + final int streamId = stream.getId(); + final H2StreamTimeoutException ex = new H2StreamTimeoutException( + "HTTP/2 stream lifetime timeout (" + lifetimeTimeout + ")", + streamId, + lifetimeTimeout, + false); + stream.localReset(ex, H2Error.CANCEL); + } + } + } + } + } \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java index 881e5d27e..93e498b71 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java @@ -44,6 +44,7 @@ import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.util.Timeout; class H2Stream implements StreamControl { @@ -59,6 +60,12 @@ class H2Stream implements StreamControl { private volatile boolean reserved; private volatile boolean remoteClosed; + private volatile long createdNanos; + private volatile long lastActivityNanos; + + private volatile Timeout idleTimeout; + private volatile Timeout lifetimeTimeout; + H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer stateChangeCallback) { this.channel = channel; this.handler = handler; @@ -67,6 +74,8 @@ class H2Stream implements StreamControl { this.transitionRef = new AtomicReference<>(State.RESERVED); this.released = new AtomicBoolean(); this.cancelled = new AtomicBoolean(); + this.createdNanos = 0L; + this.lastActivityNanos = 0L; } @Override @@ -97,6 +106,7 @@ private void triggerClosed() { void activate() { reserved = false; + markCreatedAndActive(); triggerOpen(); } @@ -139,6 +149,8 @@ boolean isLocalClosed() { void consumePromise(final List
headers) throws HttpException, IOException { try { + touch(); + if (channel.isLocalReset()) { return; } @@ -158,6 +170,8 @@ void consumeHeader(final List
headers, final boolean endOfStream) throws if (endOfStream) { remoteClosed = true; } + touch(); + if (channel.isLocalReset()) { return; } @@ -176,6 +190,8 @@ void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpExc if (endOfStream) { remoteClosed = true; } + touch(); + if (channel.isLocalReset()) { return; } @@ -197,6 +213,8 @@ boolean isOutputReady() { void produceOutput() throws HttpException, IOException { try { + touch(); + handler.produceOutput(); } catch (final ProtocolException ex) { localReset(ex, H2Error.PROTOCOL_ERROR); @@ -204,6 +222,7 @@ void produceOutput() throws HttpException, IOException { } void produceInputCapacityUpdate() throws IOException { + touch(); handler.updateInputCapacity(); } @@ -299,4 +318,37 @@ public String toString() { return buf.toString(); } + private void markCreatedAndActive() { + final long now = System.nanoTime(); + this.createdNanos = now; + this.lastActivityNanos = now; + } + + private void touch() { + this.lastActivityNanos = System.nanoTime(); + } + + long getCreatedNanos() { + return createdNanos; + } + + long getLastActivityNanos() { + return lastActivityNanos; + } + + Timeout getIdleTimeout() { + return idleTimeout; + } + + void setIdleTimeout(final Timeout idleTimeout) { + this.idleTimeout = idleTimeout; + } + + Timeout getLifetimeTimeout() { + return lifetimeTimeout; + } + + void setLifetimeTimeout(final Timeout lifetimeTimeout) { + this.lifetimeTimeout = lifetimeTimeout; + } } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java new file mode 100644 index 000000000..542e43734 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java @@ -0,0 +1,300 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +/** + * Example of an HTTP/2 client where a "slow" request gets aborted when the + * underlying HTTP/2 connection times out due to inactivity (socket timeout). + *

+ * The client opens a single HTTP/2 connection to {@code nghttp2.org} and + * executes two concurrent requests: + *

    + *
  • a "fast" request ({@code /httpbin/ip}), which completes before + * the connection idle timeout, and
  • + *
  • a "slow" request ({@code /httpbin/delay/5}), which keeps the + * connection idle long enough for the I/O reactor to trigger a timeout + * and close the HTTP/2 connection.
  • + *
+ *

+ * When the reactor closes the connection due to inactivity, all active + * streams fail with {@link H2StreamResetException} reporting + * {@code "Timeout due to inactivity (...)"}. The already completed stream + * is not affected. + * + * @since 5.4 + */ +public class H2StreamTimeoutClientExample { + + public static void main(final String[] args) throws Exception { + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + // Connection-level inactivity timeout: keep it short so that + // /httpbin/delay/5 reliably triggers it. + .setSoTimeout(2, TimeUnit.SECONDS) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(100) + .build(); + + final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(h2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setStreamListener(new H2StreamListener() { + + @Override + public void onHeaderInput( + final HttpConnection connection, + final int streamId, + final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + + " (" + streamId + ") << " + headers.get(i)); + } + } + + @Override + public void onHeaderOutput( + final HttpConnection connection, + final int streamId, + final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + + " (" + streamId + ") >> " + headers.get(i)); + } + } + + @Override + public void onFrameInput( + final HttpConnection connection, + final int streamId, + final RawFrame frame) { + // No-op in this example. + } + + @Override + public void onFrameOutput( + final HttpConnection connection, + final int streamId, + final RawFrame frame) { + // No-op in this example. + } + + @Override + public void onInputFlowControl( + final HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // No-op in this example. + } + + @Override + public void onOutputFlowControl( + final HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // No-op in this example. + } + + }) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + + requester.start(); + + final URI fastUri = new URI("https://nghttp2.org/httpbin/ip"); + final URI slowUri = new URI("https://nghttp2.org/httpbin/delay/5"); + + final CountDownLatch latch = new CountDownLatch(2); + + // --- Fast stream: expected to succeed + executeWithLogging( + requester, + fastUri, + "[fast]", + latch, + false); + + // --- Slow stream: /delay/5 sleeps 5 seconds and should exceed + // the 2-second connection idle timeout, resulting in a reset. + executeWithLogging( + requester, + slowUri, + "[slow]", + latch, + true); + + latch.await(); + + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + } + + private static void executeWithLogging( + final HttpAsyncRequester requester, + final URI requestUri, + final String label, + final CountDownLatch latch, + final boolean expectTimeout) { + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri) + .build(); + final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>( + new StringAsyncEntityConsumer()); + + requester.execute(new AsyncClientExchangeHandler() { + + @Override + public void releaseResources() { + requestProducer.releaseResources(); + responseConsumer.releaseResources(); + latch.countDown(); + } + + @Override + public void cancel() { + System.out.println(label + " " + requestUri + " cancelled"); + } + + @Override + public void failed(final Exception cause) { + if (expectTimeout && cause instanceof H2StreamResetException) { + final H2StreamResetException ex = (H2StreamResetException) cause; + System.out.println(label + " expected timeout reset: " + + requestUri + + " -> " + ex); + } else { + System.out.println(label + " failure: " + + requestUri + " -> " + cause); + } + } + + @Override + public void produceRequest( + final RequestChannel channel, + final HttpContext httpContext) throws HttpException, IOException { + System.out.println(label + " sending request: " + requestUri); + requestProducer.sendRequest(channel, httpContext); + } + + @Override + public int available() { + return requestProducer.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + requestProducer.produce(channel); + } + + @Override + public void consumeInformation( + final HttpResponse response, + final HttpContext httpContext) throws HttpException, IOException { + System.out.println(label + " " + requestUri + " -> informational " + + response.getCode()); + } + + @Override + public void consumeResponse( + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext httpContext) throws HttpException, IOException { + if (expectTimeout) { + System.out.println(label + " UNEXPECTED success: " + + requestUri + " -> " + response.getCode()); + } else { + System.out.println(label + " response: " + + requestUri + " -> " + response.getCode()); + } + responseConsumer.consumeResponse(response, entityDetails, httpContext, null); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + responseConsumer.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + responseConsumer.consume(src); + } + + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + responseConsumer.streamEnd(trailers); + if (!expectTimeout) { + System.out.println(label + " body completed for " + requestUri); + } + } + + }, Timeout.ofSeconds(10), HttpCoreContext.create()); + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 0089a12e5..0a0ea1959 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.stream.IntStream; @@ -52,6 +53,7 @@ import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.WritableByteChannelMock; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; @@ -65,6 +67,7 @@ import org.apache.hc.core5.http2.hpack.HPackEncoder; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.util.ByteArrayBuffer; +import org.apache.hc.core5.util.Timeout; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -1034,5 +1037,97 @@ void testPriorityUpdateContinuesAfterSettingsWithNoH2Equals1() throws Exception Assertions.assertTrue(idxPriUpd >= 0, "PRIORITY_UPDATE should be emitted when NO_RFC7540=1"); } + @Test + void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { + Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.of(1, TimeUnit.NANOSECONDS)); + + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + // Create a local stream and mark it active (initializeStreamTimeouts() se ejecuta aquĆ­) + final H2StreamChannel channel = streamMultiplexer.createChannel(1); + final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler); + stream.activate(); + + streamMultiplexer.onOutput(); + + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + final Exception cause = exceptionCaptor.getValue(); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause); + + final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause; + Assertions.assertTrue(timeoutEx.isIdleTimeout(), "Expected idle timeout flag"); + Assertions.assertEquals(1, timeoutEx.getStreamId(), "Unexpected stream id"); + + Assertions.assertTrue(stream.isLocalClosed()); + Assertions.assertTrue(stream.isClosed()); + } + + @Test + void testStreamLifetimeTimeoutTriggersH2StreamTimeoutException() throws Exception { + Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.DISABLED); + + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + final H2StreamChannel channel = streamMultiplexer.createChannel(3); + final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler); + stream.activate(); + + stream.setIdleTimeout(null); + stream.setLifetimeTimeout(Timeout.of(1, TimeUnit.NANOSECONDS)); + + streamMultiplexer.onOutput(); + + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + final Exception cause = exceptionCaptor.getValue(); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause); + + final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause; + Assertions.assertFalse(timeoutEx.isIdleTimeout(), "Expected lifetime timeout flag"); + Assertions.assertEquals(3, timeoutEx.getStreamId(), "Unexpected stream id"); + + Assertions.assertTrue(stream.isLocalClosed()); + Assertions.assertTrue(stream.isClosed()); + } + + + } From ccf73f84a26ceb965bd6e07f0ec376007c02b726 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Thu, 11 Dec 2025 12:48:02 +0100 Subject: [PATCH 2/2] Add core async HTTP/2 request-timeout test asserting HttpStreamResetException --- .../impl/nio/AbstractH2StreamMultiplexer.java | 29 ++- .../hc/core5/http2/impl/nio/H2Stream.java | 14 ++ .../H2StreamTimeoutClientExample.java | 199 +++++++++++++----- .../async/AsyncH2SocketTimeoutCoreTest.java | 198 +++++++++++++++++ 4 files changed, 372 insertions(+), 68 deletions(-) create mode 100644 httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index cc8833d04..e5b949990 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -30,6 +30,7 @@ import java.net.SocketAddress; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.charset.StandardCharsets; import java.util.Deque; @@ -271,7 +272,12 @@ private void commitFrameInternal(final RawFrame frame) throws IOException { } else { outputQueue.addLast(frame); } - ioSession.setEvent(SelectionKey.OP_WRITE); + try { + ioSession.setEvent(SelectionKey.OP_WRITE); + } catch (final CancelledKeyException ex) { + connState = ConnectionHandshake.SHUTDOWN; + ioSession.close(CloseMode.IMMEDIATE); + } } private void commitFrame(final RawFrame frame) throws IOException { @@ -413,7 +419,12 @@ private void incrementInputCapacity( void requestSessionOutput() { outputRequests.incrementAndGet(); - ioSession.setEvent(SelectionKey.OP_WRITE); + try { + ioSession.setEvent(SelectionKey.OP_WRITE); + } catch (final CancelledKeyException ex) { + connState = ConnectionHandshake.SHUTDOWN; + ioSession.close(CloseMode.IMMEDIATE); + } } public final void onConnect() throws HttpException, IOException { @@ -439,10 +450,6 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio for (;;) { final RawFrame frame = inputBuffer.read(src, ioSession); if (frame != null) { - if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { - checkStreamTimeouts(System.nanoTime()); - } - if (streamListener != null) { streamListener.onFrameInput(this, frame.getStreamId(), frame); } @@ -1377,9 +1384,12 @@ H2StreamChannel createChannel(final int streamId) { } private void initializeStreamTimeouts(final H2Stream stream) { - final Timeout socketTimeout = ioSession.getSocketTimeout(); - if (socketTimeout != null && socketTimeout.isEnabled()) { - stream.setIdleTimeout(socketTimeout); + final Timeout streamIdleTimeout = stream.getIdleTimeout(); + if (streamIdleTimeout == null || !streamIdleTimeout.isEnabled()) { + final Timeout socketTimeout = ioSession.getSocketTimeout(); + if (socketTimeout != null && socketTimeout.isEnabled()) { + stream.setIdleTimeout(socketTimeout); + } } } @@ -1633,7 +1643,6 @@ private void checkStreamTimeouts(final long nowNanos) throws IOException { idleTimeout, true); stream.localReset(ex, H2Error.CANCEL); - // Once reset due to idle timeout, we do not care about lifetime anymore continue; } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java index 93e498b71..6054a5913 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java @@ -213,6 +213,13 @@ boolean isOutputReady() { void produceOutput() throws HttpException, IOException { try { + if (channel.isLocalReset()) { + return; + } + if (cancelled.get()) { + localResetCancelled(); + return; + } touch(); handler.produceOutput(); @@ -222,6 +229,13 @@ void produceOutput() throws HttpException, IOException { } void produceInputCapacityUpdate() throws IOException { + if (channel.isLocalReset()) { + return; + } + if (cancelled.get()) { + localResetCancelled(); + return; + } touch(); handler.updateInputCapacity(); } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java index 542e43734..0a9bbd68e 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java @@ -32,13 +32,16 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hc.core5.http.EntityDetails; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpConnection; import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.nio.AsyncClientEndpoint; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.CapacityChannel; @@ -50,6 +53,7 @@ import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpCoreContext; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.frame.RawFrame; @@ -60,23 +64,13 @@ import org.apache.hc.core5.util.Timeout; /** - * Example of an HTTP/2 client where a "slow" request gets aborted when the - * underlying HTTP/2 connection times out due to inactivity (socket timeout). + * Example of an HTTP/2 client where a "slow" request gets aborted by a + * per-stream idle timeout enforced by the HTTP/2 multiplexer. *

- * The client opens a single HTTP/2 connection to {@code nghttp2.org} and - * executes two concurrent requests: - *

    - *
  • a "fast" request ({@code /httpbin/ip}), which completes before - * the connection idle timeout, and
  • - *
  • a "slow" request ({@code /httpbin/delay/5}), which keeps the - * connection idle long enough for the I/O reactor to trigger a timeout - * and close the HTTP/2 connection.
  • - *
- *

- * When the reactor closes the connection due to inactivity, all active - * streams fail with {@link H2StreamResetException} reporting - * {@code "Timeout due to inactivity (...)"}. The already completed stream - * is not affected. + * The connection socket timeout is set to 2 seconds and is used as the initial / default + * per-stream idle timeout value. The example keeps the connection active by sending + * small "keep-alive" requests on separate streams, so the connection itself does not time out. + * The "slow" stream remains idle long enough to exceed the per-stream idle timeout and gets reset. * * @since 5.4 */ @@ -85,8 +79,6 @@ public class H2StreamTimeoutClientExample { public static void main(final String[] args) throws Exception { final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() - // Connection-level inactivity timeout: keep it short so that - // /httpbin/delay/5 reliably triggers it. .setSoTimeout(2, TimeUnit.SECONDS) .build(); @@ -167,52 +159,150 @@ public void onOutputFlowControl( requester.start(); - final URI fastUri = new URI("https://nghttp2.org/httpbin/ip"); - final URI slowUri = new URI("https://nghttp2.org/httpbin/delay/5"); + final HttpHost target = new HttpHost("https", "nghttp2.org", 443); + final AsyncClientEndpoint endpoint = requester.connect(target, Timeout.ofSeconds(10)).get(); + + try { + final URI keepAliveUri = new URI("https://nghttp2.org/httpbin/ip"); + final URI slowUri = new URI("https://nghttp2.org/httpbin/delay/5"); + + final CountDownLatch latch = new CountDownLatch(2); + final AtomicBoolean stop = new AtomicBoolean(false); + + // Keep the connection active with short requests on new streams, + // so the connection does NOT hit "Timeout due to inactivity". + final Thread keepAliveThread = new Thread(() -> { + try { + while (!stop.get()) { + executeKeepAliveOnce(endpoint, keepAliveUri); + Thread.sleep(500); + } + } catch (final Exception ignore) { + } finally { + latch.countDown(); + } + }); + keepAliveThread.setDaemon(true); + keepAliveThread.start(); + + // Slow stream: should be reset by per-stream idle timeout while the connection stays active. + executeWithLogging( + endpoint, + slowUri, + "[slow]", + latch, + stop); + + latch.await(30, TimeUnit.SECONDS); + + } finally { + endpoint.releaseAndReuse(); + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + } + } + + private static void executeKeepAliveOnce( + final AsyncClientEndpoint endpoint, + final URI requestUri) throws InterruptedException { + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri).build(); + final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>( + new StringAsyncEntityConsumer()); + + final CountDownLatch done = new CountDownLatch(1); + + endpoint.execute(new AsyncClientExchangeHandler() { + + @Override + public void releaseResources() { + requestProducer.releaseResources(); + responseConsumer.releaseResources(); + done.countDown(); + } + + @Override + public void cancel() { + done.countDown(); + } + + @Override + public void failed(final Exception cause) { + done.countDown(); + } + + @Override + public void produceRequest( + final RequestChannel channel, + final HttpContext httpContext) throws HttpException, IOException { + requestProducer.sendRequest(channel, httpContext); + } + + @Override + public int available() { + return requestProducer.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + requestProducer.produce(channel); + } + + @Override + public void consumeInformation( + final HttpResponse response, + final HttpContext httpContext) throws HttpException, IOException { + // No-op + } + + @Override + public void consumeResponse( + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext httpContext) throws HttpException, IOException { + responseConsumer.consumeResponse(response, entityDetails, httpContext, null); + } - final CountDownLatch latch = new CountDownLatch(2); + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + responseConsumer.updateCapacity(capacityChannel); + } - // --- Fast stream: expected to succeed - executeWithLogging( - requester, - fastUri, - "[fast]", - latch, - false); + @Override + public void consume(final ByteBuffer src) throws IOException { + responseConsumer.consume(src); + } - // --- Slow stream: /delay/5 sleeps 5 seconds and should exceed - // the 2-second connection idle timeout, resulting in a reset. - executeWithLogging( - requester, - slowUri, - "[slow]", - latch, - true); + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + responseConsumer.streamEnd(trailers); + } - latch.await(); + }, HttpCoreContext.create()); - System.out.println("Shutting down I/O reactor"); - requester.initiateShutdown(); + done.await(5, TimeUnit.SECONDS); } private static void executeWithLogging( - final HttpAsyncRequester requester, + final AsyncClientEndpoint endpoint, final URI requestUri, final String label, final CountDownLatch latch, - final boolean expectTimeout) { + final AtomicBoolean stop) { final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri) .build(); final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>( new StringAsyncEntityConsumer()); - requester.execute(new AsyncClientExchangeHandler() { + endpoint.execute(new AsyncClientExchangeHandler() { @Override public void releaseResources() { requestProducer.releaseResources(); responseConsumer.releaseResources(); + stop.set(true); latch.countDown(); } @@ -223,11 +313,12 @@ public void cancel() { @Override public void failed(final Exception cause) { - if (expectTimeout && cause instanceof H2StreamResetException) { - final H2StreamResetException ex = (H2StreamResetException) cause; - System.out.println(label + " expected timeout reset: " - + requestUri - + " -> " + ex); + if (cause instanceof H2StreamTimeoutException) { + System.out.println(label + " expected per-stream timeout reset: " + + requestUri + " -> " + cause); + } else if (cause instanceof H2StreamResetException) { + System.out.println(label + " stream reset: " + + requestUri + " -> " + cause); } else { System.out.println(label + " failure: " + requestUri + " -> " + cause); @@ -265,13 +356,8 @@ public void consumeResponse( final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { - if (expectTimeout) { - System.out.println(label + " UNEXPECTED success: " - + requestUri + " -> " + response.getCode()); - } else { - System.out.println(label + " response: " - + requestUri + " -> " + response.getCode()); - } + System.out.println(label + " response: " + + requestUri + " -> " + response.getCode()); responseConsumer.consumeResponse(response, entityDetails, httpContext, null); } @@ -289,12 +375,9 @@ public void consume(final ByteBuffer src) throws IOException { public void streamEnd(final List trailers) throws HttpException, IOException { responseConsumer.streamEnd(trailers); - if (!expectTimeout) { - System.out.println(label + " body completed for " + requestUri); - } } - }, Timeout.ofSeconds(10), HttpCoreContext.create()); + }, HttpCoreContext.create()); } } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java new file mode 100644 index 000000000..6fd506c75 --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java @@ -0,0 +1,198 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.testing.async; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStreamResetException; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.ResponseChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; +import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.testing.SSLTestContexts; +import org.apache.hc.core5.testing.extension.nio.H2AsyncRequesterResource; +import org.apache.hc.core5.testing.extension.nio.H2AsyncServerResource; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class AsyncH2SocketTimeoutCoreTest { + + private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(1); + private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(30); + + @RegisterExtension + private final H2AsyncServerResource serverResource = new H2AsyncServerResource(); + + @RegisterExtension + private final H2AsyncRequesterResource clientResource = new H2AsyncRequesterResource(); + + public AsyncH2SocketTimeoutCoreTest() { + serverResource.configure(bootstrap -> bootstrap + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ServerTlsStrategy(SSLTestContexts.createServerSSLContext())) + .setIOReactorConfig( + IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(30)) + .build()) + .setRequestRouter( + RequestRouter.>builder() + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "*", + SimpleDelayingHandler::new) + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .build() + ) + ); + + clientResource.configure(bootstrap -> bootstrap + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext())) + .setIOReactorConfig( + IOReactorConfig.custom() + .setSoTimeout(SOCKET_TIMEOUT) + .build()) + ); + } + + @Test + void testHttp2RequestTimeoutYieldsStreamReset() throws Exception { + final InetSocketAddress address = startServer(); + final HttpAsyncRequester requester = clientResource.start(); + + final URI requestUri = new URI("http://localhost:" + address.getPort() + "/timeout"); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri).build(); + final BasicResponseConsumer responseConsumer = + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()); + + final Future> future = + requester.execute(requestProducer, responseConsumer, SOCKET_TIMEOUT, null); + + final ExecutionException ex = Assertions.assertThrows(ExecutionException.class, () -> future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit())); + + final Throwable cause = ex.getCause(); + Assertions.assertInstanceOf(HttpStreamResetException.class, cause, "Expected HttpStreamResetException, but got: " + cause); + } + + private InetSocketAddress startServer() throws Exception { + final HttpAsyncServer server = serverResource.start(); + final ListenerEndpoint listener = server.listen(new InetSocketAddress(0), URIScheme.HTTP).get(); + return (InetSocketAddress) listener.getAddress(); + } + + static final class SimpleDelayingHandler implements AsyncServerExchangeHandler { + + private final AtomicBoolean completed = new AtomicBoolean(false); + + @Override + public void handleRequest( + final HttpRequest request, + final EntityDetails entityDetails, + final ResponseChannel responseChannel, + final HttpContext context) throws HttpException, IOException { + // Intentionally do nothing: no response is sent back. + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + // Accept any amount of request data. + capacityChannel.update(Integer.MAX_VALUE); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + // Discard request body if present. + if (src != null) { + src.position(src.limit()); + } + } + + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + // Nothing special to do on stream end for this test. + } + + @Override + public int available() { + // No response body to produce. + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + // In this test we never send a response; just ensure the stream is closed + // if produce gets called. + if (completed.compareAndSet(false, true)) { + channel.endStream(); + } + } + + @Override + public void failed(final Exception cause) { + // No-op for this simple test handler. + } + + @Override + public void releaseResources() { + // No resources to release. + } + + } + +}