diff --git a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java index ecc9fb66ed..bd776eb69b 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java @@ -48,10 +48,10 @@ void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws Exception { // Parameters for the test final int numOfThreads = 5; //number of threads to use to insert users and photos - final int numOfUsers = 10000; // Each thread will create 200000 users - final int numOfPhotos = 10; // Each user will have 5 photos - final int numOfFriendship = 1000; // Each thread will create 100000 friendships - final int numOfLike = 1000; // Each thread will create 100000 likes + final int numOfUsers = 10000; // number of users per thread + final int numOfPhotos = 10; // number of phots per user + final int numOfFriendship = 1000; // total number of friendships edges to create + final int numOfLike = 1000; // total number of likes to photo edges to create int expectedUsersCount = numOfUsers * numOfThreads; int expectedPhotoCount = expectedUsersCount * numOfPhotos; diff --git a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java index 53ccc659eb..ea6da897fa 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java @@ -21,10 +21,15 @@ import com.arcadedb.test.support.ContainersTestTemplate; import com.arcadedb.test.support.DatabaseWrapper; import com.arcadedb.test.support.ServerWrapper; +import io.micrometer.core.instrument.Metrics; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -53,8 +58,9 @@ void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws Exception { int expectedUsersCount = numOfUsers * numOfThreads; int expectedPhotoCount = expectedUsersCount * numOfPhotos; - + LocalDateTime startedAt = LocalDateTime.now(); logger.info("Creating {} users using {} threads", expectedUsersCount, numOfThreads); + ExecutorService executor = Executors.newFixedThreadPool(numOfThreads); for (int i = 0; i < numOfThreads; i++) { // Each thread will create users and photos @@ -79,6 +85,14 @@ void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws Exception { } } + LocalDateTime finishedAt = LocalDateTime.now(); + logger.info("Finishing at {}", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(finishedAt)); + logger.info("Total time: {} minutes", Duration.between(startedAt, finishedAt).toMinutes()); + + Metrics.globalRegistry.getMeters().forEach(meter -> { + logger.info("Meter: {} - {}", meter.getId().getName(), meter.measure()); + }); + db.assertThatUserCountIs(expectedUsersCount); db.assertThatPhotoCountIs(expectedPhotoCount); diff --git a/engine/src/main/java/com/arcadedb/GlobalConfiguration.java b/engine/src/main/java/com/arcadedb/GlobalConfiguration.java index 0eaf39555c..5f6d6447a5 100644 --- a/engine/src/main/java/com/arcadedb/GlobalConfiguration.java +++ b/engine/src/main/java/com/arcadedb/GlobalConfiguration.java @@ -272,6 +272,10 @@ public Object call(final Object value) { POLYGLOT_COMMAND_TIMEOUT("arcadedb.polyglotCommand.timeout", SCOPE.DATABASE, "Default timeout for polyglot commands (in ms)", Long.class, 10_000), + QUERY_ENGINES_USE_VIRTUAL_THREADS("arcadedb.queryEngines.useVirtualThreads", SCOPE.DATABASE, + "Use virtual threads for executing Java and Polyglot queries. Virtual threads provide better scalability for concurrent query execution.", + Boolean.class, true), + QUERY_MAX_HEAP_ELEMENTS_ALLOWED_PER_OP("arcadedb.queryMaxHeapElementsAllowedPerOp", SCOPE.DATABASE, """ Maximum number of elements (records) allowed in a single query for memory-intensive operations (eg. ORDER BY in heap). \ If exceeded, the query fails with an OCommandExecutionException. Negative number means no limit.\ @@ -375,6 +379,10 @@ This setting is intended as a safety measure against excessive resource consumpt "Timeout in seconds for a HTTP session (managing a transaction) to expire. This timeout is computed from the latest command against the session", Long.class, 5), // 5 SECONDS DEFAULT + SERVER_HTTP_USE_VIRTUAL_THREADS("arcadedb.server.httpUseVirtualThreads", SCOPE.SERVER, + "Use virtual threads for HTTP request handling. Virtual threads provide better scalability for concurrent HTTP connections (requires Java 21+).", + Boolean.class, true), + // SERVER WS SERVER_WS_EVENT_BUS_QUEUE_SIZE("arcadedb.server.eventBusQueueSize", SCOPE.SERVER, "Size of the queue used as a buffer for unserviced database change events.", Integer.class, 1000), @@ -435,6 +443,10 @@ This setting is intended as a safety measure against excessive resource consumpt HA_REPLICATION_INCOMING_PORTS("arcadedb.ha.replicationIncomingPorts", SCOPE.SERVER, "TCP/IP port number used for incoming replication connections", String.class, "2424-2433"), + HA_USE_VIRTUAL_THREADS("arcadedb.ha.useVirtualThreads", SCOPE.SERVER, + "Use virtual threads for HA replication network executors. Virtual threads provide better scalability for replica connections (requires Java 21+).", + Boolean.class, true), + // KUBERNETES HA_K8S("arcadedb.ha.k8s", SCOPE.SERVER, "The server is running inside Kubernetes", Boolean.class, false), @@ -452,6 +464,10 @@ This setting is intended as a safety measure against excessive resource consumpt POSTGRES_DEBUG("arcadedb.postgres.debug", SCOPE.SERVER, "Enables the printing of Postgres protocol to the console. Default is false", Boolean.class, false), + POSTGRES_USE_VIRTUAL_THREADS("arcadedb.postgres.useVirtualThreads", SCOPE.SERVER, + "Use virtual threads for PostgreSQL wire protocol connections. Virtual threads provide better scalability for concurrent connections (requires Java 21+).", + Boolean.class, true), + // REDIS REDIS_PORT("arcadedb.redis.port", SCOPE.SERVER, "TCP/IP port number used for incoming connections for Redis plugin. Default is 6379", Integer.class, 6379), @@ -459,6 +475,10 @@ This setting is intended as a safety measure against excessive resource consumpt REDIS_HOST("arcadedb.redis.host", SCOPE.SERVER, "TCP/IP host name used for incoming connections for Redis plugin. Default is '0.0.0.0'", String.class, "0.0.0.0"), + REDIS_USE_VIRTUAL_THREADS("arcadedb.redis.useVirtualThreads", SCOPE.SERVER, + "Use virtual threads for Redis wire protocol connections. Virtual threads provide better scalability for concurrent connections (requires Java 21+).", + Boolean.class, true), + // MONGO MONGO_PORT("arcadedb.mongo.port", SCOPE.SERVER, "TCP/IP port number used for incoming connections for Mongo plugin. Default is 27017", Integer.class, 27017), diff --git a/engine/src/main/java/com/arcadedb/query/java/JavaQueryEngine.java b/engine/src/main/java/com/arcadedb/query/java/JavaQueryEngine.java index 988a110d7a..ff50820794 100644 --- a/engine/src/main/java/com/arcadedb/query/java/JavaQueryEngine.java +++ b/engine/src/main/java/com/arcadedb/query/java/JavaQueryEngine.java @@ -37,8 +37,7 @@ public class JavaQueryEngine implements QueryEngine { public static final String ENGINE_NAME = "java"; private final long timeout; - private final ThreadPoolExecutor userCodeExecutor; - private final ArrayBlockingQueue userCodeExecutorQueue; + private final ExecutorService userCodeExecutor; private final Set registeredClasses = new HashSet<>(); private final Set registeredMethods = new HashSet<>(); @@ -94,15 +93,25 @@ public QueryEngine getInstance(final DatabaseInternal database) { } protected JavaQueryEngine(final DatabaseInternal database) { - this.userCodeExecutorQueue = new ArrayBlockingQueue<>(1_000); - this.userCodeExecutor = new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, userCodeExecutorQueue, new ThreadPoolExecutor.CallerRunsPolicy()); this.timeout = database.getConfiguration().getValueAsLong(GlobalConfiguration.POLYGLOT_COMMAND_TIMEOUT); + this.userCodeExecutor = createExecutor(database); + } + + private static ExecutorService createExecutor(final DatabaseInternal database) { + final boolean useVirtualThreads = database.getConfiguration() + .getValueAsBoolean(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS); + + if (useVirtualThreads) { + return Executors.newVirtualThreadPerTaskExecutor(); + } else { + final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1_000); + return new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy()); + } } @Override public void close() { userCodeExecutor.shutdown(); - userCodeExecutorQueue.clear(); } @Override diff --git a/engine/src/main/java/com/arcadedb/query/polyglot/PolyglotQueryEngine.java b/engine/src/main/java/com/arcadedb/query/polyglot/PolyglotQueryEngine.java index 6b695980df..88142ba1f0 100644 --- a/engine/src/main/java/com/arcadedb/query/polyglot/PolyglotQueryEngine.java +++ b/engine/src/main/java/com/arcadedb/query/polyglot/PolyglotQueryEngine.java @@ -36,13 +36,12 @@ import java.util.concurrent.*; public class PolyglotQueryEngine implements QueryEngine { - private GraalPolyglotEngine polyglotEngine; - private final String language; - private final long timeout; - private final DatabaseInternal database; - private List allowedPackages = null; - private final ExecutorService userCodeExecutor; - private final ArrayBlockingQueue userCodeExecutorQueue; + private GraalPolyglotEngine polyglotEngine; + private final String language; + private final long timeout; + private final DatabaseInternal database; + private List allowedPackages = null; + private final ExecutorService userCodeExecutor; private static final AnalyzedQuery ANALYZED_QUERY = new AnalyzedQuery() { @Override @@ -90,10 +89,20 @@ protected PolyglotQueryEngine(final DatabaseInternal database, final String lang this.allowedPackages = allowedPackages; this.polyglotEngine = GraalPolyglotEngine.newBuilder(database, Engine.create()).setLanguage(language) .setAllowedPackages(allowedPackages).build(); - this.userCodeExecutorQueue = new ArrayBlockingQueue<>(10000); - this.userCodeExecutor = new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, userCodeExecutorQueue, - new ThreadPoolExecutor.CallerRunsPolicy()); this.timeout = database.getConfiguration().getValueAsLong(GlobalConfiguration.POLYGLOT_COMMAND_TIMEOUT); + this.userCodeExecutor = createExecutor(database); + } + + private static ExecutorService createExecutor(final DatabaseInternal database) { + final boolean useVirtualThreads = database.getConfiguration() + .getValueAsBoolean(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS); + + if (useVirtualThreads) { + return Executors.newVirtualThreadPerTaskExecutor(); + } else { + final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10_000); + return new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy()); + } } @Override @@ -215,7 +224,6 @@ public ResultSet query(final String query, ContextConfiguration configuration, f @Override public void close() { userCodeExecutor.shutdown(); - userCodeExecutorQueue.clear(); polyglotEngine.close(); } diff --git a/engine/src/test/java/com/arcadedb/query/java/JavaQueryEngineVirtualThreadTest.java b/engine/src/test/java/com/arcadedb/query/java/JavaQueryEngineVirtualThreadTest.java new file mode 100644 index 0000000000..baeb667b61 --- /dev/null +++ b/engine/src/test/java/com/arcadedb/query/java/JavaQueryEngineVirtualThreadTest.java @@ -0,0 +1,117 @@ +/* + * Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-FileCopyrightText: 2021-present Arcade Data Ltd (info@arcadedata.com) + * SPDX-License-Identifier: Apache-2.0 + */ +package com.arcadedb.query.java; + +import com.arcadedb.GlobalConfiguration; +import com.arcadedb.TestHelper; +import com.arcadedb.query.sql.executor.ResultSet; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.*; + +import static org.assertj.core.api.Assertions.*; + +/** + * Test for Java Query Engine with Virtual Threads support. + * Tests both virtual thread and traditional platform thread execution. + */ +public class JavaQueryEngineVirtualThreadTest extends TestHelper { + + @Test + public void testVirtualThreadsEnabled() { + database.getConfiguration().setValue(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS, true); + + database.getQueryEngine("java").registerFunctions(JavaMethods.class.getName() + "::sum"); + + final ResultSet result = database.command("java", "com.arcadedb.query.java.JavaMethods::sum", 5, 3); + assertThat(result.hasNext()).isTrue(); + assertThat((Integer) result.next().getProperty("value")).isEqualTo(8); + } + + @Test + public void testPlatformThreadsEnabled() { + database.getConfiguration().setValue(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS, false); + + database.getQueryEngine("java").registerFunctions(JavaMethods.class.getName() + "::sum"); + + final ResultSet result = database.command("java", "com.arcadedb.query.java.JavaMethods::sum", 5, 3); + assertThat(result.hasNext()).isTrue(); + assertThat((Integer) result.next().getProperty("value")).isEqualTo(8); + } + + @Test + public void testConcurrentExecutionWithVirtualThreads() throws InterruptedException { + database.getConfiguration().setValue(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS, true); + + database.getQueryEngine("java").registerFunctions(JavaMethods.class.getName() + "::sum"); + + // Execute multiple concurrent queries + final CountDownLatch latch = new CountDownLatch(10); + final ExecutorService executor = Executors.newFixedThreadPool(5); + + for (int i = 0; i < 10; i++) { + final int index = i; + executor.submit(() -> { + try { + final ResultSet result = database.command("java", "com.arcadedb.query.java.JavaMethods::sum", index, index * 2); + assertThat(result.hasNext()).isTrue(); + assertThat((Integer) result.next().getProperty("value")).isEqualTo(index + index * 2); + } finally { + latch.countDown(); + } + }); + } + + try { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } finally { + executor.shutdown(); + } + } + + @Test + public void testConcurrentExecutionWithPlatformThreads() throws InterruptedException { + database.getConfiguration().setValue(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS, false); + + database.getQueryEngine("java").registerFunctions(JavaMethods.class.getName() + "::sum"); + + // Execute multiple concurrent queries + final CountDownLatch latch = new CountDownLatch(10); + final ExecutorService executor = Executors.newFixedThreadPool(5); + + for (int i = 0; i < 10; i++) { + final int index = i; + executor.submit(() -> { + try { + final ResultSet result = database.command("java", "com.arcadedb.query.java.JavaMethods::sum", index, index * 2); + assertThat(result.hasNext()).isTrue(); + assertThat((Integer) result.next().getProperty("value")).isEqualTo(index + index * 2); + } finally { + latch.countDown(); + } + }); + } + + try { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } finally { + executor.shutdown(); + } + } +} diff --git a/grpcw/src/main/java/com/arcadedb/server/grpc/GrpcServerPlugin.java b/grpcw/src/main/java/com/arcadedb/server/grpc/GrpcServerPlugin.java index 2adee720bc..eb96be1ff7 100644 --- a/grpcw/src/main/java/com/arcadedb/server/grpc/GrpcServerPlugin.java +++ b/grpcw/src/main/java/com/arcadedb/server/grpc/GrpcServerPlugin.java @@ -39,6 +39,8 @@ import java.io.File; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -141,7 +143,9 @@ private void startStandardServer(ContextConfiguration config) throws IOException grpcServer = serverBuilder .maxInboundMessageSize(256 * 1024 * 1024) .maxInboundMetadataSize(32 * 1024 * 1024) - .build().start(); + .executor(Executors.newVirtualThreadPerTaskExecutor()) + .build() + .start(); // Build status message StringBuilder status = new StringBuilder(); @@ -180,7 +184,9 @@ private void startXdsServer(ContextConfiguration config) throws IOException { xdsServer = xdsBuilder .maxInboundMessageSize(256 * 1024 * 1024) .maxInboundMetadataSize(32 * 1024 * 1024) - .build().start(); + .executor(Executors.newVirtualThreadPerTaskExecutor()) + .build() + .start(); LogManager.instance().log(this, Level.INFO, "gRPC XDS server started on port %s (xDS management enabled)", port); } diff --git a/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkListener.java b/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkListener.java index a4c81ceff0..6be52dc785 100755 --- a/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkListener.java +++ b/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkListener.java @@ -69,7 +69,13 @@ public void run() { // CREATE A NEW PROTOCOL INSTANCE // TODO: OPEN A DATABASE final PostgresNetworkExecutor connection = new PostgresNetworkExecutor(server, socket, null); - connection.start(); + + // Use virtual threads if configured, otherwise use platform threads + if (server.getConfiguration().getValueAsBoolean(com.arcadedb.GlobalConfiguration.POSTGRES_USE_VIRTUAL_THREADS)) { + Thread.startVirtualThread(connection); + } else { + connection.start(); + } } catch (final Exception e) { if (active) diff --git a/postgresw/src/test/java/com/arcadedb/postgres/PostgresVirtualThreadTest.java b/postgresw/src/test/java/com/arcadedb/postgres/PostgresVirtualThreadTest.java new file mode 100644 index 0000000000..a0ba4a06a9 --- /dev/null +++ b/postgresw/src/test/java/com/arcadedb/postgres/PostgresVirtualThreadTest.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-FileCopyrightText: 2021-present Arcade Data Ltd (info@arcadedata.com) + * SPDX-License-Identifier: Apache-2.0 + */ +package com.arcadedb.postgres; + +import com.arcadedb.GlobalConfiguration; +import com.arcadedb.server.ArcadeDBServer; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for PostgreSQL wire protocol with virtual threads support. + */ +public class PostgresVirtualThreadTest { + + @Test + public void testVirtualThreadsConfigurationExists() { + // Verify that the configuration flag exists + assertThat(GlobalConfiguration.POSTGRES_USE_VIRTUAL_THREADS).isNotNull(); + assertThat(GlobalConfiguration.POSTGRES_USE_VIRTUAL_THREADS.getKey()) + .isEqualTo("arcadedb.postgres.useVirtualThreads"); + } + + @Test + public void testVirtualThreadsCanBeDisabled() { + // Verify the type is Boolean + assertThat(GlobalConfiguration.POSTGRES_USE_VIRTUAL_THREADS.getType()) + .isEqualTo(Boolean.class); + } + + @Test + public void testConfigurationScope() { + // Verify that configuration is server-scoped + assertThat(GlobalConfiguration.POSTGRES_USE_VIRTUAL_THREADS.getScope()) + .isEqualTo(GlobalConfiguration.SCOPE.SERVER); + } +} diff --git a/redisw/src/main/java/com/arcadedb/redis/RedisNetworkListener.java b/redisw/src/main/java/com/arcadedb/redis/RedisNetworkListener.java index 5ca15d29df..abe0f08c7d 100755 --- a/redisw/src/main/java/com/arcadedb/redis/RedisNetworkListener.java +++ b/redisw/src/main/java/com/arcadedb/redis/RedisNetworkListener.java @@ -62,7 +62,13 @@ public void run() { // CREATE A NEW PROTOCOL INSTANCE final RedisNetworkExecutor connection = new RedisNetworkExecutor(server, socket); - connection.start(); + + // Use virtual threads if configured, otherwise use platform threads + if (server.getConfiguration().getValueAsBoolean(com.arcadedb.GlobalConfiguration.REDIS_USE_VIRTUAL_THREADS)) { + Thread.startVirtualThread(connection); + } else { + connection.start(); + } if (callback != null) callback.connected(); diff --git a/redisw/src/test/java/com/arcadedb/redis/RedisVirtualThreadTest.java b/redisw/src/test/java/com/arcadedb/redis/RedisVirtualThreadTest.java new file mode 100644 index 0000000000..0210f64129 --- /dev/null +++ b/redisw/src/test/java/com/arcadedb/redis/RedisVirtualThreadTest.java @@ -0,0 +1,52 @@ +/* + * Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-FileCopyrightText: 2021-present Arcade Data Ltd (info@arcadedata.com) + * SPDX-License-Identifier: Apache-2.0 + */ +package com.arcadedb.redis; + +import com.arcadedb.GlobalConfiguration; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for Redis wire protocol with virtual threads support. + */ +public class RedisVirtualThreadTest { + + @Test + public void testVirtualThreadsConfigurationExists() { + // Verify that the configuration flag exists + assertThat(GlobalConfiguration.REDIS_USE_VIRTUAL_THREADS).isNotNull(); + assertThat(GlobalConfiguration.REDIS_USE_VIRTUAL_THREADS.getKey()) + .isEqualTo("arcadedb.redis.useVirtualThreads"); + } + + @Test + public void testVirtualThreadsCanBeDisabled() { + // Verify the type is Boolean + assertThat(GlobalConfiguration.REDIS_USE_VIRTUAL_THREADS.getType()) + .isEqualTo(Boolean.class); + } + + @Test + public void testConfigurationScope() { + // Verify that configuration is server-scoped + assertThat(GlobalConfiguration.REDIS_USE_VIRTUAL_THREADS.getScope()) + .isEqualTo(GlobalConfiguration.SCOPE.SERVER); + } +} diff --git a/server/src/main/java/com/arcadedb/server/ha/HAServer.java b/server/src/main/java/com/arcadedb/server/ha/HAServer.java index 8fc7f6e565..7ee96128e0 100644 --- a/server/src/main/java/com/arcadedb/server/ha/HAServer.java +++ b/server/src/main/java/com/arcadedb/server/ha/HAServer.java @@ -1004,7 +1004,12 @@ private void connectToLeader(final String host, final int port) { leaderConnection.get().startup(); // START SEPARATE THREAD TO EXECUTE LEADER'S REQUESTS - leaderConnection.get().start(); + final Replica2LeaderNetworkExecutor replicaExecutor = leaderConnection.get(); + if (configuration.getValueAsBoolean(GlobalConfiguration.HA_USE_VIRTUAL_THREADS)) { + Thread.startVirtualThread(replicaExecutor); + } else { + replicaExecutor.start(); + } } protected ChannelBinaryClient createNetworkConnection(final String host, final int port, final short commandId) diff --git a/server/src/main/java/com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor.java b/server/src/main/java/com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor.java index 7b3e6a2ae6..ba30f6db26 100755 --- a/server/src/main/java/com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor.java +++ b/server/src/main/java/com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor.java @@ -155,7 +155,10 @@ public void mergeFrom(final Leader2ReplicaNetworkExecutor previousConnection) { public void run() { LogManager.instance().setContext(server.getServerName()); - senderThread = new Thread(new Runnable() { + final boolean useVirtualThreads = server.getServer().getConfiguration() + .getValueAsBoolean(com.arcadedb.GlobalConfiguration.HA_USE_VIRTUAL_THREADS); + + final Runnable senderRunnable = new Runnable() { @Override public void run() { LogManager.instance().setContext(server.getServerName()); @@ -205,11 +208,18 @@ public void run() { senderQueue.size()); } - }); - senderThread.start(); + }; + + // Use virtual threads if configured, otherwise use platform threads + if (useVirtualThreads) { + senderThread = Thread.startVirtualThread(senderRunnable); + } else { + senderThread = new Thread(senderRunnable); + senderThread.start(); + } senderThread.setName(server.getServer().getServerName() + " leader2replica-sender->" + remoteServerName); - forwarderThread = new Thread(new Runnable() { + final Runnable forwarderRunnable = new Runnable() { @Override public void run() { LogManager.instance().setContext(server.getServerName()); @@ -245,8 +255,15 @@ public void run() { .log(this, Level.FINE, "Replication thread to remote server '%s' is off (buffered=%d)", remoteServerName, forwarderQueue.size()); } - }); - forwarderThread.start(); + }; + + // Use virtual threads if configured, otherwise use platform threads + if (useVirtualThreads) { + forwarderThread = Thread.startVirtualThread(forwarderRunnable); + } else { + forwarderThread = new Thread(forwarderRunnable); + forwarderThread.start(); + } forwarderThread.setName(server.getServer().getServerName() + " leader-forwarder"); // REUSE THE SAME BUFFER TO AVOID MALLOC diff --git a/server/src/main/java/com/arcadedb/server/ha/LeaderNetworkListener.java b/server/src/main/java/com/arcadedb/server/ha/LeaderNetworkListener.java index af9c6b3733..10b93d90e4 100755 --- a/server/src/main/java/com/arcadedb/server/ha/LeaderNetworkListener.java +++ b/server/src/main/java/com/arcadedb/server/ha/LeaderNetworkListener.java @@ -278,7 +278,12 @@ private void connect(final ChannelBinaryServer channel, final String remoteServe ha.registerIncomingConnection(connection.getRemoteServerName(), connection); - connection.start(); + // Use virtual threads if configured, otherwise use platform threads + if (ha.getServer().getConfiguration().getValueAsBoolean(com.arcadedb.GlobalConfiguration.HA_USE_VIRTUAL_THREADS)) { + Thread.startVirtualThread(connection); + } else { + connection.start(); + } } private void readClusterName(final Socket socket, final ChannelBinaryServer channel) throws IOException { diff --git a/server/src/main/java/com/arcadedb/server/http/HttpServer.java b/server/src/main/java/com/arcadedb/server/http/HttpServer.java index 4be9c0baf5..531ef5e9d5 100644 --- a/server/src/main/java/com/arcadedb/server/http/HttpServer.java +++ b/server/src/main/java/com/arcadedb/server/http/HttpServer.java @@ -50,6 +50,8 @@ import io.undertow.server.RoutingHandler; import io.undertow.server.handlers.PathHandler; import org.xnio.Options; +import org.xnio.Xnio; +import org.xnio.XnioWorker; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -61,6 +63,7 @@ import java.security.*; import java.security.cert.*; import java.util.*; +import java.util.concurrent.*; import java.util.logging.*; import static com.arcadedb.GlobalConfiguration.NETWORK_SSL_KEYSTORE; @@ -78,6 +81,7 @@ public class HttpServer implements ServerPlugin { private Undertow undertow; private volatile String listeningAddress; private int httpPortListening; + private XnioWorker xnioWorker; // Keep reference to shut down properly public HttpServer(final ArcadeDBServer server) { this.server = server; @@ -98,6 +102,16 @@ public void stopService() { } } + // Shutdown custom XNIO worker if created + if (xnioWorker != null) { + try { + xnioWorker.shutdown(); + xnioWorker.awaitTermination(10, TimeUnit.SECONDS); + } catch (final Exception e) { + LogManager.instance().log(this, Level.WARNING, "Error shutting down XNIO worker", e); + } + } + sessionManager.close(); } @@ -182,9 +196,35 @@ private Undertow buildUndertowServer(final ContextConfiguration configuration, f .setHandler(routes)// .setSocketOption(Options.READ_TIMEOUT, configuration.getValueAsInteger(GlobalConfiguration.NETWORK_SOCKET_TIMEOUT)) .setIoThreads(configuration.getValueAsInteger(GlobalConfiguration.SERVER_HTTP_IO_THREADS))// - .setWorkerThreads(500)// .setServerOption(SHUTDOWN_TIMEOUT, 5000); + // Configure worker threads based on virtual thread setting + if (configuration.getValueAsBoolean(GlobalConfiguration.SERVER_HTTP_USE_VIRTUAL_THREADS)) { + LogManager.instance().log(this, Level.INFO, "- Configuring HTTP Server with Java Virtual Threads"); + + // Create XNIO worker with virtual thread executor + final Xnio xnio = Xnio.getInstance(); + final XnioWorker.Builder workerBuilder = xnio.createWorkerBuilder(); + + // Set external executor service to use virtual threads + workerBuilder.setExternalExecutorService(Executors.newVirtualThreadPerTaskExecutor()); + + // Configure worker pool size (can be small since we're using virtual threads) + workerBuilder.setCoreWorkerPoolSize(1); + workerBuilder.setMaxWorkerPoolSize(1); + + // Set IO threads + workerBuilder.setWorkerIoThreads(configuration.getValueAsInteger(GlobalConfiguration.SERVER_HTTP_IO_THREADS)); + workerBuilder.setWorkerName("arcade-http-virtual"); + workerBuilder.setDaemon(true); + + xnioWorker = workerBuilder.build(); + builder.setWorker(xnioWorker); + } else { + // Use traditional platform thread pool + builder.setWorkerThreads(500); + } + if (configuration.getValueAsBoolean(GlobalConfiguration.NETWORK_USE_SSL)) { final SSLContext sslContext = createSSLContext(); builder.addHttpsListener(httpsPortListening, host, sslContext) diff --git a/server/src/test/java/com/arcadedb/server/DeleteTest.java b/server/src/test/java/com/arcadedb/server/DeleteTest.java new file mode 100644 index 0000000000..8261bc9454 --- /dev/null +++ b/server/src/test/java/com/arcadedb/server/DeleteTest.java @@ -0,0 +1,307 @@ +package com.arcadedb.server; + +import com.arcadedb.ContextConfiguration; +import com.arcadedb.GlobalConfiguration; +import com.arcadedb.engine.ComponentFile.MODE; +import com.arcadedb.query.sql.executor.ResultSet; +import com.arcadedb.remote.RemoteDatabase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class DeleteTest { + private static final String DB_NAME = "testDB"; + private static final String ROOT_PASSWORD = "playwithdata"; + private static final String ROOT_DIR = "arcade_db_delete_test"; + private static final List TMP_DIRS = List.of("backups", "log", "databases", "config"); + private static ArcadeDBServer SERVER = null; + private static RemoteDatabase DATABASE = null; + private static Path ROOT_PATH = null; + + // Schema + private static final String VERTEX_DUCT = "duct"; + private static final String EDGE_HIERARCHY_DUCT_DUCT = "hierarchy_duct_duct"; + private static final String PROPERTY_INTERNAL_ID = "internal_id"; + private static final String PROPERTY_INTERNAL_FROM = "internal_from"; + private static final String PROPERTY_INTERNAL_TO = "internal_to"; + private static final String PROPERTY_SWAP = "swap"; + private static final String PROPERTY_ORDER_NUMBER = "order_number"; + private static final String PROPERTY_RID = "@rid"; + + // Use case params + private static final String DUCT_ELID = "duct_elid"; + private static final String SUB_DUCT_ELID_1 = "sub_duct_elid_1"; + private static final String SUB_DUCT_ELID_2 = "sub_duct_elid_2"; + private static final String SUB_DUCT_ELID_3 = "sub_duct_elid_3"; + private static final String SUB_DUCT_ELID_4 = "sub_duct_elid_4"; + private static final String SUB_DUCT_ELID_5 = "sub_duct_elid_5"; + private static final String SUB_DUCT_ELID_6 = "sub_duct_elid_6"; + private static final String OBJECT_OWNER = "owner"; + private static final String SWAP = "N"; + private static final String ORDER_NUMBER = "1"; + private static final List ALL_DUCT_ELIDS = List.of(DUCT_ELID, SUB_DUCT_ELID_1, SUB_DUCT_ELID_2, SUB_DUCT_ELID_3, SUB_DUCT_ELID_4, SUB_DUCT_ELID_5, SUB_DUCT_ELID_6); + private static final List SUB_DUCT_ELIDS = List.of(SUB_DUCT_ELID_1, SUB_DUCT_ELID_2, SUB_DUCT_ELID_3, SUB_DUCT_ELID_4, SUB_DUCT_ELID_5, SUB_DUCT_ELID_6); + + public static void main(String[] args) { + try { + new Thread(() -> { + prepareTestDirectory(); + startArcadeDBServer(); + initRemoteDatabase(); + setupDatabaseSchema(); + insertInitialData(); + executeTestCase(); + }).start(); + Thread.currentThread().join(); + } catch (Exception e) { + System.err.print("\nERROR: " + e.getMessage()); + } finally { + closeDatabase(); + stopServer(); + } + } + + private static void setupDatabaseSchema() { + System.out.print("\nSetting up database schema."); + createVertexType(VERTEX_DUCT); + createHierarchyEdgeType(EDGE_HIERARCHY_DUCT_DUCT); + } + + private static void insertInitialData() { + System.out.print("\nInserting initial data."); + /** + * duct + * ||..| + * sub_duct_1..6 + */ + insertTestCaseVertices(); + insertTestCaseEdges(); + } + + private static void executeTestCase() { + System.out.print("\nExecuting test case."); + final String where = "((internal_from = ?) AND (internal_to = ?) AND (swap = ?) AND (order_number = ?)) OR " + + "((internal_from = ?) AND (internal_to = ?) AND (swap = ?) AND (order_number = ?)) OR " + + "((internal_from = ?) AND (internal_to = ?) AND (swap = ?) AND (order_number = ?)) OR " + + "((internal_from = ?) AND (internal_to = ?) AND (swap = ?) AND (order_number = ?)) OR " + + "((internal_from = ?) AND (internal_to = ?) AND (swap = ?) AND (order_number = ?)) OR " + + "((internal_from = ?) AND (internal_to = ?) AND (swap = ?) AND (order_number = ?))"; + deleteEdge(EDGE_HIERARCHY_DUCT_DUCT, where, + List.of(DUCT_ELID, SUB_DUCT_ELID_1, SWAP, ORDER_NUMBER, + DUCT_ELID, SUB_DUCT_ELID_2, SWAP, ORDER_NUMBER, + DUCT_ELID, SUB_DUCT_ELID_3, SWAP, ORDER_NUMBER, + DUCT_ELID, SUB_DUCT_ELID_4, SWAP, ORDER_NUMBER, + DUCT_ELID, SUB_DUCT_ELID_5, SWAP, ORDER_NUMBER, + DUCT_ELID, SUB_DUCT_ELID_6, SWAP, ORDER_NUMBER)); + final Map allDuctIdMap = queryIdMap(VERTEX_DUCT, ALL_DUCT_ELIDS); + createHierarchyEdge(SUB_DUCT_ELIDS.stream().map(subDuctElid -> new CreateHierarchyEdgeRecord(EDGE_HIERARCHY_DUCT_DUCT, allDuctIdMap.get(DUCT_ELID), allDuctIdMap.get(subDuctElid), DUCT_ELID, subDuctElid, ORDER_NUMBER)).toList()); + } + + private static void insertTestCaseVertices() { + createVertex(ALL_DUCT_ELIDS.stream().map(elid -> new CreateVertexRecord(VERTEX_DUCT, elid)).toList()); + } + + private static void insertTestCaseEdges() { + // query vertex's rid(s) + final Map ductIdMap = queryIdMap(VERTEX_DUCT, ALL_DUCT_ELIDS); + // duct <-..> sub_duct(s) + final String ductRid = ductIdMap.get(DUCT_ELID); + final List records = new ArrayList<>(); + SUB_DUCT_ELIDS.forEach(subDuctElid -> records.add(new CreateHierarchyEdgeRecord(EDGE_HIERARCHY_DUCT_DUCT, ductRid, ductIdMap.get(subDuctElid), DUCT_ELID, subDuctElid, ORDER_NUMBER))); + createHierarchyEdge(records); + } + + private static Map queryIdMap(final String label, final Collection internalIds) { + final List properties = List.of(PROPERTY_RID, PROPERTY_INTERNAL_ID); + final List arguments = new ArrayList<>(); + final String placeholders = internalIds.stream() // + .map(attr -> { + arguments.add(attr); + return "?"; + }).collect(Collectors.joining(", ")); + final String where = "%s in [%s]".formatted(PROPERTY_INTERNAL_ID, placeholders); + return command("sql", "select %s from %s where %s".formatted(String.join(", ", properties), label, where), arguments) + .stream() // + .map(result -> Map.entry((String) result.getProperty(PROPERTY_INTERNAL_ID), (String) result.getProperty(PROPERTY_RID))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private static void createVertex(final List records) { + final List insertRecords = records.stream().map(record -> new InsertVertexRecord(record.vertex, + Map.of(PROPERTY_INTERNAL_ID, record.internal_id))).toList(); + insertVertex(insertRecords); + } + + private record CreateVertexRecord(String vertex, String internal_id) {} + + private static void createHierarchyEdge(final List records) { + final List createEdgeRecords = records.stream().map(record -> new CreateEdgeRecord(record.edge, record.fromRid, record.toRid, + Map.of(PROPERTY_INTERNAL_FROM, record.internal_from, PROPERTY_INTERNAL_TO, record.internal_to, PROPERTY_SWAP, SWAP, PROPERTY_ORDER_NUMBER, record.orderNumber))).toList(); + createEdge(createEdgeRecords); + } + + private record CreateHierarchyEdgeRecord(String edge, String fromRid, String toRid, String internal_from, String internal_to, String orderNumber) {} + + private static void insertVertex(final List records) { + final List arguments = new ArrayList<>(); + final List commands = records.stream().map(record -> { + arguments.addAll(record.properties.values()); + final String valuesPlaceholder = record.properties.values().stream().map(attr -> "?").collect(Collectors.joining(", ")); + return "insert into %s (%s) values (%s)".formatted(record.into, String.join(", ", record.properties.keySet()), valuesPlaceholder); + }).toList(); + command("sqlscript", String.join("; ", commands), arguments); + } + + private record InsertVertexRecord(String into, Map properties) {} + + private static void createEdge(final List records) { + final List arguments = new ArrayList<>(); + final List commands = records.stream().map(record -> { + arguments.add(record.fromRid); + arguments.add(record.toRid); + final List placeholders = record.properties.entrySet() // + .stream() // + .map(entry -> { + arguments.add(entry.getValue()); + return "%s = ?".formatted(entry.getKey()); + }).toList(); + return "create edge %s from ? to ? set %s".formatted(record.edge, String.join(", ", placeholders)); + }).toList(); + command("sqlscript", String.join("; ", commands), arguments); + } + + private record CreateEdgeRecord(String edge, String fromRid, String toRid, Map properties) {} + + private static ResultSet deleteEdge(final String edge, final String where, final List params) { + return command("sql", "delete from %s where %s".formatted(edge, where), params); + } + + private static void createVertexType(final String label) { + command("sql", "create vertex type %s if not exists".formatted(label)); + command("sql", "alter type %s BucketSelectionStrategy `thread`".formatted(label)); + createIndexedStringProperty(label, PROPERTY_INTERNAL_ID, true); + } + + private static void createHierarchyEdgeType(final String label) { + createEdgeType(label); + createStringProperty(label, PROPERTY_SWAP); + createStringProperty(label, PROPERTY_ORDER_NUMBER); + createIndexedStringProperty(label, PROPERTY_INTERNAL_FROM, false); + createIndexedStringProperty(label, PROPERTY_INTERNAL_TO, false); + createIndex(label, List.of(PROPERTY_INTERNAL_FROM, PROPERTY_INTERNAL_TO, PROPERTY_SWAP, PROPERTY_ORDER_NUMBER), true); + } + + private static void createEdgeType(final String label) { + command("sql", "create edge type %s if not exists".formatted(label)); + command("sql", "alter type %s BucketSelectionStrategy `thread`".formatted(label)); + } + + private static void createStringProperty(final String label, final String property) { + createProperty(label, property, "STRING"); + } + + private static void createIndexedStringProperty(final String label, final String property, final boolean unique) { + createStringProperty(label, property); + createIndex(label, Collections.singletonList(property), unique); + } + + private static void createProperty(final String label, final String property, final String type) { + command("sql", "create property " + label + "." + property + " " + type); + } + + private static void createIndex(final String label, final List properties, final boolean unique) { + command("sql", "create index on %s (%s) %s".formatted(label, String.join(",", properties), unique ? "UNIQUE" : "NOTUNIQUE")); + } + + private static ResultSet command(final String language, final String command) { + System.out.print("\nCommand: " + command); + return DATABASE.command(language, command); + } + + private static ResultSet command(final String language, final String command, Collection parameters) { + System.out.print("\nCommand: " + command + "; Parameters: " + parameters); + return DATABASE.command(language, command, parameters.toArray()); + } + + private static void closeDatabase() { + if (DATABASE != null && DATABASE.isOpen()) { + DATABASE.close(); + } + DATABASE = null; + } + + private static void stopServer() { + if (SERVER != null && SERVER.isStarted()) { + SERVER.stop(); + } + SERVER = null; + } + + private static void prepareTestDirectory() { + System.out.print("\nPreparing test directory."); + ROOT_PATH = Paths.get(ROOT_DIR); + clearTestDirectory(ROOT_PATH); + createDirectoryIfNotExists(ROOT_PATH); + createTmpDirectories(ROOT_PATH); + } + + private static void createTmpDirectories(final Path rootPath) { + TMP_DIRS.stream().map(rootPath::resolve).forEach(DeleteTest::createDirectoryIfNotExists); + } + + private static void createDirectoryIfNotExists(final Path dir) { + if (Files.exists(dir)) { + return; + } + try { + Files.createDirectory(dir); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static void startArcadeDBServer() { + System.out.print("\nStarting ArcadeDB server."); + final ContextConfiguration config = new ContextConfiguration( + Map.of(GlobalConfiguration.SERVER_ROOT_PASSWORD.getKey(), ROOT_PASSWORD, GlobalConfiguration.SERVER_ROOT_PATH.getKey(), ROOT_PATH.toAbsolutePath().toString())); + SERVER = new ArcadeDBServer(config); + SERVER.start(); + SERVER.createDatabase(DB_NAME, MODE.READ_WRITE); + } + + private static void initRemoteDatabase() { + System.out.print("\nInitializing remote database."); + DATABASE = new RemoteDatabase("localhost", 2480, DB_NAME, "root", ROOT_PASSWORD); + } + + private static void clearTestDirectory(final Path rootPath) { + TMP_DIRS.stream() // + .map(rootPath::resolve) // + .flatMap(path -> { + try { + return Files.walk(path).sorted(Comparator.reverseOrder()); + } catch (IOException e) { + System.err.println("Failed to enter: " + e.getMessage()); + return null; + } + }) // + .filter(Objects::nonNull) // + .forEach(path -> { + try { + Files.delete(path); + } catch (IOException e) { + System.err.println("Failed to delete: " + e.getMessage()); + } + }); + } +} diff --git a/server/src/test/java/com/arcadedb/server/ha/HAVirtualThreadTest.java b/server/src/test/java/com/arcadedb/server/ha/HAVirtualThreadTest.java new file mode 100644 index 0000000000..708519cc3f --- /dev/null +++ b/server/src/test/java/com/arcadedb/server/ha/HAVirtualThreadTest.java @@ -0,0 +1,59 @@ +/* + * Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-FileCopyrightText: 2021-present Arcade Data Ltd (info@arcadedata.com) + * SPDX-License-Identifier: Apache-2.0 + */ +package com.arcadedb.server.ha; + +import com.arcadedb.GlobalConfiguration; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for HA replication with virtual threads support. + */ +public class HAVirtualThreadTest { + + @Test + public void testVirtualThreadsConfigurationExists() { + // Verify that the configuration flag exists + assertThat(GlobalConfiguration.HA_USE_VIRTUAL_THREADS).isNotNull(); + assertThat(GlobalConfiguration.HA_USE_VIRTUAL_THREADS.getKey()) + .isEqualTo("arcadedb.ha.useVirtualThreads"); + } + + @Test + public void testVirtualThreadsCanBeDisabled() { + // Verify the type is Boolean + assertThat(GlobalConfiguration.HA_USE_VIRTUAL_THREADS.getType()) + .isEqualTo(Boolean.class); + } + + @Test + public void testConfigurationScope() { + // Verify that configuration is server-scoped + assertThat(GlobalConfiguration.HA_USE_VIRTUAL_THREADS.getScope()) + .isEqualTo(GlobalConfiguration.SCOPE.SERVER); + } + + @Test + public void testConfigurationDefault() { + // Verify that virtual threads are enabled by default + assertThat(GlobalConfiguration.HA_USE_VIRTUAL_THREADS.getDefValue()) + .isEqualTo(true); + } +}