diff --git a/bazel/java.MODULE.bazel b/bazel/java.MODULE.bazel index efa94121..f46e6c65 100644 --- a/bazel/java.MODULE.bazel +++ b/bazel/java.MODULE.bazel @@ -12,6 +12,7 @@ maven.install( artifacts = [ "ch.qos.logback:logback-classic:%s" % LOGBACK_VERSION, "ch.qos.logback:logback-core:%s" % LOGBACK_VERSION, + "com.zaxxer:HikariCP:5.1.0", "com.fasterxml.jackson.core:jackson-annotations", "com.fasterxml.jackson.core:jackson-core", "com.fasterxml.jackson.core:jackson-databind", @@ -41,7 +42,12 @@ maven.install( "org.eclipse.jetty:jetty-server:%s" % JETTY_VERSION, "org.eclipse.jetty.websocket:websocket-jetty-server:%s" % JETTY_VERSION, "org.jspecify:jspecify:1.0.0", + "org.postgresql:postgresql:42.7.4", "org.slf4j:slf4j-api:2.0.17", + "software.amazon.awssdk:auth:2.29.44", + "software.amazon.awssdk:regions:2.29.44", + "software.amazon.awssdk:sqs:2.29.44", + "software.amazon.awssdk:url-connection-client:2.29.44", ], boms = [ "io.netty:netty-bom:4.2.9.Final", diff --git a/jvm/src/main/java/com/muchq/chess_indexer/App.java b/jvm/src/main/java/com/muchq/chess_indexer/App.java new file mode 100644 index 00000000..9ae602d3 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/App.java @@ -0,0 +1,14 @@ +package com.muchq.chess_indexer; + +import io.micronaut.runtime.Micronaut; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class App { + private static final Logger LOG = LoggerFactory.getLogger(App.class); + + public static void main(String[] args) { + LOG.info("Starting chess indexer API"); + Micronaut.run(App.class, args); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/BUILD.bazel b/jvm/src/main/java/com/muchq/chess_indexer/BUILD.bazel new file mode 100644 index 00000000..6050ded1 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/BUILD.bazel @@ -0,0 +1,77 @@ +load("@rules_java//java:java_binary.bzl", "java_binary") +load("@rules_java//java:defs.bzl", "java_library") +load("//bazel/rules:oci.bzl", "linux_oci_java") + +java_library( + name = "chess_indexer_lib", + srcs = glob(["**/*.java"], exclude = [ + "App.java", + "worker/WorkerApp.java", + ]), + visibility = ["//visibility:public"], + deps = [ + "//jvm/src/main/java/com/muchq/chess_com_api", + "//jvm/src/main/java/com/muchq/http_client/core", + "//jvm/src/main/java/com/muchq/http_client/jdk11", + "//jvm/src/main/java/com/muchq/json", + "@maven//:com_fasterxml_jackson_core_jackson_annotations", + "@maven//:com_fasterxml_jackson_core_jackson_databind", + "@maven//:com_zaxxer_HikariCP", + "@maven//:io_micronaut_micronaut_context", + "@maven//:io_micronaut_micronaut_core", + "@maven//:io_micronaut_micronaut_http", + "@maven//:io_micronaut_micronaut_runtime", + "@maven//:jakarta_inject_jakarta_inject_api", + "@maven//:org_postgresql_postgresql", + "@maven//:org_slf4j_slf4j_api", + "@maven//:software_amazon_awssdk_auth", + "@maven//:software_amazon_awssdk_regions", + "@maven//:software_amazon_awssdk_sqs", + "@maven//:software_amazon_awssdk_url_connection_client", + ], +) + +java_binary( + name = "chess_indexer_api", + srcs = ["App.java"], + main_class = "com.muchq.chess_indexer.App", + plugins = [ + "//bazel/rules:micronaut_type_element_visitor_processor", + "//bazel/rules:micronaut_aggregating_type_element_visitor_processor", + "//bazel/rules:micronaut_bean_definition_inject_processor", + "//bazel/rules:micronaut_package_element_visitor_processor", + ], + resources = ["//jvm/src/main/resources:micronaut_config"], + visibility = ["//visibility:public"], + runtime_deps = [ + "@maven//:ch_qos_logback_logback_classic", + ], + deps = [ + ":chess_indexer_lib", + "@maven//:io_micronaut_micronaut_http_server_netty", + ], +) + +java_binary( + name = "chess_indexer_worker", + srcs = ["worker/WorkerApp.java"], + main_class = "com.muchq.chess_indexer.worker.WorkerApp", + plugins = [ + "//bazel/rules:micronaut_type_element_visitor_processor", + "//bazel/rules:micronaut_aggregating_type_element_visitor_processor", + "//bazel/rules:micronaut_bean_definition_inject_processor", + "//bazel/rules:micronaut_package_element_visitor_processor", + ], + resources = ["//jvm/src/main/resources:micronaut_config"], + visibility = ["//visibility:public"], + runtime_deps = [ + "@maven//:ch_qos_logback_logback_classic", + ], + deps = [ + ":chess_indexer_lib", + "@maven//:io_micronaut_micronaut_http_server_netty", + ], +) + +linux_oci_java(bin_name = "chess_indexer_api") +linux_oci_java(bin_name = "chess_indexer_worker") diff --git a/jvm/src/main/java/com/muchq/chess_indexer/api/IndexRequestController.java b/jvm/src/main/java/com/muchq/chess_indexer/api/IndexRequestController.java new file mode 100644 index 00000000..685c60c5 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/api/IndexRequestController.java @@ -0,0 +1,70 @@ +package com.muchq.chess_indexer.api; + +import com.muchq.chess_indexer.db.IndexRequestDao; +import com.muchq.chess_indexer.ingest.IndexRequestService; +import com.muchq.chess_indexer.model.IndexRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.annotation.Body; +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Get; +import io.micronaut.http.annotation.PathVariable; +import io.micronaut.http.annotation.Post; +import jakarta.inject.Inject; +import java.time.LocalDate; +import java.util.Optional; +import java.util.UUID; + +@Controller("/index-requests") +public class IndexRequestController { + + private final IndexRequestService indexRequestService; + private final IndexRequestDao indexRequestDao; + + @Inject + public IndexRequestController(IndexRequestService indexRequestService, IndexRequestDao indexRequestDao) { + this.indexRequestService = indexRequestService; + this.indexRequestDao = indexRequestDao; + } + + @Post + public HttpResponse create(@Body IndexRequestCreateRequest request) { + try { + String platform = normalizePlatform(request.platform()); + LocalDate startDate = parseDate(request.startDate()); + LocalDate endDate = parseDate(request.endDate()); + if (endDate.isBefore(startDate)) { + return HttpResponse.badRequest(); + } + + IndexRequest created = indexRequestService.submit(platform, request.username(), startDate, endDate); + return HttpResponse.created(IndexRequestResponse.from(created)); + } catch (IllegalArgumentException e) { + return HttpResponse.badRequest(); + } + } + + @Get("/{id}") + public HttpResponse get(@PathVariable String id) { + Optional request = indexRequestDao.findById(UUID.fromString(id)); + return request.map(value -> HttpResponse.ok(IndexRequestResponse.from(value))) + .orElseGet(HttpResponse::notFound); + } + + private String normalizePlatform(String platform) { + if (platform == null) { + throw new IllegalArgumentException("platform is required"); + } + String normalized = platform.trim().toLowerCase(); + if (!normalized.equals("chess.com")) { + throw new IllegalArgumentException("Only chess.com is supported right now"); + } + return normalized; + } + + private LocalDate parseDate(String value) { + if (value == null || value.isBlank()) { + throw new IllegalArgumentException("date is required"); + } + return LocalDate.parse(value); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/api/IndexRequestCreateRequest.java b/jvm/src/main/java/com/muchq/chess_indexer/api/IndexRequestCreateRequest.java new file mode 100644 index 00000000..85d19e6b --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/api/IndexRequestCreateRequest.java @@ -0,0 +1,8 @@ +package com.muchq.chess_indexer.api; + +public record IndexRequestCreateRequest( + String platform, + String username, + String startDate, + String endDate +) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/api/IndexRequestResponse.java b/jvm/src/main/java/com/muchq/chess_indexer/api/IndexRequestResponse.java new file mode 100644 index 00000000..1a957789 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/api/IndexRequestResponse.java @@ -0,0 +1,27 @@ +package com.muchq.chess_indexer.api; + +import com.muchq.chess_indexer.model.IndexRequest; + +public record IndexRequestResponse( + String id, + String platform, + String username, + String startDate, + String endDate, + String status, + int gamesIndexed, + String errorMessage +) { + public static IndexRequestResponse from(IndexRequest request) { + return new IndexRequestResponse( + request.id().toString(), + request.platform(), + request.username(), + request.startDate().toString(), + request.endDate().toString(), + request.status().name(), + request.gamesIndexed(), + request.errorMessage() + ); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/api/QueryController.java b/jvm/src/main/java/com/muchq/chess_indexer/api/QueryController.java new file mode 100644 index 00000000..bfbd3594 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/api/QueryController.java @@ -0,0 +1,34 @@ +package com.muchq.chess_indexer.api; + +import com.muchq.chess_indexer.model.GameSummary; +import com.muchq.chess_indexer.query.QueryService; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.annotation.Body; +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Post; +import jakarta.inject.Inject; +import java.util.List; + +@Controller("/queries") +public class QueryController { + + private final QueryService queryService; + + @Inject + public QueryController(QueryService queryService) { + this.queryService = queryService; + } + + @Post + public HttpResponse query(@Body QueryRequest request) { + if (request == null || request.query() == null || request.query().isBlank()) { + return HttpResponse.badRequest(); + } + try { + List games = queryService.run(request.query()); + return HttpResponse.ok(QueryResponse.of(games)); + } catch (IllegalArgumentException e) { + return HttpResponse.badRequest(); + } + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/api/QueryRequest.java b/jvm/src/main/java/com/muchq/chess_indexer/api/QueryRequest.java new file mode 100644 index 00000000..526d0ff9 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/api/QueryRequest.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.api; + +public record QueryRequest(String query) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/api/QueryResponse.java b/jvm/src/main/java/com/muchq/chess_indexer/api/QueryResponse.java new file mode 100644 index 00000000..4e76463e --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/api/QueryResponse.java @@ -0,0 +1,10 @@ +package com.muchq.chess_indexer.api; + +import com.muchq.chess_indexer.model.GameSummary; +import java.util.List; + +public record QueryResponse(int count, List games) { + public static QueryResponse of(List games) { + return new QueryResponse(games.size(), games); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/config/IndexerConfig.java b/jvm/src/main/java/com/muchq/chess_indexer/config/IndexerConfig.java new file mode 100644 index 00000000..468661a2 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/config/IndexerConfig.java @@ -0,0 +1,56 @@ +package com.muchq.chess_indexer.config; + +public record IndexerConfig( + String dbUrl, + String dbUser, + String dbPassword, + String sqsQueueUrl, + String awsRegion, + int workerPollSeconds, + int workerBatchSize, + int apiQueryLimit +) { + + public static IndexerConfig fromEnv() { + String dbUrl = getenvOrThrow("INDEXER_DB_URL"); + String dbUser = getenvOrDefault("INDEXER_DB_USER", "postgres"); + String dbPassword = getenvOrDefault("INDEXER_DB_PASSWORD", ""); + String sqsQueueUrl = getenvOrDefault("INDEXER_SQS_QUEUE_URL", ""); + String awsRegion = getenvOrDefault("AWS_REGION", "us-east-1"); + + int workerPollSeconds = parseInt(getenvOrDefault("INDEXER_WORKER_POLL_SECONDS", "10")); + int workerBatchSize = parseInt(getenvOrDefault("INDEXER_WORKER_BATCH_SIZE", "5")); + int apiQueryLimit = parseInt(getenvOrDefault("INDEXER_API_QUERY_LIMIT", "100")); + + return new IndexerConfig( + dbUrl, + dbUser, + dbPassword, + sqsQueueUrl, + awsRegion, + workerPollSeconds, + workerBatchSize, + apiQueryLimit); + } + + private static String getenvOrDefault(String key, String defaultValue) { + String value = System.getenv(key); + return value == null ? defaultValue : value; + } + + private static String getenvOrThrow(String key) { + String value = System.getenv(key); + if (value == null || value.isBlank()) { + throw new IllegalStateException("Missing required env var: " + key); + } + return value; + } + + private static int parseInt(String raw) { + try { + return Integer.parseInt(raw); + } catch (NumberFormatException e) { + throw new IllegalStateException("Failed to parse int env var: " + raw, e); + } + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/config/IndexerModule.java b/jvm/src/main/java/com/muchq/chess_indexer/config/IndexerModule.java new file mode 100644 index 00000000..8d3d5ea0 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/config/IndexerModule.java @@ -0,0 +1,68 @@ +package com.muchq.chess_indexer.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.muchq.chess_com_api.ChessClient; +import com.muchq.http_client.core.HttpClient; +import com.muchq.http_client.jdk11.Jdk11HttpClient; +import com.muchq.json.JsonUtils; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import io.micronaut.context.annotation.Context; +import io.micronaut.context.annotation.Factory; +import jakarta.inject.Singleton; +import java.time.Clock; +import javax.sql.DataSource; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; + +@Factory +public class IndexerModule { + + @Context + public IndexerConfig indexerConfig() { + return IndexerConfig.fromEnv(); + } + + @Context + public Clock clock() { + return Clock.systemUTC(); + } + + @Context + public ObjectMapper objectMapper() { + return JsonUtils.mapper(); + } + + @Context + public HttpClient httpClient() { + return new Jdk11HttpClient(java.net.http.HttpClient.newHttpClient()); + } + + @Context + public ChessClient chessClient(HttpClient httpClient, ObjectMapper objectMapper) { + return new ChessClient(httpClient, objectMapper); + } + + @Singleton + public DataSource dataSource(IndexerConfig config) { + HikariConfig hikari = new HikariConfig(); + hikari.setJdbcUrl(config.dbUrl()); + hikari.setUsername(config.dbUser()); + hikari.setPassword(config.dbPassword()); + hikari.setMaximumPoolSize(10); + hikari.setMinimumIdle(1); + hikari.setPoolName("chess-indexer"); + return new HikariDataSource(hikari); + } + + @Singleton + public SqsClient sqsClient(IndexerConfig config) { + return SqsClient.builder() + .region(Region.of(config.awsRegion())) + .credentialsProvider(DefaultCredentialsProvider.create()) + .httpClientBuilder(UrlConnectionHttpClient.builder()) + .build(); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/db/GameDao.java b/jvm/src/main/java/com/muchq/chess_indexer/db/GameDao.java new file mode 100644 index 00000000..11b22ab0 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/db/GameDao.java @@ -0,0 +1,125 @@ +package com.muchq.chess_indexer.db; + +import com.muchq.chess_indexer.model.GameRecord; +import com.muchq.chess_indexer.model.GameSummary; +import jakarta.inject.Singleton; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.sql.DataSource; + +@Singleton +public class GameDao { + + private final DataSource dataSource; + + public GameDao(DataSource dataSource) { + this.dataSource = dataSource; + } + + public UUID upsert(GameRecord record) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(""" + INSERT INTO games ( + id, platform, game_uuid, end_time, rated, time_class, rules, eco, + white_username, white_elo, black_username, black_elo, result, pgn, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (platform, game_uuid) + DO UPDATE SET + end_time = EXCLUDED.end_time, + rated = EXCLUDED.rated, + time_class = EXCLUDED.time_class, + rules = EXCLUDED.rules, + eco = EXCLUDED.eco, + white_username = EXCLUDED.white_username, + white_elo = EXCLUDED.white_elo, + black_username = EXCLUDED.black_username, + black_elo = EXCLUDED.black_elo, + result = EXCLUDED.result, + pgn = EXCLUDED.pgn + RETURNING id + """)) { + UUID id = record.id() != null ? record.id() : UUID.randomUUID(); + statement.setObject(1, id); + statement.setString(2, record.platform()); + statement.setString(3, record.gameUuid()); + statement.setObject(4, record.endTime().atOffset(ZoneOffset.UTC)); + statement.setBoolean(5, record.rated()); + statement.setString(6, record.timeClass()); + statement.setString(7, record.rules()); + statement.setString(8, record.eco()); + statement.setString(9, record.whiteUsername()); + statement.setInt(10, record.whiteElo()); + statement.setString(11, record.blackUsername()); + statement.setInt(12, record.blackElo()); + statement.setString(13, record.result()); + statement.setString(14, record.pgn()); + statement.setObject(15, Instant.now().atOffset(ZoneOffset.UTC)); + try (ResultSet rs = statement.executeQuery()) { + if (rs.next()) { + return rs.getObject(1, UUID.class); + } + } + throw new RuntimeException("Failed to upsert game record"); + } catch (SQLException e) { + throw new RuntimeException("Failed to upsert game", e); + } + } + + public void linkToRequest(UUID requestId, UUID gameId) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(""" + INSERT INTO index_request_games (index_request_id, game_id) + VALUES (?, ?) + ON CONFLICT DO NOTHING + """)) { + statement.setObject(1, requestId); + statement.setObject(2, gameId); + statement.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException("Failed to link request to game", e); + } + } + + public List query(String sql, List params) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(sql)) { + for (int i = 0; i < params.size(); i++) { + statement.setObject(i + 1, params.get(i)); + } + + try (ResultSet rs = statement.executeQuery()) { + List results = new ArrayList<>(); + while (rs.next()) { + results.add(mapSummary(rs)); + } + return results; + } + } catch (SQLException e) { + throw new RuntimeException("Failed to query games", e); + } + } + + private GameSummary mapSummary(ResultSet rs) throws SQLException { + return new GameSummary( + rs.getObject("id", UUID.class), + rs.getString("platform"), + rs.getString("game_uuid"), + rs.getObject("end_time", java.time.OffsetDateTime.class).toInstant(), + rs.getString("white_username"), + rs.getInt("white_elo"), + rs.getString("black_username"), + rs.getInt("black_elo"), + rs.getString("result"), + rs.getString("time_class"), + rs.getString("eco"), + rs.getBoolean("rated") + ); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/db/GameFeaturesDao.java b/jvm/src/main/java/com/muchq/chess_indexer/db/GameFeaturesDao.java new file mode 100644 index 00000000..534f2be1 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/db/GameFeaturesDao.java @@ -0,0 +1,44 @@ +package com.muchq.chess_indexer.db; + +import com.muchq.chess_indexer.model.GameFeatures; +import jakarta.inject.Singleton; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import javax.sql.DataSource; + +@Singleton +public class GameFeaturesDao { + + private final DataSource dataSource; + + public GameFeaturesDao(DataSource dataSource) { + this.dataSource = dataSource; + } + + public void upsert(GameFeatures features) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(""" + INSERT INTO game_features ( + game_id, total_plies, has_castle, has_promotion, has_check, has_checkmate + ) VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT (game_id) + DO UPDATE SET + total_plies = EXCLUDED.total_plies, + has_castle = EXCLUDED.has_castle, + has_promotion = EXCLUDED.has_promotion, + has_check = EXCLUDED.has_check, + has_checkmate = EXCLUDED.has_checkmate + """)) { + statement.setObject(1, features.gameId()); + statement.setInt(2, features.totalPlies()); + statement.setBoolean(3, features.hasCastle()); + statement.setBoolean(4, features.hasPromotion()); + statement.setBoolean(5, features.hasCheck()); + statement.setBoolean(6, features.hasCheckmate()); + statement.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException("Failed to upsert game features", e); + } + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/db/GameMotifDao.java b/jvm/src/main/java/com/muchq/chess_indexer/db/GameMotifDao.java new file mode 100644 index 00000000..9f4c767c --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/db/GameMotifDao.java @@ -0,0 +1,44 @@ +package com.muchq.chess_indexer.db; + +import jakarta.inject.Singleton; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; +import java.util.UUID; +import javax.sql.DataSource; + +@Singleton +public class GameMotifDao { + + private final DataSource dataSource; + + public GameMotifDao(DataSource dataSource) { + this.dataSource = dataSource; + } + + public void replaceMotifs(UUID gameId, List motifs) { + try (Connection connection = dataSource.getConnection()) { + try (PreparedStatement deleteStmt = connection.prepareStatement( + "DELETE FROM game_motifs WHERE game_id = ?")) { + deleteStmt.setObject(1, gameId); + deleteStmt.executeUpdate(); + } + + try (PreparedStatement insertStmt = connection.prepareStatement( + "INSERT INTO game_motifs (game_id, motif_name, first_ply) VALUES (?, ?, ?)")) { + for (MotifRecord motif : motifs) { + insertStmt.setObject(1, gameId); + insertStmt.setString(2, motif.name()); + insertStmt.setInt(3, motif.firstPly()); + insertStmt.addBatch(); + } + insertStmt.executeBatch(); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to update motifs", e); + } + } + + public record MotifRecord(String name, int firstPly) {} +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/db/IndexRequestDao.java b/jvm/src/main/java/com/muchq/chess_indexer/db/IndexRequestDao.java new file mode 100644 index 00000000..fd98e945 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/db/IndexRequestDao.java @@ -0,0 +1,106 @@ +package com.muchq.chess_indexer.db; + +import com.muchq.chess_indexer.model.IndexRequest; +import com.muchq.chess_indexer.model.IndexRequestStatus; +import jakarta.inject.Singleton; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.Optional; +import java.util.UUID; +import javax.sql.DataSource; + +@Singleton +public class IndexRequestDao { + + private final DataSource dataSource; + + public IndexRequestDao(DataSource dataSource) { + this.dataSource = dataSource; + } + + public IndexRequest create(String platform, String username, LocalDate startDate, LocalDate endDate) { + UUID id = UUID.randomUUID(); + Instant now = Instant.now(); + IndexRequestStatus status = IndexRequestStatus.PENDING; + + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(""" + INSERT INTO index_requests ( + id, platform, username, start_date, end_date, status, games_indexed, error_message, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """)) { + statement.setObject(1, id); + statement.setString(2, platform); + statement.setString(3, username); + statement.setObject(4, startDate); + statement.setObject(5, endDate); + statement.setString(6, status.name()); + statement.setInt(7, 0); + statement.setString(8, null); + statement.setObject(9, now.atOffset(ZoneOffset.UTC)); + statement.setObject(10, now.atOffset(ZoneOffset.UTC)); + statement.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException("Failed to create index request", e); + } + + return new IndexRequest(id, platform, username, startDate, endDate, status, 0, null, now, now); + } + + public Optional findById(UUID id) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(""" + SELECT id, platform, username, start_date, end_date, status, games_indexed, error_message, created_at, updated_at + FROM index_requests + WHERE id = ? + """)) { + statement.setObject(1, id); + try (ResultSet rs = statement.executeQuery()) { + if (!rs.next()) { + return Optional.empty(); + } + return Optional.of(mapRow(rs)); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to fetch index request", e); + } + } + + public void updateStatus(UUID id, IndexRequestStatus status, int gamesIndexed, String errorMessage) { + Instant now = Instant.now(); + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(""" + UPDATE index_requests + SET status = ?, games_indexed = ?, error_message = ?, updated_at = ? + WHERE id = ? + """)) { + statement.setString(1, status.name()); + statement.setInt(2, gamesIndexed); + statement.setString(3, errorMessage); + statement.setObject(4, now.atOffset(ZoneOffset.UTC)); + statement.setObject(5, id); + statement.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException("Failed to update index request status", e); + } + } + + private IndexRequest mapRow(ResultSet rs) throws SQLException { + UUID id = rs.getObject("id", UUID.class); + String platform = rs.getString("platform"); + String username = rs.getString("username"); + LocalDate startDate = rs.getObject("start_date", LocalDate.class); + LocalDate endDate = rs.getObject("end_date", LocalDate.class); + IndexRequestStatus status = IndexRequestStatus.valueOf(rs.getString("status")); + int gamesIndexed = rs.getInt("games_indexed"); + String errorMessage = rs.getString("error_message"); + Instant createdAt = rs.getObject("created_at", java.time.OffsetDateTime.class).toInstant(); + Instant updatedAt = rs.getObject("updated_at", java.time.OffsetDateTime.class).toInstant(); + return new IndexRequest(id, platform, username, startDate, endDate, status, gamesIndexed, errorMessage, createdAt, updatedAt); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/db/SchemaMigrator.java b/jvm/src/main/java/com/muchq/chess_indexer/db/SchemaMigrator.java new file mode 100644 index 00000000..9da482dc --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/db/SchemaMigrator.java @@ -0,0 +1,99 @@ +package com.muchq.chess_indexer.db; + +import io.micronaut.context.event.StartupEvent; +import io.micronaut.runtime.event.annotation.EventListener; +import jakarta.inject.Singleton; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import javax.sql.DataSource; + +@Singleton +public class SchemaMigrator { + + private final DataSource dataSource; + + public SchemaMigrator(DataSource dataSource) { + this.dataSource = dataSource; + } + + @EventListener + public void onStartup(StartupEvent event) { + migrate(); + } + + public void migrate() { + try (Connection connection = dataSource.getConnection(); + Statement statement = connection.createStatement()) { + statement.executeUpdate(""" + CREATE TABLE IF NOT EXISTS index_requests ( + id UUID PRIMARY KEY, + platform TEXT NOT NULL, + username TEXT NOT NULL, + start_date DATE NOT NULL, + end_date DATE NOT NULL, + status TEXT NOT NULL, + games_indexed INTEGER NOT NULL DEFAULT 0, + error_message TEXT, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL + ); + """); + + statement.executeUpdate(""" + CREATE TABLE IF NOT EXISTS games ( + id UUID PRIMARY KEY, + platform TEXT NOT NULL, + game_uuid TEXT NOT NULL, + end_time TIMESTAMP WITH TIME ZONE NOT NULL, + rated BOOLEAN NOT NULL, + time_class TEXT, + rules TEXT, + eco TEXT, + white_username TEXT, + white_elo INTEGER, + black_username TEXT, + black_elo INTEGER, + result TEXT, + pgn TEXT, + created_at TIMESTAMP WITH TIME ZONE NOT NULL + ); + """); + + statement.executeUpdate(""" + CREATE UNIQUE INDEX IF NOT EXISTS games_platform_uuid_idx + ON games(platform, game_uuid); + """); + + statement.executeUpdate(""" + CREATE TABLE IF NOT EXISTS game_features ( + game_id UUID PRIMARY KEY REFERENCES games(id) ON DELETE CASCADE, + total_plies INTEGER NOT NULL, + has_castle BOOLEAN NOT NULL, + has_promotion BOOLEAN NOT NULL, + has_check BOOLEAN NOT NULL, + has_checkmate BOOLEAN NOT NULL + ); + """); + + statement.executeUpdate(""" + CREATE TABLE IF NOT EXISTS game_motifs ( + game_id UUID NOT NULL REFERENCES games(id) ON DELETE CASCADE, + motif_name TEXT NOT NULL, + first_ply INTEGER NOT NULL, + PRIMARY KEY (game_id, motif_name) + ); + """); + + statement.executeUpdate(""" + CREATE TABLE IF NOT EXISTS index_request_games ( + index_request_id UUID NOT NULL REFERENCES index_requests(id) ON DELETE CASCADE, + game_id UUID NOT NULL REFERENCES games(id) ON DELETE CASCADE, + PRIMARY KEY (index_request_id, game_id) + ); + """); + } catch (SQLException e) { + throw new RuntimeException("Failed to migrate schema", e); + } + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/features/FeatureExtractionResult.java b/jvm/src/main/java/com/muchq/chess_indexer/features/FeatureExtractionResult.java new file mode 100644 index 00000000..c90d8c01 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/features/FeatureExtractionResult.java @@ -0,0 +1,6 @@ +package com.muchq.chess_indexer.features; + +import com.muchq.chess_indexer.model.GameFeatures; +import java.util.List; + +public record FeatureExtractionResult(GameFeatures features, List motifs) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/features/FeatureExtractor.java b/jvm/src/main/java/com/muchq/chess_indexer/features/FeatureExtractor.java new file mode 100644 index 00000000..6c4d03c5 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/features/FeatureExtractor.java @@ -0,0 +1,79 @@ +package com.muchq.chess_indexer.features; + +import com.muchq.chess_indexer.model.GameFeatures; +import com.muchq.chess_indexer.pgn.PgnGame; +import com.muchq.chess_indexer.pgn.PgnParser; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +public class FeatureExtractor { + + private final PgnParser parser = new PgnParser(); + + public FeatureExtractionResult extract(UUID gameId, String pgn) { + PgnGame game = parser.parse(pgn); + List moves = game.moves(); + + boolean hasCastle = false; + boolean hasPromotion = false; + boolean hasCheck = false; + boolean hasCheckmate = false; + + List motifs = new ArrayList<>(); + + int ply = 0; + int castleFirst = -1; + int promotionFirst = -1; + int checkFirst = -1; + int checkmateFirst = -1; + + for (String move : moves) { + ply++; + if (castleFirst == -1 && isCastle(move)) { + castleFirst = ply; + hasCastle = true; + } + if (promotionFirst == -1 && move.contains("=")) { + promotionFirst = ply; + hasPromotion = true; + } + if (checkmateFirst == -1 && move.contains("#")) { + checkmateFirst = ply; + hasCheckmate = true; + } + if (checkFirst == -1 && move.contains("+")) { + checkFirst = ply; + hasCheck = true; + } + } + + if (castleFirst != -1) { + motifs.add(new Motif("castle", castleFirst)); + } + if (promotionFirst != -1) { + motifs.add(new Motif("promotion", promotionFirst)); + } + if (checkFirst != -1) { + motifs.add(new Motif("check", checkFirst)); + } + if (checkmateFirst != -1) { + motifs.add(new Motif("checkmate", checkmateFirst)); + } + + GameFeatures features = new GameFeatures( + gameId, + moves.size(), + hasCastle, + hasPromotion, + hasCheck, + hasCheckmate + ); + + return new FeatureExtractionResult(features, motifs); + } + + private boolean isCastle(String move) { + return move.startsWith("O-O-O") || move.startsWith("O-O"); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/features/Motif.java b/jvm/src/main/java/com/muchq/chess_indexer/features/Motif.java new file mode 100644 index 00000000..80d326e4 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/features/Motif.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.features; + +public record Motif(String name, int firstPly) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/ingest/GameIngestor.java b/jvm/src/main/java/com/muchq/chess_indexer/ingest/GameIngestor.java new file mode 100644 index 00000000..0b28501f --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/ingest/GameIngestor.java @@ -0,0 +1,131 @@ +package com.muchq.chess_indexer.ingest; + +import com.muchq.chess_com_api.ChessClient; +import com.muchq.chess_com_api.GamesResponse; +import com.muchq.chess_com_api.PlayedGame; +import com.muchq.chess_indexer.db.GameDao; +import com.muchq.chess_indexer.db.GameFeaturesDao; +import com.muchq.chess_indexer.db.GameMotifDao; +import com.muchq.chess_indexer.features.FeatureExtractionResult; +import com.muchq.chess_indexer.features.FeatureExtractor; +import com.muchq.chess_indexer.features.Motif; +import com.muchq.chess_indexer.model.GameRecord; +import com.muchq.chess_indexer.model.IndexRequest; +import jakarta.inject.Singleton; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.YearMonth; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +@Singleton +public class GameIngestor { + + private final ChessClient chessClient; + private final GameDao gameDao; + private final GameFeaturesDao featuresDao; + private final GameMotifDao motifDao; + private final FeatureExtractor extractor; + + public GameIngestor(ChessClient chessClient, + GameDao gameDao, + GameFeaturesDao featuresDao, + GameMotifDao motifDao) { + this.chessClient = chessClient; + this.gameDao = gameDao; + this.featuresDao = featuresDao; + this.motifDao = motifDao; + this.extractor = new FeatureExtractor(); + } + + public int ingest(IndexRequest request) { + List months = monthsBetween(request.startDate(), request.endDate()); + Instant startInstant = request.startDate().atStartOfDay(ZoneOffset.UTC).toInstant(); + Instant endInstant = request.endDate().atTime(LocalTime.MAX).atZone(ZoneOffset.UTC).toInstant(); + + int count = 0; + for (YearMonth month : months) { + Optional response = chessClient.fetchGames(request.username(), month); + if (response.isEmpty()) { + continue; + } + for (PlayedGame game : response.get().games()) { + if (game.endTime().isBefore(startInstant) || game.endTime().isAfter(endInstant)) { + continue; + } + UUID gameId = persistGame(request, game); + if (gameId != null) { + count++; + } + } + } + return count; + } + + private UUID persistGame(IndexRequest request, PlayedGame playedGame) { + GameRecord record = new GameRecord( + null, + request.platform(), + playedGame.uuid(), + playedGame.endTime(), + playedGame.rated(), + playedGame.timeClass(), + playedGame.rules(), + playedGame.eco(), + playedGame.whiteResult().username(), + playedGame.whiteResult().rating(), + playedGame.blackResult().username(), + playedGame.blackResult().rating(), + resultString(playedGame), + playedGame.pgn() + ); + + UUID gameId = gameDao.upsert(record); + gameDao.linkToRequest(request.id(), gameId); + + FeatureExtractionResult extraction = extractor.extract(gameId, playedGame.pgn()); + featuresDao.upsert(extraction.features()); + motifDao.replaceMotifs(gameId, toMotifRecords(extraction.motifs())); + + return gameId; + } + + private List toMotifRecords(List motifs) { + List records = new ArrayList<>(motifs.size()); + for (Motif motif : motifs) { + records.add(new GameMotifDao.MotifRecord(motif.name(), motif.firstPly())); + } + return records; + } + + private String resultString(PlayedGame game) { + String white = game.whiteResult().result(); + String black = game.blackResult().result(); + if ("win".equalsIgnoreCase(white)) { + return "1-0"; + } + if ("win".equalsIgnoreCase(black)) { + return "0-1"; + } + if ("agreed".equalsIgnoreCase(white) || "agreed".equalsIgnoreCase(black)) { + return "1/2-1/2"; + } + return "*"; + } + + private List monthsBetween(LocalDate start, LocalDate end) { + List months = new ArrayList<>(); + YearMonth current = YearMonth.from(start); + YearMonth last = YearMonth.from(end); + + while (!current.isAfter(last)) { + months.add(current); + current = current.plusMonths(1); + } + return months; + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/ingest/IndexRequestService.java b/jvm/src/main/java/com/muchq/chess_indexer/ingest/IndexRequestService.java new file mode 100644 index 00000000..7955eab2 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/ingest/IndexRequestService.java @@ -0,0 +1,27 @@ +package com.muchq.chess_indexer.ingest; + +import com.muchq.chess_indexer.db.IndexRequestDao; +import com.muchq.chess_indexer.model.IndexRequest; +import com.muchq.chess_indexer.model.IndexRequestStatus; +import com.muchq.chess_indexer.queue.IndexRequestQueue; +import jakarta.inject.Singleton; +import java.time.LocalDate; + +@Singleton +public class IndexRequestService { + + private final IndexRequestDao indexRequestDao; + private final IndexRequestQueue queue; + + public IndexRequestService(IndexRequestDao indexRequestDao, IndexRequestQueue queue) { + this.indexRequestDao = indexRequestDao; + this.queue = queue; + } + + public IndexRequest submit(String platform, String username, LocalDate startDate, LocalDate endDate) { + IndexRequest request = indexRequestDao.create(platform, username, startDate, endDate); + queue.enqueue(new IndexRequestQueue.IndexRequestMessage(request.id().toString())); + indexRequestDao.updateStatus(request.id(), IndexRequestStatus.QUEUED, 0, null); + return request; + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/model/GameFeatures.java b/jvm/src/main/java/com/muchq/chess_indexer/model/GameFeatures.java new file mode 100644 index 00000000..de2cdcab --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/model/GameFeatures.java @@ -0,0 +1,12 @@ +package com.muchq.chess_indexer.model; + +import java.util.UUID; + +public record GameFeatures( + UUID gameId, + int totalPlies, + boolean hasCastle, + boolean hasPromotion, + boolean hasCheck, + boolean hasCheckmate +) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/model/GameRecord.java b/jvm/src/main/java/com/muchq/chess_indexer/model/GameRecord.java new file mode 100644 index 00000000..35809f33 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/model/GameRecord.java @@ -0,0 +1,21 @@ +package com.muchq.chess_indexer.model; + +import java.time.Instant; +import java.util.UUID; + +public record GameRecord( + UUID id, + String platform, + String gameUuid, + Instant endTime, + boolean rated, + String timeClass, + String rules, + String eco, + String whiteUsername, + int whiteElo, + String blackUsername, + int blackElo, + String result, + String pgn +) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/model/GameSummary.java b/jvm/src/main/java/com/muchq/chess_indexer/model/GameSummary.java new file mode 100644 index 00000000..48b9f49d --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/model/GameSummary.java @@ -0,0 +1,19 @@ +package com.muchq.chess_indexer.model; + +import java.time.Instant; +import java.util.UUID; + +public record GameSummary( + UUID id, + String platform, + String gameUuid, + Instant endTime, + String whiteUsername, + int whiteElo, + String blackUsername, + int blackElo, + String result, + String timeClass, + String eco, + boolean rated +) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/model/IndexRequest.java b/jvm/src/main/java/com/muchq/chess_indexer/model/IndexRequest.java new file mode 100644 index 00000000..f82c12c9 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/model/IndexRequest.java @@ -0,0 +1,18 @@ +package com.muchq.chess_indexer.model; + +import java.time.Instant; +import java.time.LocalDate; +import java.util.UUID; + +public record IndexRequest( + UUID id, + String platform, + String username, + LocalDate startDate, + LocalDate endDate, + IndexRequestStatus status, + int gamesIndexed, + String errorMessage, + Instant createdAt, + Instant updatedAt +) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/model/IndexRequestStatus.java b/jvm/src/main/java/com/muchq/chess_indexer/model/IndexRequestStatus.java new file mode 100644 index 00000000..ec910b3b --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/model/IndexRequestStatus.java @@ -0,0 +1,9 @@ +package com.muchq.chess_indexer.model; + +public enum IndexRequestStatus { + PENDING, + QUEUED, + RUNNING, + COMPLETED, + FAILED, +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/pgn/PgnGame.java b/jvm/src/main/java/com/muchq/chess_indexer/pgn/PgnGame.java new file mode 100644 index 00000000..8cd1883d --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/pgn/PgnGame.java @@ -0,0 +1,6 @@ +package com.muchq.chess_indexer.pgn; + +import java.util.List; +import java.util.Map; + +public record PgnGame(Map tags, List moves) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/pgn/PgnParser.java b/jvm/src/main/java/com/muchq/chess_indexer/pgn/PgnParser.java new file mode 100644 index 00000000..f3427885 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/pgn/PgnParser.java @@ -0,0 +1,119 @@ +package com.muchq.chess_indexer.pgn; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class PgnParser { + + private static final Pattern TAG_PATTERN = Pattern.compile("\\[(\\w+)\\s+\"([^\"]*)\"\\]"); + + public PgnGame parse(String pgn) { + Map tags = new HashMap<>(); + StringBuilder movetext = new StringBuilder(); + + for (String line : pgn.split("\\R")) { + String trimmed = line.trim(); + if (trimmed.startsWith("[")) { + Matcher matcher = TAG_PATTERN.matcher(trimmed); + if (matcher.find()) { + tags.put(matcher.group(1), matcher.group(2)); + } + } else if (!trimmed.isEmpty()) { + movetext.append(trimmed).append(' '); + } + } + + String cleaned = stripComments(movetext.toString()); + cleaned = stripVariations(cleaned); + List moves = tokenizeMoves(cleaned); + + return new PgnGame(tags, moves); + } + + private String stripComments(String movetext) { + StringBuilder out = new StringBuilder(); + boolean inBrace = false; + boolean inLineComment = false; + for (int i = 0; i < movetext.length(); i++) { + char c = movetext.charAt(i); + if (inLineComment) { + if (c == '\n') { + inLineComment = false; + } + continue; + } + if (inBrace) { + if (c == '}') { + inBrace = false; + } + continue; + } + if (c == '{') { + inBrace = true; + continue; + } + if (c == ';') { + inLineComment = true; + continue; + } + out.append(c); + } + return out.toString(); + } + + private String stripVariations(String movetext) { + StringBuilder out = new StringBuilder(); + int depth = 0; + for (int i = 0; i < movetext.length(); i++) { + char c = movetext.charAt(i); + if (c == '(') { + depth++; + continue; + } + if (c == ')') { + if (depth > 0) { + depth--; + } + continue; + } + if (depth == 0) { + out.append(c); + } + } + return out.toString(); + } + + private List tokenizeMoves(String movetext) { + List moves = new ArrayList<>(); + for (String raw : movetext.trim().split("\\s+")) { + if (raw.isEmpty()) { + continue; + } + if (isMoveNumber(raw) || isResult(raw) || isNag(raw)) { + continue; + } + moves.add(stripAnnotations(raw)); + } + return moves; + } + + private boolean isMoveNumber(String token) { + return token.matches("\\d+\\.{1,3}") || token.matches("\\d+\\."); + } + + private boolean isResult(String token) { + return token.equals("1-0") || token.equals("0-1") || token.equals("1/2-1/2") || token.equals("*"); + } + + private boolean isNag(String token) { + return token.startsWith("$"); + } + + private String stripAnnotations(String token) { + return token.replaceAll("[!?]+$", ""); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/AndExpr.java b/jvm/src/main/java/com/muchq/chess_indexer/query/AndExpr.java new file mode 100644 index 00000000..bb6bd13c --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/AndExpr.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record AndExpr(Expr left, Expr right) implements Expr {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/BooleanValue.java b/jvm/src/main/java/com/muchq/chess_indexer/query/BooleanValue.java new file mode 100644 index 00000000..716bc35f --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/BooleanValue.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record BooleanValue(boolean value) implements Value {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/CompareExpr.java b/jvm/src/main/java/com/muchq/chess_indexer/query/CompareExpr.java new file mode 100644 index 00000000..d508db03 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/CompareExpr.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record CompareExpr(Field field, CompareOp op, Value value) implements Expr {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/CompareOp.java b/jvm/src/main/java/com/muchq/chess_indexer/query/CompareOp.java new file mode 100644 index 00000000..43c4363a --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/CompareOp.java @@ -0,0 +1,20 @@ +package com.muchq.chess_indexer.query; + +public enum CompareOp { + EQ("="), + NE("!=") , + LT("<"), + LTE("<="), + GT(">"), + GTE(">="); + + private final String sql; + + CompareOp(String sql) { + this.sql = sql; + } + + public String sql() { + return sql; + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/CompiledQuery.java b/jvm/src/main/java/com/muchq/chess_indexer/query/CompiledQuery.java new file mode 100644 index 00000000..4129baa4 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/CompiledQuery.java @@ -0,0 +1,5 @@ +package com.muchq.chess_indexer.query; + +import java.util.List; + +public record CompiledQuery(String sql, List params) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/Expr.java b/jvm/src/main/java/com/muchq/chess_indexer/query/Expr.java new file mode 100644 index 00000000..4711181c --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/Expr.java @@ -0,0 +1,4 @@ +package com.muchq.chess_indexer.query; + +public sealed interface Expr permits AndExpr, OrExpr, NotExpr, CompareExpr, InExpr, FuncCallExpr { +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/Field.java b/jvm/src/main/java/com/muchq/chess_indexer/query/Field.java new file mode 100644 index 00000000..6aa8ca38 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/Field.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record Field(String path) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/FuncCallExpr.java b/jvm/src/main/java/com/muchq/chess_indexer/query/FuncCallExpr.java new file mode 100644 index 00000000..95da7268 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/FuncCallExpr.java @@ -0,0 +1,5 @@ +package com.muchq.chess_indexer.query; + +import java.util.List; + +public record FuncCallExpr(String name, List args) implements Expr {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/IdentValue.java b/jvm/src/main/java/com/muchq/chess_indexer/query/IdentValue.java new file mode 100644 index 00000000..4fdd5c59 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/IdentValue.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record IdentValue(String value) implements Value {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/InExpr.java b/jvm/src/main/java/com/muchq/chess_indexer/query/InExpr.java new file mode 100644 index 00000000..c3d2ff97 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/InExpr.java @@ -0,0 +1,5 @@ +package com.muchq.chess_indexer.query; + +import java.util.List; + +public record InExpr(Field field, List values) implements Expr {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/Lexer.java b/jvm/src/main/java/com/muchq/chess_indexer/query/Lexer.java new file mode 100644 index 00000000..07776057 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/Lexer.java @@ -0,0 +1,148 @@ +package com.muchq.chess_indexer.query; + +import java.util.ArrayList; +import java.util.List; + +public class Lexer { + + private final String input; + private int pos; + + public Lexer(String input) { + this.input = input; + } + + public List tokenize() { + List tokens = new ArrayList<>(); + while (pos < input.length()) { + char c = input.charAt(pos); + if (Character.isWhitespace(c)) { + pos++; + continue; + } + if (c == '(') { + tokens.add(new Token(TokenType.LPAREN, "(")); + pos++; + continue; + } + if (c == ')') { + tokens.add(new Token(TokenType.RPAREN, ")")); + pos++; + continue; + } + if (c == '[') { + tokens.add(new Token(TokenType.LBRACKET, "[")); + pos++; + continue; + } + if (c == ']') { + tokens.add(new Token(TokenType.RBRACKET, "]")); + pos++; + continue; + } + if (c == ',') { + tokens.add(new Token(TokenType.COMMA, ",")); + pos++; + continue; + } + if (c == '"') { + tokens.add(new Token(TokenType.STRING, readString())); + continue; + } + if (isOperatorStart(c)) { + tokens.add(new Token(TokenType.OP, readOperator())); + continue; + } + if (Character.isDigit(c)) { + tokens.add(new Token(TokenType.NUMBER, readNumber())); + continue; + } + if (isIdentStart(c)) { + String ident = readIdent(); + tokens.add(keywordOrIdent(ident)); + continue; + } + throw new IllegalArgumentException("Unexpected character: " + c); + } + tokens.add(new Token(TokenType.EOF, "")); + return tokens; + } + + private boolean isIdentStart(char c) { + return Character.isLetter(c) || c == '_' || c == '.'; + } + + private String readIdent() { + int start = pos; + while (pos < input.length()) { + char c = input.charAt(pos); + if (Character.isLetterOrDigit(c) || c == '_' || c == '.' || c == '-') { + pos++; + } else { + break; + } + } + return input.substring(start, pos); + } + + private Token keywordOrIdent(String ident) { + String upper = ident.toUpperCase(); + switch (upper) { + case "AND": + return new Token(TokenType.AND, ident); + case "OR": + return new Token(TokenType.OR, ident); + case "NOT": + return new Token(TokenType.NOT, ident); + case "IN": + return new Token(TokenType.IN, ident); + case "TRUE": + case "FALSE": + return new Token(TokenType.BOOLEAN, ident.toLowerCase()); + default: + return new Token(TokenType.IDENT, ident); + } + } + + private String readString() { + pos++; // skip opening quote + StringBuilder sb = new StringBuilder(); + while (pos < input.length()) { + char c = input.charAt(pos++); + if (c == '"') { + break; + } + if (c == '\\' && pos < input.length()) { + char next = input.charAt(pos++); + sb.append(next); + } else { + sb.append(c); + } + } + return sb.toString(); + } + + private String readNumber() { + int start = pos; + while (pos < input.length() && Character.isDigit(input.charAt(pos))) { + pos++; + } + return input.substring(start, pos); + } + + private boolean isOperatorStart(char c) { + return c == '=' || c == '!' || c == '<' || c == '>'; + } + + private String readOperator() { + int start = pos; + pos++; + if (pos < input.length()) { + char next = input.charAt(pos); + if ((input.charAt(start) == '!' || input.charAt(start) == '<' || input.charAt(start) == '>') && next == '=') { + pos++; + } + } + return input.substring(start, pos); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/NotExpr.java b/jvm/src/main/java/com/muchq/chess_indexer/query/NotExpr.java new file mode 100644 index 00000000..88726c96 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/NotExpr.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record NotExpr(Expr expr) implements Expr {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/NumberValue.java b/jvm/src/main/java/com/muchq/chess_indexer/query/NumberValue.java new file mode 100644 index 00000000..0903fcc5 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/NumberValue.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record NumberValue(long value) implements Value {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/OrExpr.java b/jvm/src/main/java/com/muchq/chess_indexer/query/OrExpr.java new file mode 100644 index 00000000..682f5629 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/OrExpr.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record OrExpr(Expr left, Expr right) implements Expr {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/Parser.java b/jvm/src/main/java/com/muchq/chess_indexer/query/Parser.java new file mode 100644 index 00000000..3df3a922 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/Parser.java @@ -0,0 +1,141 @@ +package com.muchq.chess_indexer.query; + +import java.util.ArrayList; +import java.util.List; + +public class Parser { + + private final List tokens; + private int pos; + + public Parser(List tokens) { + this.tokens = tokens; + } + + public Expr parse() { + Expr expr = parseOr(); + expect(TokenType.EOF); + return expr; + } + + private Expr parseOr() { + Expr expr = parseAnd(); + while (match(TokenType.OR)) { + Expr right = parseAnd(); + expr = new OrExpr(expr, right); + } + return expr; + } + + private Expr parseAnd() { + Expr expr = parseNot(); + while (match(TokenType.AND)) { + Expr right = parseNot(); + expr = new AndExpr(expr, right); + } + return expr; + } + + private Expr parseNot() { + if (match(TokenType.NOT)) { + return new NotExpr(parseNot()); + } + return parsePrimary(); + } + + private Expr parsePrimary() { + if (match(TokenType.LPAREN)) { + Expr expr = parseOr(); + expect(TokenType.RPAREN); + return expr; + } + + if (peek(TokenType.IDENT)) { + Token ident = advance(); + if (match(TokenType.LPAREN)) { + List args = new ArrayList<>(); + if (!peek(TokenType.RPAREN)) { + args.add(parseValue()); + while (match(TokenType.COMMA)) { + args.add(parseValue()); + } + } + expect(TokenType.RPAREN); + return new FuncCallExpr(ident.text(), args); + } + + Field field = new Field(ident.text()); + if (match(TokenType.IN)) { + expect(TokenType.LBRACKET); + List values = new ArrayList<>(); + if (!peek(TokenType.RBRACKET)) { + values.add(parseValue()); + while (match(TokenType.COMMA)) { + values.add(parseValue()); + } + } + expect(TokenType.RBRACKET); + return new InExpr(field, values); + } + + if (peek(TokenType.OP)) { + CompareOp op = parseOp(advance().text()); + Value value = parseValue(); + return new CompareExpr(field, op, value); + } + + throw new IllegalArgumentException("Expected operator or function call after field: " + ident.text()); + } + + throw new IllegalArgumentException("Unexpected token: " + peek()); + } + + private CompareOp parseOp(String op) { + return switch (op) { + case "=" -> CompareOp.EQ; + case "!=" -> CompareOp.NE; + case "<" -> CompareOp.LT; + case "<=" -> CompareOp.LTE; + case ">" -> CompareOp.GT; + case ">=" -> CompareOp.GTE; + default -> throw new IllegalArgumentException("Unsupported operator: " + op); + }; + } + + private Value parseValue() { + Token token = advance(); + return switch (token.type()) { + case STRING -> new StringValue(token.text()); + case NUMBER -> new NumberValue(Long.parseLong(token.text())); + case BOOLEAN -> new BooleanValue(Boolean.parseBoolean(token.text())); + case IDENT -> new IdentValue(token.text()); + default -> throw new IllegalArgumentException("Unexpected value token: " + token.type()); + }; + } + + private boolean match(TokenType type) { + if (peek(type)) { + pos++; + return true; + } + return false; + } + + private boolean peek(TokenType type) { + return tokens.get(pos).type() == type; + } + + private Token peek() { + return tokens.get(pos); + } + + private Token advance() { + return tokens.get(pos++); + } + + private void expect(TokenType type) { + if (!match(type)) { + throw new IllegalArgumentException("Expected token " + type + " but found " + peek().type()); + } + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/QueryCompiler.java b/jvm/src/main/java/com/muchq/chess_indexer/query/QueryCompiler.java new file mode 100644 index 00000000..bc61dbb2 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/QueryCompiler.java @@ -0,0 +1,177 @@ +package com.muchq.chess_indexer.query; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class QueryCompiler { + + private static final Map FIELD_MAP = new HashMap<>(); + + static { + FIELD_MAP.put("platform", "g.platform"); + FIELD_MAP.put("game.uuid", "g.game_uuid"); + FIELD_MAP.put("end_time", "g.end_time"); + FIELD_MAP.put("rated", "g.rated"); + FIELD_MAP.put("time.class", "g.time_class"); + FIELD_MAP.put("rules", "g.rules"); + FIELD_MAP.put("eco", "g.eco"); + FIELD_MAP.put("result", "g.result"); + FIELD_MAP.put("white.username", "g.white_username"); + FIELD_MAP.put("white.elo", "g.white_elo"); + FIELD_MAP.put("black.username", "g.black_username"); + FIELD_MAP.put("black.elo", "g.black_elo"); + FIELD_MAP.put("features.total_plies", "f.total_plies"); + FIELD_MAP.put("features.has_castle", "f.has_castle"); + FIELD_MAP.put("features.has_promotion", "f.has_promotion"); + FIELD_MAP.put("features.has_check", "f.has_check"); + FIELD_MAP.put("features.has_checkmate", "f.has_checkmate"); + } + + public CompiledQuery compile(Expr expr, int limit) { + List params = new ArrayList<>(); + String where = compileExpr(expr, params); + String sql = """ + SELECT g.id, g.platform, g.game_uuid, g.end_time, g.white_username, g.white_elo, + g.black_username, g.black_elo, g.result, g.time_class, g.eco, g.rated + FROM games g + LEFT JOIN game_features f ON g.id = f.game_id + WHERE %s + ORDER BY g.end_time DESC + LIMIT ? + """.formatted(where); + params.add(limit); + return new CompiledQuery(sql, params); + } + + private String compileExpr(Expr expr, List params) { + if (expr instanceof AndExpr andExpr) { + return "(" + compileExpr(andExpr.left(), params) + " AND " + compileExpr(andExpr.right(), params) + ")"; + } + if (expr instanceof OrExpr orExpr) { + return "(" + compileExpr(orExpr.left(), params) + " OR " + compileExpr(orExpr.right(), params) + ")"; + } + if (expr instanceof NotExpr notExpr) { + return "(NOT " + compileExpr(notExpr.expr(), params) + ")"; + } + if (expr instanceof CompareExpr compareExpr) { + return compileCompare(compareExpr, params); + } + if (expr instanceof InExpr inExpr) { + return compileIn(inExpr, params); + } + if (expr instanceof FuncCallExpr funcCallExpr) { + return compileFunc(funcCallExpr, params); + } + throw new IllegalArgumentException("Unsupported expression: " + expr.getClass().getSimpleName()); + } + + private String compileCompare(CompareExpr expr, List params) { + String field = expr.field().path(); + if ("player".equals(field)) { + return compilePlayerCompare(expr.op(), expr.value(), params); + } + + String column = resolveField(expr.field()); + params.add(valueToParam(expr.value())); + return column + " " + expr.op().sql() + " ?"; + } + + private String compilePlayerCompare(CompareOp op, Value value, List params) { + Object param = valueToParam(value); + if (op == CompareOp.NE) { + params.add(param); + params.add(param); + return "(g.white_username != ? AND g.black_username != ?)"; + } + params.add(param); + params.add(param); + return "(g.white_username " + op.sql() + " ? OR g.black_username " + op.sql() + " ?)"; + } + + private String compileIn(InExpr expr, List params) { + String field = expr.field().path(); + if ("player".equals(field)) { + return compilePlayerIn(expr.values(), params); + } + + String column = resolveField(expr.field()); + if (expr.values().isEmpty()) { + return "FALSE"; + } + StringBuilder sb = new StringBuilder(); + sb.append(column).append(" IN ("); + for (int i = 0; i < expr.values().size(); i++) { + if (i > 0) { + sb.append(", "); + } + sb.append("?"); + params.add(valueToParam(expr.values().get(i))); + } + sb.append(')'); + return sb.toString(); + } + + private String compilePlayerIn(List values, List params) { + if (values.isEmpty()) { + return "FALSE"; + } + StringBuilder sb = new StringBuilder(); + sb.append("(g.white_username IN ("); + for (int i = 0; i < values.size(); i++) { + if (i > 0) { + sb.append(", "); + } + sb.append("?"); + params.add(valueToParam(values.get(i))); + } + sb.append(") OR g.black_username IN ("); + for (int i = 0; i < values.size(); i++) { + if (i > 0) { + sb.append(", "); + } + sb.append("?"); + params.add(valueToParam(values.get(i))); + } + sb.append("))"); + return sb.toString(); + } + + private String compileFunc(FuncCallExpr func, List params) { + String name = func.name().toLowerCase(); + if ("motif".equals(name)) { + if (func.args().isEmpty()) { + throw new IllegalArgumentException("motif() requires a name"); + } + Object motif = valueToParam(func.args().get(0)); + params.add(motif); + return "EXISTS (SELECT 1 FROM game_motifs m WHERE m.game_id = g.id AND m.motif_name = ?)"; + } + throw new IllegalArgumentException("Unknown function: " + func.name()); + } + + private String resolveField(Field field) { + String column = FIELD_MAP.get(field.path()); + if (column == null) { + throw new IllegalArgumentException("Unknown field: " + field.path()); + } + return column; + } + + private Object valueToParam(Value value) { + if (value instanceof StringValue stringValue) { + return stringValue.value(); + } + if (value instanceof NumberValue numberValue) { + return numberValue.value(); + } + if (value instanceof BooleanValue booleanValue) { + return booleanValue.value(); + } + if (value instanceof IdentValue identValue) { + return identValue.value(); + } + throw new IllegalArgumentException("Unsupported value: " + value.getClass().getSimpleName()); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/QueryService.java b/jvm/src/main/java/com/muchq/chess_indexer/query/QueryService.java new file mode 100644 index 00000000..fb6e72e7 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/QueryService.java @@ -0,0 +1,30 @@ +package com.muchq.chess_indexer.query; + +import com.muchq.chess_indexer.config.IndexerConfig; +import com.muchq.chess_indexer.db.GameDao; +import com.muchq.chess_indexer.model.GameSummary; +import jakarta.inject.Singleton; +import java.util.List; + +@Singleton +public class QueryService { + + private final GameDao gameDao; + private final IndexerConfig config; + + public QueryService(GameDao gameDao, IndexerConfig config) { + this.gameDao = gameDao; + this.config = config; + } + + public List run(String query) { + Lexer lexer = new Lexer(query); + Parser parser = new Parser(lexer.tokenize()); + Expr expr = parser.parse(); + + QueryCompiler compiler = new QueryCompiler(); + CompiledQuery compiled = compiler.compile(expr, config.apiQueryLimit()); + + return gameDao.query(compiled.sql(), compiled.params()); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/StringValue.java b/jvm/src/main/java/com/muchq/chess_indexer/query/StringValue.java new file mode 100644 index 00000000..e9404b94 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/StringValue.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record StringValue(String value) implements Value {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/Token.java b/jvm/src/main/java/com/muchq/chess_indexer/query/Token.java new file mode 100644 index 00000000..402d967d --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/Token.java @@ -0,0 +1,3 @@ +package com.muchq.chess_indexer.query; + +public record Token(TokenType type, String text) {} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/TokenType.java b/jvm/src/main/java/com/muchq/chess_indexer/query/TokenType.java new file mode 100644 index 00000000..625a8b8a --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/TokenType.java @@ -0,0 +1,19 @@ +package com.muchq.chess_indexer.query; + +public enum TokenType { + IDENT, + STRING, + NUMBER, + BOOLEAN, + AND, + OR, + NOT, + IN, + LPAREN, + RPAREN, + LBRACKET, + RBRACKET, + COMMA, + OP, + EOF +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/query/Value.java b/jvm/src/main/java/com/muchq/chess_indexer/query/Value.java new file mode 100644 index 00000000..822e8a05 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/query/Value.java @@ -0,0 +1,4 @@ +package com.muchq.chess_indexer.query; + +public sealed interface Value permits NumberValue, StringValue, BooleanValue, IdentValue { +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/queue/IndexRequestQueue.java b/jvm/src/main/java/com/muchq/chess_indexer/queue/IndexRequestQueue.java new file mode 100644 index 00000000..426d7212 --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/queue/IndexRequestQueue.java @@ -0,0 +1,15 @@ +package com.muchq.chess_indexer.queue; + +import java.util.List; + +public interface IndexRequestQueue { + void enqueue(IndexRequestMessage message); + + List poll(int maxMessages, int waitSeconds); + + void delete(QueuedMessage message); + + record IndexRequestMessage(String requestId) {} + + record QueuedMessage(String receiptHandle, String body) {} +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/queue/SqsIndexRequestQueue.java b/jvm/src/main/java/com/muchq/chess_indexer/queue/SqsIndexRequestQueue.java new file mode 100644 index 00000000..0594e30d --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/queue/SqsIndexRequestQueue.java @@ -0,0 +1,64 @@ +package com.muchq.chess_indexer.queue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.muchq.chess_indexer.config.IndexerConfig; +import jakarta.inject.Singleton; +import java.util.ArrayList; +import java.util.List; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +@Singleton +public class SqsIndexRequestQueue implements IndexRequestQueue { + + private final SqsClient sqsClient; + private final IndexerConfig config; + private final ObjectMapper objectMapper; + + public SqsIndexRequestQueue(SqsClient sqsClient, IndexerConfig config, ObjectMapper objectMapper) { + this.sqsClient = sqsClient; + this.config = config; + this.objectMapper = objectMapper; + } + + @Override + public void enqueue(IndexRequestMessage message) { + try { + String body = objectMapper.writeValueAsString(message); + sqsClient.sendMessage(SendMessageRequest.builder() + .queueUrl(config.sqsQueueUrl()) + .messageBody(body) + .build()); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize queue message", e); + } + } + + @Override + public List poll(int maxMessages, int waitSeconds) { + ReceiveMessageRequest request = ReceiveMessageRequest.builder() + .queueUrl(config.sqsQueueUrl()) + .maxNumberOfMessages(maxMessages) + .waitTimeSeconds(waitSeconds) + .build(); + + List messages = sqsClient.receiveMessage(request).messages(); + List queued = new ArrayList<>(messages.size()); + for (Message message : messages) { + queued.add(new QueuedMessage(message.receiptHandle(), message.body())); + } + return queued; + } + + @Override + public void delete(QueuedMessage message) { + sqsClient.deleteMessage(DeleteMessageRequest.builder() + .queueUrl(config.sqsQueueUrl()) + .receiptHandle(message.receiptHandle()) + .build()); + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/worker/IndexerWorker.java b/jvm/src/main/java/com/muchq/chess_indexer/worker/IndexerWorker.java new file mode 100644 index 00000000..11d3addb --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/worker/IndexerWorker.java @@ -0,0 +1,91 @@ +package com.muchq.chess_indexer.worker; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.muchq.chess_indexer.config.IndexerConfig; +import com.muchq.chess_indexer.db.IndexRequestDao; +import com.muchq.chess_indexer.ingest.GameIngestor; +import com.muchq.chess_indexer.model.IndexRequest; +import com.muchq.chess_indexer.model.IndexRequestStatus; +import com.muchq.chess_indexer.queue.IndexRequestQueue; +import jakarta.inject.Singleton; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Singleton +public class IndexerWorker { + + private static final Logger LOG = LoggerFactory.getLogger(IndexerWorker.class); + + private final IndexerConfig config; + private final IndexRequestDao indexRequestDao; + private final IndexRequestQueue queue; + private final GameIngestor ingestor; + private final ObjectMapper objectMapper; + + public IndexerWorker(IndexerConfig config, + IndexRequestDao indexRequestDao, + IndexRequestQueue queue, + GameIngestor ingestor, + ObjectMapper objectMapper) { + this.config = config; + this.indexRequestDao = indexRequestDao; + this.queue = queue; + this.ingestor = ingestor; + this.objectMapper = objectMapper; + } + + public void runForever() { + LOG.info("Starting indexer worker"); + while (true) { + List messages = queue.poll( + config.workerBatchSize(), + config.workerPollSeconds()); + if (messages.isEmpty()) { + continue; + } + + for (IndexRequestQueue.QueuedMessage message : messages) { + handleMessage(message); + } + } + } + + private void handleMessage(IndexRequestQueue.QueuedMessage queuedMessage) { + IndexRequestQueue.IndexRequestMessage payload = parseMessage(queuedMessage.body()); + if (payload == null) { + queue.delete(queuedMessage); + return; + } + + UUID requestId = UUID.fromString(payload.requestId()); + Optional request = indexRequestDao.findById(requestId); + if (request.isEmpty()) { + LOG.warn("Index request {} not found", requestId); + queue.delete(queuedMessage); + return; + } + + indexRequestDao.updateStatus(requestId, IndexRequestStatus.RUNNING, 0, null); + try { + int gamesIndexed = ingestor.ingest(request.get()); + indexRequestDao.updateStatus(requestId, IndexRequestStatus.COMPLETED, gamesIndexed, null); + queue.delete(queuedMessage); + } catch (Exception e) { + LOG.error("Failed to process index request {}", requestId, e); + indexRequestDao.updateStatus(requestId, IndexRequestStatus.FAILED, 0, e.getMessage()); + queue.delete(queuedMessage); + } + } + + private IndexRequestQueue.IndexRequestMessage parseMessage(String body) { + try { + return objectMapper.readValue(body, IndexRequestQueue.IndexRequestMessage.class); + } catch (Exception e) { + LOG.error("Failed to parse queue message: {}", body, e); + return null; + } + } +} diff --git a/jvm/src/main/java/com/muchq/chess_indexer/worker/WorkerApp.java b/jvm/src/main/java/com/muchq/chess_indexer/worker/WorkerApp.java new file mode 100644 index 00000000..6fa24bfa --- /dev/null +++ b/jvm/src/main/java/com/muchq/chess_indexer/worker/WorkerApp.java @@ -0,0 +1,16 @@ +package com.muchq.chess_indexer.worker; + +import io.micronaut.context.ApplicationContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WorkerApp { + private static final Logger LOG = LoggerFactory.getLogger(WorkerApp.class); + + public static void main(String[] args) { + LOG.info("Starting chess indexer worker"); + try (ApplicationContext context = ApplicationContext.run()) { + context.getBean(IndexerWorker.class).runForever(); + } + } +}