diff --git a/core/src/main/java/tech/kwik/core/QuicConnection.java b/core/src/main/java/tech/kwik/core/QuicConnection.java index 97b50493..78da08cd 100644 --- a/core/src/main/java/tech/kwik/core/QuicConnection.java +++ b/core/src/main/java/tech/kwik/core/QuicConnection.java @@ -96,12 +96,19 @@ enum QuicVersion { void setPeerInitiatedStreamCallback(Consumer streamConsumer); + ConnectionListener getConnectionListener(); /** * Register a listener that will be called when the connection is established or terminated. * @param connectionListener */ void setConnectionListener(ConnectionListener connectionListener); + StreamReadListener getStreamReadListener(); + void setStreamReadListener(StreamReadListener listener); + + StreamWriteListener getStreamWriteListener(); + void setStreamWriteListener(StreamWriteListener listener); + void close(); /** diff --git a/core/src/main/java/tech/kwik/core/QuicStream.java b/core/src/main/java/tech/kwik/core/QuicStream.java index a888644d..1e6cbc52 100644 --- a/core/src/main/java/tech/kwik/core/QuicStream.java +++ b/core/src/main/java/tech/kwik/core/QuicStream.java @@ -29,7 +29,7 @@ * unidirectional or bidirectional." * */ -public interface QuicStream { +public interface QuicStream extends AutoCloseable { /** * Returns the input stream for reading data sent by the peer. diff --git a/core/src/main/java/tech/kwik/core/StreamReadListener.java b/core/src/main/java/tech/kwik/core/StreamReadListener.java new file mode 100644 index 00000000..779fec4f --- /dev/null +++ b/core/src/main/java/tech/kwik/core/StreamReadListener.java @@ -0,0 +1,32 @@ +/* + * Copyright © 2024, 2025 Peter Doornbosch + * + * This file is part of Kwik, an implementation of the QUIC protocol in Java. + * + * Kwik is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the + * Free Software Foundation, either version 3 of the License, or (at your option) + * any later version. + * + * Kwik is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + * more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package tech.kwik.core; + +/** + * Listener that is notified when data is read from a {@link QuicStream}. + */ +public interface StreamReadListener { + + /** + * Called when bytes are read from a stream. + * @param stream the stream that data was read from + * @param amount the number of bytes read + */ + void read(QuicStream stream, long amount); +} diff --git a/core/src/main/java/tech/kwik/core/StreamWriteListener.java b/core/src/main/java/tech/kwik/core/StreamWriteListener.java new file mode 100644 index 00000000..11cd2aa0 --- /dev/null +++ b/core/src/main/java/tech/kwik/core/StreamWriteListener.java @@ -0,0 +1,33 @@ +/* + * Copyright © 2024, 2025 Peter Doornbosch + * + * This file is part of Kwik, an implementation of the QUIC protocol in Java. + * + * Kwik is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the + * Free Software Foundation, either version 3 of the License, or (at your option) + * any later version. + * + * Kwik is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + * more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ +package tech.kwik.core; + +/** + * Listener that is notified when data is written to a {@link QuicStream}. + */ +@FunctionalInterface +public interface StreamWriteListener { + + /** + * Called when bytes are written to a stream. + * @param stream the stream that data was written to + * @param amount the number of bytes written + */ + void write(QuicStream stream, long amount); +} diff --git a/core/src/main/java/tech/kwik/core/concurrent/DaemonThreadFactory.java b/core/src/main/java/tech/kwik/core/concurrent/DaemonThreadFactory.java index bc10b816..352e47c4 100644 --- a/core/src/main/java/tech/kwik/core/concurrent/DaemonThreadFactory.java +++ b/core/src/main/java/tech/kwik/core/concurrent/DaemonThreadFactory.java @@ -18,27 +18,63 @@ */ package tech.kwik.core.concurrent; - +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** - * Creates daemon threads. Java's default thread factory used in executors creates non-daemon threads that - * prevent JVM from shutting down. + * Creates daemon threads. Java's default thread factory used in executors creates non-daemon + * threads that prevent JVM from shutting down. */ public class DaemonThreadFactory implements ThreadFactory { private final String threadBaseName; private final AtomicInteger threadNumber = new AtomicInteger(1); + private ThreadFactory virtualFactory; public DaemonThreadFactory(String threadBaseName) { this.threadBaseName = threadBaseName; + + if (Runtime.version().feature() >= 24) { + try { + var lookup = MethodHandles.lookup(); + var builderClass = Class.forName("java.lang.Thread$Builder$OfVirtual"); + // public static Builder.OfVirtual ofVirtual() + var ofVirtualHandle = + lookup.findStatic(Thread.class, "ofVirtual", MethodType.methodType(builderClass)); + + // 2. public Thread.Builder name(String prefix, long start) + var nameHandle = + lookup.findVirtual( + builderClass, + "name", + MethodType.methodType(builderClass, String.class, long.class)); + + // 3. Invoke Thread.ofVirtual().name(threadBaseName, 0) + var namedBuilder = nameHandle.invoke(ofVirtualHandle.invoke(), threadBaseName, 0L); + + // 4. public ThreadFactory factory() + var factoryHandle = + lookup.findVirtual(builderClass, "factory", MethodType.methodType(ThreadFactory.class)); + + // 5. Invoke namedBuilder.factory() + this.virtualFactory = (ThreadFactory) factoryHandle.invoke(namedBuilder); + } catch (Throwable e) { + // impossible + } + } } @Override public Thread newThread(Runnable runnable) { + + if (virtualFactory != null) { + return virtualFactory.newThread(runnable); + } + Thread thread = new Thread(runnable, threadBaseName + "-" + threadNumber.getAndIncrement()); thread.setDaemon(true); return thread; } -} +} \ No newline at end of file diff --git a/core/src/main/java/tech/kwik/core/concurrent/VirtualExecutor.java b/core/src/main/java/tech/kwik/core/concurrent/VirtualExecutor.java new file mode 100644 index 00000000..bee64b95 --- /dev/null +++ b/core/src/main/java/tech/kwik/core/concurrent/VirtualExecutor.java @@ -0,0 +1,51 @@ +package tech.kwik.core.concurrent; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +/** + * Utility class to reflectively invoke the Executors.newThreadPerTaskExecutor static method using + * the Method Handles API. + */ +public class VirtualExecutor { + + private static final boolean SUPPORTED = Runtime.version().feature() >= 24; + + private static MethodHandle handle; + + static { + try { + handle = + MethodHandles.publicLookup() + .findStatic( + Executors.class, + "newThreadPerTaskExecutor", + MethodType.methodType(ExecutorService.class, ThreadFactory.class)); + } catch (Exception __) { + // failing is of no consequence + } + } + + /** Returns true if virtual threads are supported in this JVM */ + public static boolean supported() { + return SUPPORTED; + } + + /** + * Reflectively creates a virtual thread executor + * + * @param name the name of the threads + * @return A new ExecutorService instance backed by virtual threads. + */ + public static ExecutorService createExecutor(String name) { + try { + return (ExecutorService) handle.invoke(new DaemonThreadFactory(name)); + } catch (Throwable e) { + throw new UnsupportedOperationException("this jvm doesn't support virtual threads"); + } + } +} \ No newline at end of file diff --git a/core/src/main/java/tech/kwik/core/crypto/CryptoStream.java b/core/src/main/java/tech/kwik/core/crypto/CryptoStream.java index fda42204..9dd0b8c2 100644 --- a/core/src/main/java/tech/kwik/core/crypto/CryptoStream.java +++ b/core/src/main/java/tech/kwik/core/crypto/CryptoStream.java @@ -96,7 +96,7 @@ public CryptoStream(VersionHolder quicVersion, EncryptionLevel encryptionLevel, tlsMessageParser = new TlsMessageParser(this::quicExtensionsParser); dataToSend = new ArrayList<>(); maxMessageSize = determineMaxMessageSize(role, encryptionLevel); - receiveBuffer = new ReceiveBufferImpl(); + receiveBuffer = new ReceiveBufferImpl(null); } public CryptoStream(VersionHolder quicVersion, EncryptionLevel encryptionLevel, Role role, Logger log) { diff --git a/core/src/main/java/tech/kwik/core/impl/QuicConnectionImpl.java b/core/src/main/java/tech/kwik/core/impl/QuicConnectionImpl.java index 2bfed17f..0bf87468 100644 --- a/core/src/main/java/tech/kwik/core/impl/QuicConnectionImpl.java +++ b/core/src/main/java/tech/kwik/core/impl/QuicConnectionImpl.java @@ -142,6 +142,8 @@ protected enum ErrorType { private volatile ConnectionCloseFrame lastConnectionCloseFrameSent; private final ScheduledExecutorService scheduler; private ConnectionListener connectionListener; + private StreamReadListener readListener; + private StreamWriteListener writeListener; protected final ExecutorService callbackThread; // https://datatracker.ietf.org/doc/html/rfc9221 Datagram Extension @@ -150,7 +152,6 @@ protected enum ErrorType { private volatile Consumer datagramHandler; private volatile ExecutorService datagramHandlerExecutor; - protected QuicConnectionImpl(Version originalVersion, Role role, Path secretsFile, ConnectionConfig settings, String id, Logger log) { this.quicVersion = new VersionHolder(originalVersion); this.role = role; @@ -961,11 +962,33 @@ public QuicVersion getQuicVersion() { return quicVersion.getVersion().toQuicVersion(); } + @Override + public ConnectionListener getConnectionListener() { + return connectionListener; + } @Override public void setConnectionListener(ConnectionListener connectionListener) { this.connectionListener = connectionListener; } + @Override + public StreamReadListener getStreamReadListener() { + return readListener; + } + @Override + public void setStreamReadListener(StreamReadListener listener) { + this.readListener = listener; + } + + @Override + public StreamWriteListener getStreamWriteListener() { + return writeListener; + } + @Override + public void setStreamWriteListener(StreamWriteListener listener) { + this.writeListener = listener; + } + protected abstract boolean usingIPv4(); protected abstract PacketFilter createProcessorChain(); diff --git a/core/src/main/java/tech/kwik/core/stream/ListenerThreadPool.java b/core/src/main/java/tech/kwik/core/stream/ListenerThreadPool.java new file mode 100644 index 00000000..2abf4c65 --- /dev/null +++ b/core/src/main/java/tech/kwik/core/stream/ListenerThreadPool.java @@ -0,0 +1,64 @@ +package tech.kwik.core.stream; + +import tech.kwik.core.concurrent.VirtualExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +import static java.lang.Thread.currentThread; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.TimeUnit.DAYS; + +final class ListenerThreadPool implements Executor, AutoCloseable { + + private final ExecutorService executor; + private final boolean virtual; + + ListenerThreadPool() { + if (VirtualExecutor.supported()) { + this.executor = VirtualExecutor.createExecutor("kwik-listener"); + this.virtual = true; + return; + } + this.executor = newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable, "kwik-listener"); + thread.setDaemon(true); + return thread; + }); + this.virtual = false; + } + + @Override + public void execute(Runnable command) { + this.executor.execute(command); + } + + /** + * Implementation of {@link AutoCloseable#close()} that performs an + * orderly shutdown of {@link #executor}. + * + * @implNote This is a clone of OpenJDK 19+ default close method + * available directly on the newer {@code ExecutorService} interface. + */ + @Override + public void close() { + if (this.virtual) return; + + boolean terminated = this.executor.isTerminated(); + if (terminated) return; + + this.executor.shutdown(); + boolean interrupted = false; + while (!terminated) { + try { + terminated = this.executor.awaitTermination(1L, DAYS); + } catch (InterruptedException e) { + if (interrupted) continue; + this.executor.shutdownNow(); + interrupted = true; + } + } + if (!interrupted) return; + currentThread().interrupt(); + } +} diff --git a/core/src/main/java/tech/kwik/core/stream/QuicStreamImpl.java b/core/src/main/java/tech/kwik/core/stream/QuicStreamImpl.java index 4bfc0b54..88f22c66 100644 --- a/core/src/main/java/tech/kwik/core/stream/QuicStreamImpl.java +++ b/core/src/main/java/tech/kwik/core/stream/QuicStreamImpl.java @@ -39,6 +39,7 @@ public class QuicStreamImpl implements QuicStream { protected final int streamId; protected final Role role; protected final QuicConnectionImpl connection; + final ListenerThreadPool listenerPool; private final StreamManager streamManager; protected final Logger log; private final StreamInputStream inputStream; @@ -47,7 +48,6 @@ public class QuicStreamImpl implements QuicStream { private volatile boolean inputClosed; private final ReentrantLock stateLock; - public QuicStreamImpl(int streamId, Role role, QuicConnectionImpl connection, StreamManager streamManager, FlowControl flowController) { this(Version.getDefault(), streamId, role, connection, streamManager, flowController, new NullLogger()); } @@ -77,12 +77,14 @@ public QuicStreamImpl(Version quicVersion, int streamId, Role role, QuicConnecti if (isBidirectional() || isUnidirectional() && isSelfInitiated()) { outputStream = createStreamOutputStream(sendBufferSize, flowController); + ((StreamOutputStreamImpl) outputStream).getSendBuffer().notifyCanWrite(true); } else { outputStream = new NullStreamOutputStream(); } stateLock = new ReentrantLock(); + listenerPool = new ListenerThreadPool(); } private long determineInitialReceiveBufferSize() { @@ -104,6 +106,13 @@ public OutputStream getOutputStream() { return outputStream; } + public boolean isOutputClosed() { + return outputClosed; + } + public boolean isInputClosed() { + return inputClosed; + } + /** * Adds data from a newly received frame to the stream. * @@ -196,8 +205,16 @@ protected StreamOutputStream createStreamOutputStream(Integer sendBufferSize, Fl long terminateStream(long errorCode, long finalSize) throws TransportError { return inputStream.terminate(errorCode, finalSize); } - - // TODO: QuicStream should have a close method that closes both input and output stream and releases all resources and marks itself as terminated. + + @Override + public void close() { + // TODO: QuicStream should have a close method that closes both input + // and output stream and releases all resources and marks itself as + // terminated. + + // currently only closing the listener thread pool + this.listenerPool.close(); + } /** * Resets the output stream so data can again be send from the start of the stream (offset 0). Note that in such diff --git a/core/src/main/java/tech/kwik/core/stream/ReceiveBufferImpl.java b/core/src/main/java/tech/kwik/core/stream/ReceiveBufferImpl.java index 6356312c..02f818b7 100644 --- a/core/src/main/java/tech/kwik/core/stream/ReceiveBufferImpl.java +++ b/core/src/main/java/tech/kwik/core/stream/ReceiveBufferImpl.java @@ -40,6 +40,8 @@ public class ReceiveBufferImpl implements ReceiveBuffer { private static final int DEFAULT_MAX_COMBINED_FRAME_SIZE = 5120; + private final QuicStreamImpl stream; + private final NavigableSet outOfOrderFrames = new ConcurrentSkipListSet<>(); private final Queue contiguousFrames = new ConcurrentLinkedQueue<>(); private volatile long contiguousUpToOffset = 0; @@ -49,8 +51,10 @@ public class ReceiveBufferImpl implements ReceiveBuffer { private final int maxCombinedFrameSize; private volatile boolean discarded; - public ReceiveBufferImpl() { - this(DEFAULT_MAX_COMBINED_FRAME_SIZE); + private volatile long lastAvailData = -1; + + public ReceiveBufferImpl(QuicStreamImpl stream) { + this(stream, DEFAULT_MAX_COMBINED_FRAME_SIZE); } /** @@ -64,8 +68,23 @@ public ReceiveBufferImpl() { * @param maxCombinedFrameSize the maximum size of a combined frame (i.e. when frames are combined to remove * overlap, the resulting frame will not be larger than this size). */ - public ReceiveBufferImpl(int maxCombinedFrameSize) { + public ReceiveBufferImpl(QuicStreamImpl stream, int maxCombinedFrameSize) { this.maxCombinedFrameSize = maxCombinedFrameSize; + this.stream = stream; + } + + private void notifyWriter(boolean force) { + if (stream == null || stream.connection == null || stream.connection.getStreamReadListener() == null) return; + if (stream.isInputClosed()) return; + + long avail = bytesAvailable(); + if (force || avail != lastAvailData) { + lastAvailData = avail; + this.stream.listenerPool.execute(() -> { + if (stream.isInputClosed()) return; + stream.connection.getStreamReadListener().read(stream, avail); + }); + } } @Override @@ -96,6 +115,8 @@ public int read(ByteBuffer buffer) { nextFrame = contiguousFrames.peek(); } } + + if (totalBytesRead > 0) notifyWriter(false); return totalBytesRead; } @@ -133,9 +154,12 @@ public boolean add(StreamElement frame) { bufferedOutOfOrderData -= nextFrame.getLength(); } } + + // notificar sempre que a quantidade (bytesAvailable) mudar (entrou dado/ficou contíguo/FIN drenou) + notifyWriter(false); + return contiguousUpToOffset > previousContiguousUpToOffset; - } - catch (Exception e) { + } catch (Exception e) { // Because the add method is the only method making modifications to the outOfOrderFrames, race conditions // will not occur. However, there is one exception: the discardAllData method. Concurrent call to this method // can cause a race condition, which can lead to various runtime exceptions in the code block wrapped by this @@ -308,6 +332,7 @@ public void discardAllData() { outOfOrderFrames.clear(); bufferedOutOfOrderData = 0; contiguousFrames.clear(); + notifyWriter(true); } private static class SimpleStreamElement implements StreamElement { @@ -357,7 +382,7 @@ public int compareTo(StreamElement other) { @Override public String toString() { - return "" + offset + ".." + (offset + data.length - 1); + return offset + ".." + (offset + data.length - 1); } } -} +} \ No newline at end of file diff --git a/core/src/main/java/tech/kwik/core/stream/SendBuffer.java b/core/src/main/java/tech/kwik/core/stream/SendBuffer.java index 8e6f4355..34534625 100644 --- a/core/src/main/java/tech/kwik/core/stream/SendBuffer.java +++ b/core/src/main/java/tech/kwik/core/stream/SendBuffer.java @@ -18,12 +18,12 @@ */ package tech.kwik.core.stream; +import tech.kwik.core.StreamWriteListener; import tech.kwik.core.frame.StreamFrame; import tech.kwik.core.impl.Version; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +36,8 @@ */ public class SendBuffer { + private final QuicStreamImpl stream; + // Send queue contains stream bytes to send in order. The position of the first byte buffer in the queue determines the next byte(s) to send. private final Queue sendQueue; private final ByteBuffer END_OF_STREAM_MARKER = ByteBuffer.allocate(0); @@ -45,8 +47,13 @@ public class SendBuffer { private final Condition notFull; private volatile Thread blockingWriterThread; + private volatile boolean closed = false; + + private volatile int lastReportedAvail = Integer.MIN_VALUE; + private volatile boolean lastReportedWritable = true; - public SendBuffer(Integer sendBufferSize) { + public SendBuffer(QuicStreamImpl stream, Integer sendBufferSize) { + this.stream = stream; sendQueue = new ConcurrentLinkedDeque<>(); if (sendBufferSize != null && sendBufferSize > 0) { maxBufferSize = sendBufferSize; @@ -59,14 +66,30 @@ public SendBuffer(Integer sendBufferSize) { notFull = bufferLock.newCondition(); } + public void notifyCanWrite(boolean force) { + StreamWriteListener listener = (stream != null && stream.connection != null) + ? stream.connection.getStreamWriteListener() : + null; + if (listener == null) return; + else if (stream.isOutputClosed()) return; + + int avail = getAvailableBytes(); + boolean canWrite = !closed && avail > 0; + int report = canWrite ? avail : 0; + + if (force || report != lastReportedAvail || canWrite != lastReportedWritable) { + lastReportedAvail = report; + lastReportedWritable = canWrite; + this.stream.listenerPool.execute(() -> { + if (stream.isOutputClosed()) return; + listener.write(stream, report); + }); + } + } + /** * Writes data to the buffer. If the buffer is full, the method will block until there is enough space in the buffer. * This method makes defensive copies of the data. - * @param data - * @param off - * @param len - * @throws IOException - * @throws InterruptedException */ public void write(byte[] data, int off, int len) throws IOException, InterruptedException { int availableBufferSpace = maxBufferSize - bufferedBytes.get(); @@ -89,8 +112,10 @@ public void write(byte[] data, int off, int len) throws IOException, Interrupted } } - sendQueue.add(ByteBuffer.wrap(Arrays.copyOfRange(data, off, off + len))); + sendQueue.add(ByteBuffer.wrap(java.util.Arrays.copyOfRange(data, off, off + len))); bufferedBytes.getAndAdd(len); + + notifyCanWrite(false); } public StreamFrame getStreamFrame(Version quicVersion, int streamId, long currentOffset, int maxBytesToSend) { @@ -130,29 +155,40 @@ public StreamFrame getStreamFrame(Version quicVersion, int streamId, long curren finally { bufferLock.unlock(); } + + notifyCanWrite(false); + if (nrOfBytes < maxBytesToSend) { // This can happen when not enough data is buffer to fill a stream frame, or length field is 1 byte (instead of 2 that was counted for) - dataToSend = Arrays.copyOfRange(dataToSend, 0, nrOfBytes); + dataToSend = java.util.Arrays.copyOfRange(dataToSend, 0, nrOfBytes); } - StreamFrame streamFrame = new StreamFrame(quicVersion, streamId, currentOffset, dataToSend, finalFrame); - return streamFrame; + + return new StreamFrame(quicVersion, streamId, currentOffset, dataToSend, finalFrame); } - public int getAvailableBytes() { + public int getBufferedBytes() { return bufferedBytes.get(); } + public int getAvailableBytes() { + return getMaxSize() - getBufferedBytes(); + } public boolean isEmpty() { return sendQueue.isEmpty(); } - public void close() { - sendQueue.add(END_OF_STREAM_MARKER); - } - public void clear() { sendQueue.clear(); bufferedBytes.set(0); + closed = false; + notifyCanWrite(true); + } + + public void close() { + if (closed) return; + closed = true; + sendQueue.add(END_OF_STREAM_MARKER); + notifyCanWrite(true); } public void interruptBlockedWriter() { diff --git a/core/src/main/java/tech/kwik/core/stream/StreamInputStreamImpl.java b/core/src/main/java/tech/kwik/core/stream/StreamInputStreamImpl.java index a3b0e6a9..d5b1a3be 100644 --- a/core/src/main/java/tech/kwik/core/stream/StreamInputStreamImpl.java +++ b/core/src/main/java/tech/kwik/core/stream/StreamInputStreamImpl.java @@ -61,7 +61,7 @@ class StreamInputStreamImpl extends StreamInputStream { public StreamInputStreamImpl(QuicStreamImpl quicStream, long receiveBufferSize, Logger log) { this.quicStream = quicStream; this.log = log; - receiveBuffer = new ReceiveBufferImpl(); + receiveBuffer = new ReceiveBufferImpl(quicStream); receiverFlowControlLimit = receiveBufferSize; lastCommunicatedMaxData = receiverFlowControlLimit; diff --git a/core/src/main/java/tech/kwik/core/stream/StreamOutputStreamImpl.java b/core/src/main/java/tech/kwik/core/stream/StreamOutputStreamImpl.java index 0e324ea8..ee867ffb 100644 --- a/core/src/main/java/tech/kwik/core/stream/StreamOutputStreamImpl.java +++ b/core/src/main/java/tech/kwik/core/stream/StreamOutputStreamImpl.java @@ -19,10 +19,10 @@ package tech.kwik.core.stream; import tech.kwik.core.common.EncryptionLevel; -import tech.kwik.core.frame.DataBlockedFrame; import tech.kwik.core.frame.QuicFrame; import tech.kwik.core.frame.ResetStreamFrame; import tech.kwik.core.frame.StreamDataBlockedFrame; +import tech.kwik.core.frame.DataBlockedFrame; import tech.kwik.core.frame.StreamFrame; import tech.kwik.core.log.Logger; @@ -39,7 +39,6 @@ class StreamOutputStreamImpl extends StreamOutputStream implements FlowControlUp private static final int MIN_FRAME_SIZE = 1 + 8 + 8 + 2 + 1; private final QuicStreamImpl quicStream; - private final Object lock = new Object(); private final SendBuffer sendBuffer; private final Logger log; @@ -64,7 +63,7 @@ class StreamOutputStreamImpl extends StreamOutputStream implements FlowControlUp StreamOutputStreamImpl(QuicStreamImpl quicStream, Integer sendBufferSize, FlowControl flowControl, Logger log) { this.quicStream = quicStream; flowController = flowControl; - sendBuffer = new SendBuffer(sendBufferSize); + sendBuffer = new SendBuffer(quicStream, sendBufferSize); this.log = log; maxBufferSize = sendBuffer.getMaxSize(); retransmitBuffer = new RetransmitBuffer(); @@ -170,7 +169,7 @@ else if (sendBuffer.hasData()) { long flowControlLimit = flowController.getFlowControlLimit(quicStream); assert (flowControlLimit >= currentOffset); - int maxBytesToSend = sendBuffer.getAvailableBytes(); + int maxBytesToSend = sendBuffer.getBufferedBytes(); if (flowControlLimit > currentOffset || maxBytesToSend == 0) { StreamFrame dummy = new StreamFrame(quicStream.quicVersion, quicStream.streamId, currentOffset, new byte[0], false); maxBytesToSend = Integer.min(maxBytesToSend, maxFrameSize - dummy.getFrameLength() - 1); // Take one byte extra for length field var int @@ -264,7 +263,11 @@ private void retransmitStreamFrame(QuicFrame frame) { protected EncryptionLevel getEncryptionLevel() { return App; } - + + SendBuffer getSendBuffer() { + return sendBuffer; + } + // Very confusing name: this has nothing to do with resetting the stream as the reset method does! protected void resetOutputStream() { closed = false; @@ -302,7 +305,7 @@ private void discardAllData() { } private QuicFrame createResetFrame(int maxFrameSize) { - assert (reset == true); + assert reset; return new ResetStreamFrame(quicStream.streamId, resetErrorCode, currentOffset); } diff --git a/core/src/test/java/tech/kwik/core/stream/ReceiveBufferImplTest.java b/core/src/test/java/tech/kwik/core/stream/ReceiveBufferImplTest.java index 90352dba..b54392ed 100644 --- a/core/src/test/java/tech/kwik/core/stream/ReceiveBufferImplTest.java +++ b/core/src/test/java/tech/kwik/core/stream/ReceiveBufferImplTest.java @@ -16,7 +16,7 @@ class ReceiveBufferImplTest { @BeforeEach void setUpObjectUnderTest() { - receiveBuffer = new ReceiveBufferImpl(MAX_COMBINED_FRAME_SIZE); + receiveBuffer = new ReceiveBufferImpl(null, MAX_COMBINED_FRAME_SIZE); } @Test