Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/main/java/tech/kwik/core/QuicConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,19 @@ enum QuicVersion {

void setPeerInitiatedStreamCallback(Consumer<QuicStream> streamConsumer);

ConnectionListener getConnectionListener();
/**
* Register a listener that will be called when the connection is established or terminated.
* @param connectionListener
*/
void setConnectionListener(ConnectionListener connectionListener);

StreamReadListener getStreamReadListener();
void setStreamReadListener(StreamReadListener listener);

StreamWriteListener getStreamWriteListener();
void setStreamWriteListener(StreamWriteListener listener);

void close();

/**
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/tech/kwik/core/QuicStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* unidirectional or bidirectional."
*
*/
public interface QuicStream {
public interface QuicStream extends AutoCloseable {

/**
* Returns the input stream for reading data sent by the peer.
Expand Down
32 changes: 32 additions & 0 deletions core/src/main/java/tech/kwik/core/StreamReadListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright © 2024, 2025 Peter Doornbosch
*
* This file is part of Kwik, an implementation of the QUIC protocol in Java.
*
* Kwik is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the
* Free Software Foundation, either version 3 of the License, or (at your option)
* any later version.
*
* Kwik is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package tech.kwik.core;

/**
* Listener that is notified when data is read from a {@link QuicStream}.
*/
public interface StreamReadListener {

/**
* Called when bytes are read from a stream.
* @param stream the stream that data was read from
* @param amount the number of bytes read
*/
void read(QuicStream stream, long amount);
}
33 changes: 33 additions & 0 deletions core/src/main/java/tech/kwik/core/StreamWriteListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright © 2024, 2025 Peter Doornbosch
*
* This file is part of Kwik, an implementation of the QUIC protocol in Java.
*
* Kwik is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the
* Free Software Foundation, either version 3 of the License, or (at your option)
* any later version.
*
* Kwik is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package tech.kwik.core;

/**
* Listener that is notified when data is written to a {@link QuicStream}.
*/
@FunctionalInterface
public interface StreamWriteListener {

/**
* Called when bytes are written to a stream.
* @param stream the stream that data was written to
* @param amount the number of bytes written
*/
void write(QuicStream stream, long amount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,63 @@
*/
package tech.kwik.core.concurrent;


import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Creates daemon threads. Java's default thread factory used in executors creates non-daemon threads that
* prevent JVM from shutting down.
* Creates daemon threads. Java's default thread factory used in executors creates non-daemon
* threads that prevent JVM from shutting down.
*/
public class DaemonThreadFactory implements ThreadFactory {

private final String threadBaseName;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private ThreadFactory virtualFactory;

public DaemonThreadFactory(String threadBaseName) {
this.threadBaseName = threadBaseName;

if (Runtime.version().feature() >= 24) {
try {
var lookup = MethodHandles.lookup();
var builderClass = Class.forName("java.lang.Thread$Builder$OfVirtual");
// public static Builder.OfVirtual ofVirtual()
var ofVirtualHandle =
lookup.findStatic(Thread.class, "ofVirtual", MethodType.methodType(builderClass));

// 2. public Thread.Builder name(String prefix, long start)
var nameHandle =
lookup.findVirtual(
builderClass,
"name",
MethodType.methodType(builderClass, String.class, long.class));

// 3. Invoke Thread.ofVirtual().name(threadBaseName, 0)
var namedBuilder = nameHandle.invoke(ofVirtualHandle.invoke(), threadBaseName, 0L);

// 4. public ThreadFactory factory()
var factoryHandle =
lookup.findVirtual(builderClass, "factory", MethodType.methodType(ThreadFactory.class));

// 5. Invoke namedBuilder.factory()
this.virtualFactory = (ThreadFactory) factoryHandle.invoke(namedBuilder);
} catch (Throwable e) {
// impossible
}
}
}

@Override
public Thread newThread(Runnable runnable) {

if (virtualFactory != null) {
return virtualFactory.newThread(runnable);
}

Thread thread = new Thread(runnable, threadBaseName + "-" + threadNumber.getAndIncrement());
thread.setDaemon(true);
return thread;
}
}
}
51 changes: 51 additions & 0 deletions core/src/main/java/tech/kwik/core/concurrent/VirtualExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package tech.kwik.core.concurrent;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
* Utility class to reflectively invoke the Executors.newThreadPerTaskExecutor static method using
* the Method Handles API.
*/
public class VirtualExecutor {

private static final boolean SUPPORTED = Runtime.version().feature() >= 24;

private static MethodHandle handle;

static {
try {
handle =
MethodHandles.publicLookup()
.findStatic(
Executors.class,
"newThreadPerTaskExecutor",
MethodType.methodType(ExecutorService.class, ThreadFactory.class));
} catch (Exception __) {
// failing is of no consequence
}
}

/** Returns true if virtual threads are supported in this JVM */
public static boolean supported() {
return SUPPORTED;
}

/**
* Reflectively creates a virtual thread executor
*
* @param name the name of the threads
* @return A new ExecutorService instance backed by virtual threads.
*/
public static ExecutorService createExecutor(String name) {
try {
return (ExecutorService) handle.invoke(new DaemonThreadFactory(name));
} catch (Throwable e) {
throw new UnsupportedOperationException("this jvm doesn't support virtual threads");
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/tech/kwik/core/crypto/CryptoStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public CryptoStream(VersionHolder quicVersion, EncryptionLevel encryptionLevel,
tlsMessageParser = new TlsMessageParser(this::quicExtensionsParser);
dataToSend = new ArrayList<>();
maxMessageSize = determineMaxMessageSize(role, encryptionLevel);
receiveBuffer = new ReceiveBufferImpl();
receiveBuffer = new ReceiveBufferImpl(null);
}

public CryptoStream(VersionHolder quicVersion, EncryptionLevel encryptionLevel, Role role, Logger log) {
Expand Down
25 changes: 24 additions & 1 deletion core/src/main/java/tech/kwik/core/impl/QuicConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ protected enum ErrorType {
private volatile ConnectionCloseFrame lastConnectionCloseFrameSent;
private final ScheduledExecutorService scheduler;
private ConnectionListener connectionListener;
private StreamReadListener readListener;
private StreamWriteListener writeListener;
protected final ExecutorService callbackThread;

// https://datatracker.ietf.org/doc/html/rfc9221 Datagram Extension
Expand All @@ -150,7 +152,6 @@ protected enum ErrorType {
private volatile Consumer<byte[]> datagramHandler;
private volatile ExecutorService datagramHandlerExecutor;


protected QuicConnectionImpl(Version originalVersion, Role role, Path secretsFile, ConnectionConfig settings, String id, Logger log) {
this.quicVersion = new VersionHolder(originalVersion);
this.role = role;
Expand Down Expand Up @@ -961,11 +962,33 @@ public QuicVersion getQuicVersion() {
return quicVersion.getVersion().toQuicVersion();
}

@Override
public ConnectionListener getConnectionListener() {
return connectionListener;
}
@Override
public void setConnectionListener(ConnectionListener connectionListener) {
this.connectionListener = connectionListener;
}

@Override
public StreamReadListener getStreamReadListener() {
return readListener;
}
@Override
public void setStreamReadListener(StreamReadListener listener) {
this.readListener = listener;
}

@Override
public StreamWriteListener getStreamWriteListener() {
return writeListener;
}
@Override
public void setStreamWriteListener(StreamWriteListener listener) {
this.writeListener = listener;
}

protected abstract boolean usingIPv4();

protected abstract PacketFilter createProcessorChain();
Expand Down
64 changes: 64 additions & 0 deletions core/src/main/java/tech/kwik/core/stream/ListenerThreadPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package tech.kwik.core.stream;

import tech.kwik.core.concurrent.VirtualExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.DAYS;

final class ListenerThreadPool implements Executor, AutoCloseable {

private final ExecutorService executor;
private final boolean virtual;

ListenerThreadPool() {
if (VirtualExecutor.supported()) {
this.executor = VirtualExecutor.createExecutor("kwik-listener");
this.virtual = true;
return;
}
this.executor = newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable, "kwik-listener");
thread.setDaemon(true);
return thread;
});
this.virtual = false;
}

@Override
public void execute(Runnable command) {
this.executor.execute(command);
}

/**
* Implementation of {@link AutoCloseable#close()} that performs an
* orderly shutdown of {@link #executor}.
*
* @implNote This is a clone of OpenJDK 19+ default close method
* available directly on the newer {@code ExecutorService} interface.
*/
@Override
public void close() {
if (this.virtual) return;

boolean terminated = this.executor.isTerminated();
if (terminated) return;

this.executor.shutdown();
boolean interrupted = false;
while (!terminated) {
try {
terminated = this.executor.awaitTermination(1L, DAYS);
} catch (InterruptedException e) {
if (interrupted) continue;
this.executor.shutdownNow();
interrupted = true;
}
}
if (!interrupted) return;
currentThread().interrupt();
}
}
Loading