diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java new file mode 100644 index 000000000..2f74ceb45 --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamTimeoutException.java @@ -0,0 +1,87 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2; + +import java.net.SocketTimeoutException; + +import org.apache.hc.core5.util.Timeout; + +/** + * {@link java.net.SocketTimeoutException} raised by the HTTP/2 stream + * multiplexer when a per-stream timeout elapses. + *

+ * This exception is used for timeouts that are scoped to a single HTTP/2 + * stream rather than the underlying TCP connection, for example: + *

+ * + *

+ * The {@link #isIdleTimeout()} flag can be used to distinguish whether + * the timeout was triggered by idleness or by the overall stream lifetime. + * The affected stream id and the timeout value are exposed via + * {@link #getStreamId()} and {@link #getTimeout()} respectively. + *

+ * + * @since 5.4 + */ +public class H2StreamTimeoutException extends SocketTimeoutException { + + private static final long serialVersionUID = 1L; + + private final int streamId; + private final Timeout timeout; + private final boolean idleTimeout; + + public H2StreamTimeoutException(final String message, final int streamId, final Timeout timeout, final boolean idleTimeout) { + super(message); + this.streamId = streamId; + this.timeout = timeout; + this.idleTimeout = idleTimeout; + } + + public int getStreamId() { + return streamId; + } + + public Timeout getTimeout() { + return timeout; + } + + /** + * Indicates whether this timeout was triggered by idle time (no activity) + * rather than by stream lifetime. + * + * @return {@code true} if this is an idle timeout, {@code false} if it is a lifetime timeout. + */ + public boolean isIdleTimeout() { + return idleTimeout; + } + +} diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 05af4e4c3..e5b949990 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -30,6 +30,7 @@ import java.net.SocketAddress; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.charset.StandardCharsets; import java.util.Deque; @@ -74,6 +75,7 @@ import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; import org.apache.hc.core5.http2.config.H2Setting; @@ -270,7 +272,12 @@ private void commitFrameInternal(final RawFrame frame) throws IOException { } else { outputQueue.addLast(frame); } - ioSession.setEvent(SelectionKey.OP_WRITE); + try { + ioSession.setEvent(SelectionKey.OP_WRITE); + } catch (final CancelledKeyException ex) { + connState = ConnectionHandshake.SHUTDOWN; + ioSession.close(CloseMode.IMMEDIATE); + } } private void commitFrame(final RawFrame frame) throws IOException { @@ -412,7 +419,12 @@ private void incrementInputCapacity( void requestSessionOutput() { outputRequests.incrementAndGet(); - ioSession.setEvent(SelectionKey.OP_WRITE); + try { + ioSession.setEvent(SelectionKey.OP_WRITE); + } catch (final CancelledKeyException ex) { + connState = ConnectionHandshake.SHUTDOWN; + ioSession.close(CloseMode.IMMEDIATE); + } } public final void onConnect() throws HttpException, IOException { @@ -454,6 +466,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio break; } } + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } } } @@ -531,6 +546,11 @@ public final void onOutput() throws HttpException, IOException { } } } + + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { + checkStreamTimeouts(System.nanoTime()); + } + if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) { int liveStreams = 0; for (final Iterator it = streams.iterator(); it.hasNext(); ) { @@ -642,6 +662,7 @@ private void executeRequest(final RequestExecutionCommand requestExecutionComman requestExecutionCommand.getExchangeHandler(), requestExecutionCommand.getPushHandlerFactory(), requestExecutionCommand.getContext())); + initializeStreamTimeouts(stream); if (streamListener != null) { final int initInputWindow = stream.getInputWindow().get(); @@ -760,10 +781,12 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio final H2StreamChannel channel = createChannel(streamId); if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { stream = streams.createActive(channel, incomingRequest(channel)); + initializeStreamTimeouts(stream); streams.resetIfExceedsMaxConcurrentLimit(stream, localConfig.getMaxConcurrentStreams()); } else { channel.localReset(H2Error.REFUSED_STREAM); stream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE); + initializeStreamTimeouts(stream); } } else if (stream.isLocalClosed() && stream.isRemoteClosed()) { throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed"); @@ -954,6 +977,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio channel.localReset(H2Error.REFUSED_STREAM); promisedStream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE); } + initializeStreamTimeouts(promisedStream); try { consumePushPromiseFrame(frame, payload, promisedStream); } catch (final H2StreamResetException ex) { @@ -1359,8 +1383,20 @@ H2StreamChannel createChannel(final int streamId) { return new H2StreamChannelImpl(streamId, initInputWinSize, initOutputWinSize); } - H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) throws H2ConnectionException { - return streams.createActive(channel, streamHandler); + private void initializeStreamTimeouts(final H2Stream stream) { + final Timeout streamIdleTimeout = stream.getIdleTimeout(); + if (streamIdleTimeout == null || !streamIdleTimeout.isEnabled()) { + final Timeout socketTimeout = ioSession.getSocketTimeout(); + if (socketTimeout != null && socketTimeout.isEnabled()) { + stream.setIdleTimeout(socketTimeout); + } + } + } + + H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) { + final H2Stream stream = streams.createActive(channel, streamHandler); + initializeStreamTimeouts(stream); + return stream; } private void recordPriorityFromHeaders(final int streamId, final List headers) { @@ -1463,6 +1499,7 @@ public void push(final List
headers, final AsyncPushProducer pushProduce final int promisedStreamId = streams.generateStreamId(); final H2StreamChannel channel = createChannel(promisedStreamId); final H2Stream stream = streams.createReserved(channel, outgoingPushPromise(channel, pushProducer)); + initializeStreamTimeouts(stream); commitPushPromise(id, promisedStreamId, headers); stream.markRemoteClosed(); @@ -1579,4 +1616,50 @@ public String toString() { } + private void checkStreamTimeouts(final long nowNanos) throws IOException { + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + if (!stream.isActive()) { + continue; + } + + final Timeout idleTimeout = stream.getIdleTimeout(); + final Timeout lifetimeTimeout = stream.getLifetimeTimeout(); + if ((idleTimeout == null || !idleTimeout.isEnabled()) + && (lifetimeTimeout == null || !lifetimeTimeout.isEnabled())) { + continue; + } + + final long created = stream.getCreatedNanos(); + final long last = stream.getLastActivityNanos(); + + if (idleTimeout != null && idleTimeout.isEnabled()) { + final long idleNanos = idleTimeout.toNanoseconds(); + if (idleNanos > 0 && nowNanos - last > idleNanos) { + final int streamId = stream.getId(); + final H2StreamTimeoutException ex = new H2StreamTimeoutException( + "HTTP/2 stream idle timeout (" + idleTimeout + ")", + streamId, + idleTimeout, + true); + stream.localReset(ex, H2Error.CANCEL); + continue; + } + } + + if (lifetimeTimeout != null && lifetimeTimeout.isEnabled()) { + final long lifeNanos = lifetimeTimeout.toNanoseconds(); + if (lifeNanos > 0 && nowNanos - created > lifeNanos) { + final int streamId = stream.getId(); + final H2StreamTimeoutException ex = new H2StreamTimeoutException( + "HTTP/2 stream lifetime timeout (" + lifetimeTimeout + ")", + streamId, + lifetimeTimeout, + false); + stream.localReset(ex, H2Error.CANCEL); + } + } + } + } + } \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java index 881e5d27e..6054a5913 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java @@ -44,6 +44,7 @@ import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.util.Timeout; class H2Stream implements StreamControl { @@ -59,6 +60,12 @@ class H2Stream implements StreamControl { private volatile boolean reserved; private volatile boolean remoteClosed; + private volatile long createdNanos; + private volatile long lastActivityNanos; + + private volatile Timeout idleTimeout; + private volatile Timeout lifetimeTimeout; + H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer stateChangeCallback) { this.channel = channel; this.handler = handler; @@ -67,6 +74,8 @@ class H2Stream implements StreamControl { this.transitionRef = new AtomicReference<>(State.RESERVED); this.released = new AtomicBoolean(); this.cancelled = new AtomicBoolean(); + this.createdNanos = 0L; + this.lastActivityNanos = 0L; } @Override @@ -97,6 +106,7 @@ private void triggerClosed() { void activate() { reserved = false; + markCreatedAndActive(); triggerOpen(); } @@ -139,6 +149,8 @@ boolean isLocalClosed() { void consumePromise(final List
headers) throws HttpException, IOException { try { + touch(); + if (channel.isLocalReset()) { return; } @@ -158,6 +170,8 @@ void consumeHeader(final List
headers, final boolean endOfStream) throws if (endOfStream) { remoteClosed = true; } + touch(); + if (channel.isLocalReset()) { return; } @@ -176,6 +190,8 @@ void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpExc if (endOfStream) { remoteClosed = true; } + touch(); + if (channel.isLocalReset()) { return; } @@ -197,6 +213,15 @@ boolean isOutputReady() { void produceOutput() throws HttpException, IOException { try { + if (channel.isLocalReset()) { + return; + } + if (cancelled.get()) { + localResetCancelled(); + return; + } + touch(); + handler.produceOutput(); } catch (final ProtocolException ex) { localReset(ex, H2Error.PROTOCOL_ERROR); @@ -204,6 +229,14 @@ void produceOutput() throws HttpException, IOException { } void produceInputCapacityUpdate() throws IOException { + if (channel.isLocalReset()) { + return; + } + if (cancelled.get()) { + localResetCancelled(); + return; + } + touch(); handler.updateInputCapacity(); } @@ -299,4 +332,37 @@ public String toString() { return buf.toString(); } + private void markCreatedAndActive() { + final long now = System.nanoTime(); + this.createdNanos = now; + this.lastActivityNanos = now; + } + + private void touch() { + this.lastActivityNanos = System.nanoTime(); + } + + long getCreatedNanos() { + return createdNanos; + } + + long getLastActivityNanos() { + return lastActivityNanos; + } + + Timeout getIdleTimeout() { + return idleTimeout; + } + + void setIdleTimeout(final Timeout idleTimeout) { + this.idleTimeout = idleTimeout; + } + + Timeout getLifetimeTimeout() { + return lifetimeTimeout; + } + + void setLifetimeTimeout(final Timeout lifetimeTimeout) { + this.lifetimeTimeout = lifetimeTimeout; + } } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java new file mode 100644 index 000000000..0a9bbd68e --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java @@ -0,0 +1,383 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.nio.AsyncClientEndpoint; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +/** + * Example of an HTTP/2 client where a "slow" request gets aborted by a + * per-stream idle timeout enforced by the HTTP/2 multiplexer. + *

+ * The connection socket timeout is set to 2 seconds and is used as the initial / default + * per-stream idle timeout value. The example keeps the connection active by sending + * small "keep-alive" requests on separate streams, so the connection itself does not time out. + * The "slow" stream remains idle long enough to exceed the per-stream idle timeout and gets reset. + * + * @since 5.4 + */ +public class H2StreamTimeoutClientExample { + + public static void main(final String[] args) throws Exception { + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(2, TimeUnit.SECONDS) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(100) + .build(); + + final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(h2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setStreamListener(new H2StreamListener() { + + @Override + public void onHeaderInput( + final HttpConnection connection, + final int streamId, + final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + + " (" + streamId + ") << " + headers.get(i)); + } + } + + @Override + public void onHeaderOutput( + final HttpConnection connection, + final int streamId, + final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(connection.getRemoteAddress() + + " (" + streamId + ") >> " + headers.get(i)); + } + } + + @Override + public void onFrameInput( + final HttpConnection connection, + final int streamId, + final RawFrame frame) { + // No-op in this example. + } + + @Override + public void onFrameOutput( + final HttpConnection connection, + final int streamId, + final RawFrame frame) { + // No-op in this example. + } + + @Override + public void onInputFlowControl( + final HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // No-op in this example. + } + + @Override + public void onOutputFlowControl( + final HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // No-op in this example. + } + + }) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + + requester.start(); + + final HttpHost target = new HttpHost("https", "nghttp2.org", 443); + final AsyncClientEndpoint endpoint = requester.connect(target, Timeout.ofSeconds(10)).get(); + + try { + final URI keepAliveUri = new URI("https://nghttp2.org/httpbin/ip"); + final URI slowUri = new URI("https://nghttp2.org/httpbin/delay/5"); + + final CountDownLatch latch = new CountDownLatch(2); + final AtomicBoolean stop = new AtomicBoolean(false); + + // Keep the connection active with short requests on new streams, + // so the connection does NOT hit "Timeout due to inactivity". + final Thread keepAliveThread = new Thread(() -> { + try { + while (!stop.get()) { + executeKeepAliveOnce(endpoint, keepAliveUri); + Thread.sleep(500); + } + } catch (final Exception ignore) { + } finally { + latch.countDown(); + } + }); + keepAliveThread.setDaemon(true); + keepAliveThread.start(); + + // Slow stream: should be reset by per-stream idle timeout while the connection stays active. + executeWithLogging( + endpoint, + slowUri, + "[slow]", + latch, + stop); + + latch.await(30, TimeUnit.SECONDS); + + } finally { + endpoint.releaseAndReuse(); + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + } + } + + private static void executeKeepAliveOnce( + final AsyncClientEndpoint endpoint, + final URI requestUri) throws InterruptedException { + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri).build(); + final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>( + new StringAsyncEntityConsumer()); + + final CountDownLatch done = new CountDownLatch(1); + + endpoint.execute(new AsyncClientExchangeHandler() { + + @Override + public void releaseResources() { + requestProducer.releaseResources(); + responseConsumer.releaseResources(); + done.countDown(); + } + + @Override + public void cancel() { + done.countDown(); + } + + @Override + public void failed(final Exception cause) { + done.countDown(); + } + + @Override + public void produceRequest( + final RequestChannel channel, + final HttpContext httpContext) throws HttpException, IOException { + requestProducer.sendRequest(channel, httpContext); + } + + @Override + public int available() { + return requestProducer.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + requestProducer.produce(channel); + } + + @Override + public void consumeInformation( + final HttpResponse response, + final HttpContext httpContext) throws HttpException, IOException { + // No-op + } + + @Override + public void consumeResponse( + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext httpContext) throws HttpException, IOException { + responseConsumer.consumeResponse(response, entityDetails, httpContext, null); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + responseConsumer.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + responseConsumer.consume(src); + } + + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + responseConsumer.streamEnd(trailers); + } + + }, HttpCoreContext.create()); + + done.await(5, TimeUnit.SECONDS); + } + + private static void executeWithLogging( + final AsyncClientEndpoint endpoint, + final URI requestUri, + final String label, + final CountDownLatch latch, + final AtomicBoolean stop) { + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri) + .build(); + final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>( + new StringAsyncEntityConsumer()); + + endpoint.execute(new AsyncClientExchangeHandler() { + + @Override + public void releaseResources() { + requestProducer.releaseResources(); + responseConsumer.releaseResources(); + stop.set(true); + latch.countDown(); + } + + @Override + public void cancel() { + System.out.println(label + " " + requestUri + " cancelled"); + } + + @Override + public void failed(final Exception cause) { + if (cause instanceof H2StreamTimeoutException) { + System.out.println(label + " expected per-stream timeout reset: " + + requestUri + " -> " + cause); + } else if (cause instanceof H2StreamResetException) { + System.out.println(label + " stream reset: " + + requestUri + " -> " + cause); + } else { + System.out.println(label + " failure: " + + requestUri + " -> " + cause); + } + } + + @Override + public void produceRequest( + final RequestChannel channel, + final HttpContext httpContext) throws HttpException, IOException { + System.out.println(label + " sending request: " + requestUri); + requestProducer.sendRequest(channel, httpContext); + } + + @Override + public int available() { + return requestProducer.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + requestProducer.produce(channel); + } + + @Override + public void consumeInformation( + final HttpResponse response, + final HttpContext httpContext) throws HttpException, IOException { + System.out.println(label + " " + requestUri + " -> informational " + + response.getCode()); + } + + @Override + public void consumeResponse( + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext httpContext) throws HttpException, IOException { + System.out.println(label + " response: " + + requestUri + " -> " + response.getCode()); + responseConsumer.consumeResponse(response, entityDetails, httpContext, null); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + responseConsumer.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + responseConsumer.consume(src); + } + + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + responseConsumer.streamEnd(trailers); + } + + }, HttpCoreContext.create()); + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 0089a12e5..0a0ea1959 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.stream.IntStream; @@ -52,6 +53,7 @@ import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.WritableByteChannelMock; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; @@ -65,6 +67,7 @@ import org.apache.hc.core5.http2.hpack.HPackEncoder; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.util.ByteArrayBuffer; +import org.apache.hc.core5.util.Timeout; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -1034,5 +1037,97 @@ void testPriorityUpdateContinuesAfterSettingsWithNoH2Equals1() throws Exception Assertions.assertTrue(idxPriUpd >= 0, "PRIORITY_UPDATE should be emitted when NO_RFC7540=1"); } + @Test + void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { + Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.of(1, TimeUnit.NANOSECONDS)); + + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + // Create a local stream and mark it active (initializeStreamTimeouts() se ejecuta aquĆ­) + final H2StreamChannel channel = streamMultiplexer.createChannel(1); + final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler); + stream.activate(); + + streamMultiplexer.onOutput(); + + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + final Exception cause = exceptionCaptor.getValue(); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause); + + final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause; + Assertions.assertTrue(timeoutEx.isIdleTimeout(), "Expected idle timeout flag"); + Assertions.assertEquals(1, timeoutEx.getStreamId(), "Unexpected stream id"); + + Assertions.assertTrue(stream.isLocalClosed()); + Assertions.assertTrue(stream.isClosed()); + } + + @Test + void testStreamLifetimeTimeoutTriggersH2StreamTimeoutException() throws Exception { + Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.DISABLED); + + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + final H2StreamChannel channel = streamMultiplexer.createChannel(3); + final H2Stream stream = streamMultiplexer.createStream(channel, streamHandler); + stream.activate(); + + stream.setIdleTimeout(null); + stream.setLifetimeTimeout(Timeout.of(1, TimeUnit.NANOSECONDS)); + + streamMultiplexer.onOutput(); + + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + final Exception cause = exceptionCaptor.getValue(); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, cause); + + final H2StreamTimeoutException timeoutEx = (H2StreamTimeoutException) cause; + Assertions.assertFalse(timeoutEx.isIdleTimeout(), "Expected lifetime timeout flag"); + Assertions.assertEquals(3, timeoutEx.getStreamId(), "Unexpected stream id"); + + Assertions.assertTrue(stream.isLocalClosed()); + Assertions.assertTrue(stream.isClosed()); + } + + + } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java new file mode 100644 index 000000000..6fd506c75 --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/async/AsyncH2SocketTimeoutCoreTest.java @@ -0,0 +1,198 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.testing.async; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStreamResetException; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.ResponseChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; +import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.testing.SSLTestContexts; +import org.apache.hc.core5.testing.extension.nio.H2AsyncRequesterResource; +import org.apache.hc.core5.testing.extension.nio.H2AsyncServerResource; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class AsyncH2SocketTimeoutCoreTest { + + private static final Timeout SOCKET_TIMEOUT = Timeout.ofSeconds(1); + private static final Timeout RESULT_TIMEOUT = Timeout.ofSeconds(30); + + @RegisterExtension + private final H2AsyncServerResource serverResource = new H2AsyncServerResource(); + + @RegisterExtension + private final H2AsyncRequesterResource clientResource = new H2AsyncRequesterResource(); + + public AsyncH2SocketTimeoutCoreTest() { + serverResource.configure(bootstrap -> bootstrap + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ServerTlsStrategy(SSLTestContexts.createServerSSLContext())) + .setIOReactorConfig( + IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(30)) + .build()) + .setRequestRouter( + RequestRouter.>builder() + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "*", + SimpleDelayingHandler::new) + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .build() + ) + ); + + clientResource.configure(bootstrap -> bootstrap + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext())) + .setIOReactorConfig( + IOReactorConfig.custom() + .setSoTimeout(SOCKET_TIMEOUT) + .build()) + ); + } + + @Test + void testHttp2RequestTimeoutYieldsStreamReset() throws Exception { + final InetSocketAddress address = startServer(); + final HttpAsyncRequester requester = clientResource.start(); + + final URI requestUri = new URI("http://localhost:" + address.getPort() + "/timeout"); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri).build(); + final BasicResponseConsumer responseConsumer = + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()); + + final Future> future = + requester.execute(requestProducer, responseConsumer, SOCKET_TIMEOUT, null); + + final ExecutionException ex = Assertions.assertThrows(ExecutionException.class, () -> future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit())); + + final Throwable cause = ex.getCause(); + Assertions.assertInstanceOf(HttpStreamResetException.class, cause, "Expected HttpStreamResetException, but got: " + cause); + } + + private InetSocketAddress startServer() throws Exception { + final HttpAsyncServer server = serverResource.start(); + final ListenerEndpoint listener = server.listen(new InetSocketAddress(0), URIScheme.HTTP).get(); + return (InetSocketAddress) listener.getAddress(); + } + + static final class SimpleDelayingHandler implements AsyncServerExchangeHandler { + + private final AtomicBoolean completed = new AtomicBoolean(false); + + @Override + public void handleRequest( + final HttpRequest request, + final EntityDetails entityDetails, + final ResponseChannel responseChannel, + final HttpContext context) throws HttpException, IOException { + // Intentionally do nothing: no response is sent back. + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + // Accept any amount of request data. + capacityChannel.update(Integer.MAX_VALUE); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + // Discard request body if present. + if (src != null) { + src.position(src.limit()); + } + } + + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + // Nothing special to do on stream end for this test. + } + + @Override + public int available() { + // No response body to produce. + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + // In this test we never send a response; just ensure the stream is closed + // if produce gets called. + if (completed.compareAndSet(false, true)) { + channel.endStream(); + } + } + + @Override + public void failed(final Exception cause) { + // No-op for this simple test handler. + } + + @Override + public void releaseResources() { + // No resources to release. + } + + } + +}