Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws Exception {

// Parameters for the test
final int numOfThreads = 5; //number of threads to use to insert users and photos
final int numOfUsers = 10000; // Each thread will create 200000 users
final int numOfPhotos = 10; // Each user will have 5 photos
final int numOfFriendship = 1000; // Each thread will create 100000 friendships
final int numOfLike = 1000; // Each thread will create 100000 likes
final int numOfUsers = 10000; // number of users per thread
final int numOfPhotos = 10; // number of phots per user
final int numOfFriendship = 1000; // total number of friendships edges to create
final int numOfLike = 1000; // total number of likes to photo edges to create

int expectedUsersCount = numOfUsers * numOfThreads;
int expectedPhotoCount = expectedUsersCount * numOfPhotos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@
import com.arcadedb.test.support.ContainersTestTemplate;
import com.arcadedb.test.support.DatabaseWrapper;
import com.arcadedb.test.support.ServerWrapper;
import io.micrometer.core.instrument.Metrics;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -53,8 +58,9 @@ void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws Exception {

int expectedUsersCount = numOfUsers * numOfThreads;
int expectedPhotoCount = expectedUsersCount * numOfPhotos;

LocalDateTime startedAt = LocalDateTime.now();
logger.info("Creating {} users using {} threads", expectedUsersCount, numOfThreads);

ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
for (int i = 0; i < numOfThreads; i++) {
// Each thread will create users and photos
Expand All @@ -79,6 +85,14 @@ void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws Exception {
}
}

LocalDateTime finishedAt = LocalDateTime.now();
logger.info("Finishing at {}", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(finishedAt));
logger.info("Total time: {} minutes", Duration.between(startedAt, finishedAt).toMinutes());

Metrics.globalRegistry.getMeters().forEach(meter -> {
logger.info("Meter: {} - {}", meter.getId().getName(), meter.measure());
});

db.assertThatUserCountIs(expectedUsersCount);
db.assertThatPhotoCountIs(expectedPhotoCount);

Expand Down
20 changes: 20 additions & 0 deletions engine/src/main/java/com/arcadedb/GlobalConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ public Object call(final Object value) {
POLYGLOT_COMMAND_TIMEOUT("arcadedb.polyglotCommand.timeout", SCOPE.DATABASE, "Default timeout for polyglot commands (in ms)",
Long.class, 10_000),

QUERY_ENGINES_USE_VIRTUAL_THREADS("arcadedb.queryEngines.useVirtualThreads", SCOPE.DATABASE,
"Use virtual threads for executing Java and Polyglot queries. Virtual threads provide better scalability for concurrent query execution.",
Boolean.class, true),

QUERY_MAX_HEAP_ELEMENTS_ALLOWED_PER_OP("arcadedb.queryMaxHeapElementsAllowedPerOp", SCOPE.DATABASE, """
Maximum number of elements (records) allowed in a single query for memory-intensive operations (eg. ORDER BY in heap). \
If exceeded, the query fails with an OCommandExecutionException. Negative number means no limit.\
Expand Down Expand Up @@ -375,6 +379,10 @@ This setting is intended as a safety measure against excessive resource consumpt
"Timeout in seconds for a HTTP session (managing a transaction) to expire. This timeout is computed from the latest command against the session",
Long.class, 5), // 5 SECONDS DEFAULT

SERVER_HTTP_USE_VIRTUAL_THREADS("arcadedb.server.httpUseVirtualThreads", SCOPE.SERVER,
"Use virtual threads for HTTP request handling. Virtual threads provide better scalability for concurrent HTTP connections (requires Java 21+).",
Boolean.class, true),

// SERVER WS
SERVER_WS_EVENT_BUS_QUEUE_SIZE("arcadedb.server.eventBusQueueSize", SCOPE.SERVER,
"Size of the queue used as a buffer for unserviced database change events.", Integer.class, 1000),
Expand Down Expand Up @@ -435,6 +443,10 @@ This setting is intended as a safety measure against excessive resource consumpt
HA_REPLICATION_INCOMING_PORTS("arcadedb.ha.replicationIncomingPorts", SCOPE.SERVER,
"TCP/IP port number used for incoming replication connections", String.class, "2424-2433"),

HA_USE_VIRTUAL_THREADS("arcadedb.ha.useVirtualThreads", SCOPE.SERVER,
"Use virtual threads for HA replication network executors. Virtual threads provide better scalability for replica connections (requires Java 21+).",
Boolean.class, true),

// KUBERNETES
HA_K8S("arcadedb.ha.k8s", SCOPE.SERVER, "The server is running inside Kubernetes", Boolean.class, false),

Expand All @@ -452,13 +464,21 @@ This setting is intended as a safety measure against excessive resource consumpt
POSTGRES_DEBUG("arcadedb.postgres.debug", SCOPE.SERVER,
"Enables the printing of Postgres protocol to the console. Default is false", Boolean.class, false),

POSTGRES_USE_VIRTUAL_THREADS("arcadedb.postgres.useVirtualThreads", SCOPE.SERVER,
"Use virtual threads for PostgreSQL wire protocol connections. Virtual threads provide better scalability for concurrent connections (requires Java 21+).",
Boolean.class, true),

// REDIS
REDIS_PORT("arcadedb.redis.port", SCOPE.SERVER,
"TCP/IP port number used for incoming connections for Redis plugin. Default is 6379", Integer.class, 6379),

REDIS_HOST("arcadedb.redis.host", SCOPE.SERVER,
"TCP/IP host name used for incoming connections for Redis plugin. Default is '0.0.0.0'", String.class, "0.0.0.0"),

REDIS_USE_VIRTUAL_THREADS("arcadedb.redis.useVirtualThreads", SCOPE.SERVER,
"Use virtual threads for Redis wire protocol connections. Virtual threads provide better scalability for concurrent connections (requires Java 21+).",
Boolean.class, true),

// MONGO
MONGO_PORT("arcadedb.mongo.port", SCOPE.SERVER,
"TCP/IP port number used for incoming connections for Mongo plugin. Default is 27017", Integer.class, 27017),
Expand Down
19 changes: 14 additions & 5 deletions engine/src/main/java/com/arcadedb/query/java/JavaQueryEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
public class JavaQueryEngine implements QueryEngine {
public static final String ENGINE_NAME = "java";
private final long timeout;
private final ThreadPoolExecutor userCodeExecutor;
private final ArrayBlockingQueue<Runnable> userCodeExecutorQueue;
private final ExecutorService userCodeExecutor;
private final Set<String> registeredClasses = new HashSet<>();
private final Set<String> registeredMethods = new HashSet<>();

Expand Down Expand Up @@ -94,15 +93,25 @@ public QueryEngine getInstance(final DatabaseInternal database) {
}

protected JavaQueryEngine(final DatabaseInternal database) {
this.userCodeExecutorQueue = new ArrayBlockingQueue<>(1_000);
this.userCodeExecutor = new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, userCodeExecutorQueue, new ThreadPoolExecutor.CallerRunsPolicy());
this.timeout = database.getConfiguration().getValueAsLong(GlobalConfiguration.POLYGLOT_COMMAND_TIMEOUT);
this.userCodeExecutor = createExecutor(database);
}

private static ExecutorService createExecutor(final DatabaseInternal database) {
final boolean useVirtualThreads = database.getConfiguration()
.getValueAsBoolean(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS);

if (useVirtualThreads) {
return Executors.newVirtualThreadPerTaskExecutor();
} else {
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1_000);
return new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
}
}

@Override
public void close() {
userCodeExecutor.shutdown();
userCodeExecutorQueue.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@
import java.util.concurrent.*;

public class PolyglotQueryEngine implements QueryEngine {
private GraalPolyglotEngine polyglotEngine;
private final String language;
private final long timeout;
private final DatabaseInternal database;
private List<String> allowedPackages = null;
private final ExecutorService userCodeExecutor;
private final ArrayBlockingQueue<Runnable> userCodeExecutorQueue;
private GraalPolyglotEngine polyglotEngine;
private final String language;
private final long timeout;
private final DatabaseInternal database;
private List<String> allowedPackages = null;
private final ExecutorService userCodeExecutor;

private static final AnalyzedQuery ANALYZED_QUERY = new AnalyzedQuery() {
@Override
Expand Down Expand Up @@ -90,10 +89,20 @@ protected PolyglotQueryEngine(final DatabaseInternal database, final String lang
this.allowedPackages = allowedPackages;
this.polyglotEngine = GraalPolyglotEngine.newBuilder(database, Engine.create()).setLanguage(language)
.setAllowedPackages(allowedPackages).build();
this.userCodeExecutorQueue = new ArrayBlockingQueue<>(10000);
this.userCodeExecutor = new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, userCodeExecutorQueue,
new ThreadPoolExecutor.CallerRunsPolicy());
this.timeout = database.getConfiguration().getValueAsLong(GlobalConfiguration.POLYGLOT_COMMAND_TIMEOUT);
this.userCodeExecutor = createExecutor(database);
}

private static ExecutorService createExecutor(final DatabaseInternal database) {
final boolean useVirtualThreads = database.getConfiguration()
.getValueAsBoolean(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS);

if (useVirtualThreads) {
return Executors.newVirtualThreadPerTaskExecutor();
} else {
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10_000);
return new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
}
}

@Override
Expand Down Expand Up @@ -215,7 +224,6 @@ public ResultSet query(final String query, ContextConfiguration configuration, f
@Override
public void close() {
userCodeExecutor.shutdown();
userCodeExecutorQueue.clear();
polyglotEngine.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright © 2021-present Arcade Data Ltd (info@arcadedata.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-FileCopyrightText: 2021-present Arcade Data Ltd (info@arcadedata.com)
* SPDX-License-Identifier: Apache-2.0
*/
package com.arcadedb.query.java;

import com.arcadedb.GlobalConfiguration;
import com.arcadedb.TestHelper;
import com.arcadedb.query.sql.executor.ResultSet;
import org.junit.jupiter.api.Test;

import java.util.concurrent.*;

import static org.assertj.core.api.Assertions.*;

/**
* Test for Java Query Engine with Virtual Threads support.
* Tests both virtual thread and traditional platform thread execution.
*/
public class JavaQueryEngineVirtualThreadTest extends TestHelper {

@Test
public void testVirtualThreadsEnabled() {
database.getConfiguration().setValue(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS, true);

database.getQueryEngine("java").registerFunctions(JavaMethods.class.getName() + "::sum");

final ResultSet result = database.command("java", "com.arcadedb.query.java.JavaMethods::sum", 5, 3);
assertThat(result.hasNext()).isTrue();
assertThat((Integer) result.next().getProperty("value")).isEqualTo(8);
}

@Test
public void testPlatformThreadsEnabled() {
database.getConfiguration().setValue(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS, false);

database.getQueryEngine("java").registerFunctions(JavaMethods.class.getName() + "::sum");

final ResultSet result = database.command("java", "com.arcadedb.query.java.JavaMethods::sum", 5, 3);
assertThat(result.hasNext()).isTrue();
assertThat((Integer) result.next().getProperty("value")).isEqualTo(8);
}

@Test
public void testConcurrentExecutionWithVirtualThreads() throws InterruptedException {
database.getConfiguration().setValue(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS, true);

database.getQueryEngine("java").registerFunctions(JavaMethods.class.getName() + "::sum");

// Execute multiple concurrent queries
final CountDownLatch latch = new CountDownLatch(10);
final ExecutorService executor = Executors.newFixedThreadPool(5);

for (int i = 0; i < 10; i++) {
final int index = i;
executor.submit(() -> {
try {
final ResultSet result = database.command("java", "com.arcadedb.query.java.JavaMethods::sum", index, index * 2);
assertThat(result.hasNext()).isTrue();
assertThat((Integer) result.next().getProperty("value")).isEqualTo(index + index * 2);
} finally {
latch.countDown();
}
});
}

try {
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
} finally {
executor.shutdown();
}
}

@Test
public void testConcurrentExecutionWithPlatformThreads() throws InterruptedException {
database.getConfiguration().setValue(GlobalConfiguration.QUERY_ENGINES_USE_VIRTUAL_THREADS, false);

database.getQueryEngine("java").registerFunctions(JavaMethods.class.getName() + "::sum");

// Execute multiple concurrent queries
final CountDownLatch latch = new CountDownLatch(10);
final ExecutorService executor = Executors.newFixedThreadPool(5);

for (int i = 0; i < 10; i++) {
final int index = i;
executor.submit(() -> {
try {
final ResultSet result = database.command("java", "com.arcadedb.query.java.JavaMethods::sum", index, index * 2);
assertThat(result.hasNext()).isTrue();
assertThat((Integer) result.next().getProperty("value")).isEqualTo(index + index * 2);
} finally {
latch.countDown();
}
});
}

try {
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
} finally {
executor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

Expand Down Expand Up @@ -141,7 +143,9 @@ private void startStandardServer(ContextConfiguration config) throws IOException
grpcServer = serverBuilder
.maxInboundMessageSize(256 * 1024 * 1024)
.maxInboundMetadataSize(32 * 1024 * 1024)
.build().start();
.executor(Executors.newVirtualThreadPerTaskExecutor())
.build()
.start();

// Build status message
StringBuilder status = new StringBuilder();
Expand Down Expand Up @@ -180,7 +184,9 @@ private void startXdsServer(ContextConfiguration config) throws IOException {
xdsServer = xdsBuilder
.maxInboundMessageSize(256 * 1024 * 1024)
.maxInboundMetadataSize(32 * 1024 * 1024)
.build().start();
.executor(Executors.newVirtualThreadPerTaskExecutor())
.build()
.start();

LogManager.instance().log(this, Level.INFO, "gRPC XDS server started on port %s (xDS management enabled)", port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,13 @@ public void run() {
// CREATE A NEW PROTOCOL INSTANCE
// TODO: OPEN A DATABASE
final PostgresNetworkExecutor connection = new PostgresNetworkExecutor(server, socket, null);
connection.start();

// Use virtual threads if configured, otherwise use platform threads
if (server.getConfiguration().getValueAsBoolean(com.arcadedb.GlobalConfiguration.POSTGRES_USE_VIRTUAL_THREADS)) {
Thread.startVirtualThread(connection);
} else {
connection.start();
}

} catch (final Exception e) {
if (active)
Expand Down
Loading
Loading