diff --git a/core/src/main/java/tech/kwik/core/concurrent/DaemonThreadFactory.java b/core/src/main/java/tech/kwik/core/concurrent/DaemonThreadFactory.java index bc10b816..79aa35cc 100644 --- a/core/src/main/java/tech/kwik/core/concurrent/DaemonThreadFactory.java +++ b/core/src/main/java/tech/kwik/core/concurrent/DaemonThreadFactory.java @@ -18,25 +18,61 @@ */ package tech.kwik.core.concurrent; - +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** - * Creates daemon threads. Java's default thread factory used in executors creates non-daemon threads that - * prevent JVM from shutting down. + * Creates daemon threads. Java's default thread factory used in executors creates non-daemon + * threads that prevent JVM from shutting down. */ public class DaemonThreadFactory implements ThreadFactory { private final String threadBaseName; private final AtomicInteger threadNumber = new AtomicInteger(1); + private ThreadFactory virtualFactory; public DaemonThreadFactory(String threadBaseName) { this.threadBaseName = threadBaseName; + + if (Runtime.version().feature() >= 24) { + try { + var lookup = MethodHandles.lookup(); + var builderClass = Class.forName("java.lang.Thread$Builder$OfVirtual"); + // public static Builder.OfVirtual ofVirtual() + var ofVirtualHandle = + lookup.findStatic(Thread.class, "ofVirtual", MethodType.methodType(builderClass)); + + // 2. public Thread.Builder name(String prefix, long start) + var nameHandle = + lookup.findVirtual( + builderClass, + "name", + MethodType.methodType(builderClass, String.class, long.class)); + + // 3. Invoke Thread.ofVirtual().name(threadBaseName, 0) + var namedBuilder = nameHandle.invoke(ofVirtualHandle.invoke(), threadBaseName, 0L); + + // 4. public ThreadFactory factory() + var factoryHandle = + lookup.findVirtual(builderClass, "factory", MethodType.methodType(ThreadFactory.class)); + + // 5. Invoke namedBuilder.factory() + this.virtualFactory = (ThreadFactory) factoryHandle.invoke(namedBuilder); + } catch (Throwable e) { + // impossible + } + } } @Override public Thread newThread(Runnable runnable) { + + if (virtualFactory != null) { + return virtualFactory.newThread(runnable); + } + Thread thread = new Thread(runnable, threadBaseName + "-" + threadNumber.getAndIncrement()); thread.setDaemon(true); return thread; diff --git a/core/src/main/java/tech/kwik/core/concurrent/VirtualExecutor.java b/core/src/main/java/tech/kwik/core/concurrent/VirtualExecutor.java new file mode 100644 index 00000000..43127ee5 --- /dev/null +++ b/core/src/main/java/tech/kwik/core/concurrent/VirtualExecutor.java @@ -0,0 +1,51 @@ +package tech.kwik.core.concurrent; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +/** + * Utility class to reflectively invoke the Executors.newThreadPerTaskExecutor static method using + * the Method Handles API. + */ +public class VirtualExecutor { + + private static final boolean SUPPORTED = Runtime.version().feature() >= 24; + + private static MethodHandle handle; + + static { + try { + handle = + MethodHandles.publicLookup() + .findStatic( + Executors.class, + "newThreadPerTaskExecutor", + MethodType.methodType(ExecutorService.class, ThreadFactory.class)); + } catch (Exception __) { + // failing is of no consequence + } + } + + /** Returns true if virtual threads are supported in this JVM */ + public static boolean supported() { + return SUPPORTED; + } + + /** + * Reflectively creates a virtual thread executor + * + * @param name the name of the threads + * @return A new ExecutorService instance backed by virtual threads. + */ + public static ExecutorService createExecutor(String name) { + try { + return (ExecutorService) handle.invoke(new DaemonThreadFactory(name)); + } catch (Throwable e) { + throw new UnsupportedOperationException("this jvm doesn't support virtual threads"); + } + } +} diff --git a/core/src/main/java/tech/kwik/core/server/impl/ServerConnectorImpl.java b/core/src/main/java/tech/kwik/core/server/impl/ServerConnectorImpl.java index 17f2a7b8..94dfff39 100644 --- a/core/src/main/java/tech/kwik/core/server/impl/ServerConnectorImpl.java +++ b/core/src/main/java/tech/kwik/core/server/impl/ServerConnectorImpl.java @@ -22,6 +22,7 @@ import tech.kwik.core.QuicConnection; import tech.kwik.core.QuicConstants; import tech.kwik.core.concurrent.DaemonThreadFactory; +import tech.kwik.core.concurrent.VirtualExecutor; import tech.kwik.core.crypto.Aead; import tech.kwik.core.crypto.ConnectionSecrets; import tech.kwik.core.crypto.MissingKeysException; @@ -146,8 +147,16 @@ private ServerConnectorImpl(DatagramSocket socket, TlsServerEngineFactory tlsEng receiver = new Receiver(serverSocket, log, exception -> System.exit(9)); int maxSharedExecutorThreads = 10; - sharedExecutor = new ThreadPoolExecutor(1, maxSharedExecutorThreads, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue(), new DaemonThreadFactory("server connector shared executor")); + sharedExecutor = + VirtualExecutor.supported() + ? VirtualExecutor.createExecutor("server connector shared executor") + : new ThreadPoolExecutor( + 1, + maxSharedExecutorThreads, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new DaemonThreadFactory("server connector shared executor")); sharedScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); context = new ServerConnectorContext(); diff --git a/h09/src/main/java/tech/kwik/h09/client/Http09Client.java b/h09/src/main/java/tech/kwik/h09/client/Http09Client.java index 307bae06..eebff93c 100644 --- a/h09/src/main/java/tech/kwik/h09/client/Http09Client.java +++ b/h09/src/main/java/tech/kwik/h09/client/Http09Client.java @@ -22,6 +22,7 @@ import tech.kwik.core.QuicConnection; import tech.kwik.core.QuicStream; import tech.kwik.core.concurrent.DaemonThreadFactory; +import tech.kwik.core.concurrent.VirtualExecutor; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; @@ -58,8 +59,10 @@ public class Http09Client extends HttpClient { public Http09Client(QuicClientConnection quicConnection, boolean with0RTT) { this.quicConnection = quicConnection; this.with0RTT = with0RTT; - - executorService = Executors.newCachedThreadPool(new DaemonThreadFactory("http09")); + this.executorService = + VirtualExecutor.supported() + ? VirtualExecutor.createExecutor("http09") + : Executors.newCachedThreadPool(new DaemonThreadFactory("http09")); } @Override