Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,61 @@
*/
package tech.kwik.core.concurrent;


import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Creates daemon threads. Java's default thread factory used in executors creates non-daemon threads that
* prevent JVM from shutting down.
* Creates daemon threads. Java's default thread factory used in executors creates non-daemon
* threads that prevent JVM from shutting down.
*/
public class DaemonThreadFactory implements ThreadFactory {

private final String threadBaseName;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private ThreadFactory virtualFactory;

public DaemonThreadFactory(String threadBaseName) {
this.threadBaseName = threadBaseName;

if (Runtime.version().feature() >= 24) {
try {
var lookup = MethodHandles.lookup();
var builderClass = Class.forName("java.lang.Thread$Builder$OfVirtual");
// public static Builder.OfVirtual ofVirtual()
var ofVirtualHandle =
lookup.findStatic(Thread.class, "ofVirtual", MethodType.methodType(builderClass));

// 2. public Thread.Builder name(String prefix, long start)
var nameHandle =
lookup.findVirtual(
builderClass,
"name",
MethodType.methodType(builderClass, String.class, long.class));

// 3. Invoke Thread.ofVirtual().name(threadBaseName, 0)
var namedBuilder = nameHandle.invoke(ofVirtualHandle.invoke(), threadBaseName, 0L);

// 4. public ThreadFactory factory()
var factoryHandle =
lookup.findVirtual(builderClass, "factory", MethodType.methodType(ThreadFactory.class));

// 5. Invoke namedBuilder.factory()
this.virtualFactory = (ThreadFactory) factoryHandle.invoke(namedBuilder);
} catch (Throwable e) {
// impossible
}
}
}

@Override
public Thread newThread(Runnable runnable) {

if (virtualFactory != null) {
return virtualFactory.newThread(runnable);
}

Thread thread = new Thread(runnable, threadBaseName + "-" + threadNumber.getAndIncrement());
thread.setDaemon(true);
return thread;
Expand Down
51 changes: 51 additions & 0 deletions core/src/main/java/tech/kwik/core/concurrent/VirtualExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package tech.kwik.core.concurrent;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
* Utility class to reflectively invoke the Executors.newThreadPerTaskExecutor static method using
* the Method Handles API.
*/
public class VirtualExecutor {

private static final boolean SUPPORTED = Runtime.version().feature() >= 24;

private static MethodHandle handle;

static {
try {
handle =
MethodHandles.publicLookup()
.findStatic(
Executors.class,
"newThreadPerTaskExecutor",
MethodType.methodType(ExecutorService.class, ThreadFactory.class));
} catch (Exception __) {
// failing is of no consequence
}
}

/** Returns true if virtual threads are supported in this JVM */
public static boolean supported() {
return SUPPORTED;
}

/**
* Reflectively creates a virtual thread executor
*
* @param name the name of the threads
* @return A new ExecutorService instance backed by virtual threads.
*/
public static ExecutorService createExecutor(String name) {
try {
return (ExecutorService) handle.invoke(new DaemonThreadFactory(name));
} catch (Throwable e) {
throw new UnsupportedOperationException("this jvm doesn't support virtual threads");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import tech.kwik.core.QuicConnection;
import tech.kwik.core.QuicConstants;
import tech.kwik.core.concurrent.DaemonThreadFactory;
import tech.kwik.core.concurrent.VirtualExecutor;
import tech.kwik.core.crypto.Aead;
import tech.kwik.core.crypto.ConnectionSecrets;
import tech.kwik.core.crypto.MissingKeysException;
Expand Down Expand Up @@ -146,8 +147,16 @@ private ServerConnectorImpl(DatagramSocket socket, TlsServerEngineFactory tlsEng
receiver = new Receiver(serverSocket, log, exception -> System.exit(9));

int maxSharedExecutorThreads = 10;
sharedExecutor = new ThreadPoolExecutor(1, maxSharedExecutorThreads, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("server connector shared executor"));
sharedExecutor =
VirtualExecutor.supported()
? VirtualExecutor.createExecutor("server connector shared executor")
: new ThreadPoolExecutor(
1,
maxSharedExecutorThreads,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new DaemonThreadFactory("server connector shared executor"));
sharedScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
context = new ServerConnectorContext();

Expand Down
7 changes: 5 additions & 2 deletions h09/src/main/java/tech/kwik/h09/client/Http09Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import tech.kwik.core.QuicConnection;
import tech.kwik.core.QuicStream;
import tech.kwik.core.concurrent.DaemonThreadFactory;
import tech.kwik.core.concurrent.VirtualExecutor;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
Expand Down Expand Up @@ -58,8 +59,10 @@ public class Http09Client extends HttpClient {
public Http09Client(QuicClientConnection quicConnection, boolean with0RTT) {
this.quicConnection = quicConnection;
this.with0RTT = with0RTT;

executorService = Executors.newCachedThreadPool(new DaemonThreadFactory("http09"));
this.executorService =
VirtualExecutor.supported()
? VirtualExecutor.createExecutor("http09")
: Executors.newCachedThreadPool(new DaemonThreadFactory("http09"));
}

@Override
Expand Down