diff --git a/polaris-core/src/main/java/org/apache/polaris/core/metrics/iceberg/MetricsRecordConverter.java b/polaris-core/src/main/java/org/apache/polaris/core/metrics/iceberg/MetricsRecordConverter.java new file mode 100644 index 0000000000..0289ce276a --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/metrics/iceberg/MetricsRecordConverter.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.metrics.iceberg; + +import java.time.Instant; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.apache.iceberg.metrics.CommitMetricsResult; +import org.apache.iceberg.metrics.CommitReport; +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanMetricsResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.TimerResult; +import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord; +import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord; + +/** + * Converts Iceberg metrics reports to SPI record types using a fluent builder API. + * + *
This converter extracts all relevant metrics from Iceberg's {@link ScanReport} and {@link + * CommitReport} and combines them with context information to create persistence-ready records. + * + *
Example usage: + * + *
{@code
+ * ScanMetricsRecord record = MetricsRecordConverter.forScanReport(scanReport)
+ * .catalogId(catalog.getId())
+ * .tableId(tableEntity.getId())
+ * .build();
+ * }
+ */
+public final class MetricsRecordConverter {
+
+ private MetricsRecordConverter() {
+ // Utility class
+ }
+
+ /**
+ * Creates a builder for converting a ScanReport to a ScanMetricsRecord.
+ *
+ * @param scanReport the Iceberg scan report
+ * @return builder for configuring the conversion
+ */
+ public static ScanReportBuilder forScanReport(ScanReport scanReport) {
+ return new ScanReportBuilder(scanReport);
+ }
+
+ /**
+ * Creates a builder for converting a CommitReport to a CommitMetricsRecord.
+ *
+ * @param commitReport the Iceberg commit report
+ * @return builder for configuring the conversion
+ */
+ public static CommitReportBuilder forCommitReport(CommitReport commitReport) {
+ return new CommitReportBuilder(commitReport);
+ }
+
+ /** Builder for converting ScanReport to ScanMetricsRecord. */
+ public static final class ScanReportBuilder {
+ private final ScanReport scanReport;
+ private long catalogId;
+ private long tableId;
+ private Instant timestamp;
+
+ private ScanReportBuilder(ScanReport scanReport) {
+ this.scanReport = scanReport;
+ }
+
+ public ScanReportBuilder catalogId(long catalogId) {
+ this.catalogId = catalogId;
+ return this;
+ }
+
+ /**
+ * Sets the table entity ID.
+ *
+ * This is the internal Polaris entity ID for the table. + * + * @param tableId the table entity ID + * @return this builder + */ + public ScanReportBuilder tableId(long tableId) { + this.tableId = tableId; + return this; + } + + /** + * Sets the timestamp for the metrics record. + * + *
This should be the time the metrics report was received by the server, which may differ
+ * from the time it was recorded by the client.
+ *
+ * @param timestamp the timestamp
+ * @return this builder
+ */
+ public ScanReportBuilder timestamp(Instant timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public ScanMetricsRecord build() {
+ ScanMetricsResult metrics = scanReport.scanMetrics();
+ Map This is the internal Polaris entity ID for the table.
+ *
+ * @param tableId the table entity ID
+ * @return this builder
+ */
+ public CommitReportBuilder tableId(long tableId) {
+ this.tableId = tableId;
+ return this;
+ }
+
+ /**
+ * Sets the timestamp for the metrics record.
+ *
+ * This should be the time the metrics report was received by the server, which may differ
+ * from the time it was recorded by the client.
+ *
+ * @param timestamp the timestamp
+ * @return this builder
+ */
+ public CommitReportBuilder timestamp(Instant timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public CommitMetricsRecord build() {
+ CommitMetricsResult metrics = commitReport.commitMetrics();
+ Map This record captures all relevant metrics from an Iceberg {@code CommitReport} along with
+ * contextual information such as catalog identification and table location.
+ *
+ * Common identification fields are inherited from {@link MetricsRecordIdentity}.
+ *
+ * Note: Realm ID is not included in this record. Multi-tenancy realm context should be obtained
+ * from the CDI-injected {@code RealmContext} at persistence time.
+ *
+ * Note: This type is part of the experimental Metrics Persistence SPI and may change in
+ * future releases.
+ */
+@Beta
+@PolarisImmutable
+public interface CommitMetricsRecord extends MetricsRecordIdentity {
+
+ // === Commit Context ===
+
+ /** Snapshot ID created by this commit. */
+ long snapshotId();
+
+ /** Sequence number of the snapshot. */
+ Optional This interface enables different persistence backends (JDBC, NoSQL, custom) to implement
+ * metrics storage in a way appropriate for their storage model, while allowing service code to
+ * remain backend-agnostic.
+ *
+ * Implementations should be idempotent - writing the same reportId twice should have no effect.
+ * Implementations that don't support metrics persistence can use {@link #NOOP} which silently
+ * ignores write operations and returns empty pages for queries.
+ *
+ * This interface is designed to be injected via CDI (Contexts and Dependency Injection). The
+ * deployment module (e.g., {@code polaris-quarkus-service}) should provide a {@code @Produces}
+ * method that creates the appropriate implementation based on the configured persistence backend.
+ *
+ * Example producer:
+ *
+ * Realm context is not passed in the record objects. Implementations should obtain the realm
+ * from the CDI-injected {@code RealmContext} at write/query time. This keeps catalog-specific code
+ * from needing to manage realm concerns directly.
+ *
+ * Query methods use the standard Polaris pagination pattern with {@link PageToken} for requests
+ * and {@link Page} for responses. This enables:
+ *
+ * The {@link ReportIdToken} provides a reference cursor implementation based on report ID
+ * (UUID), but backends may use other cursor strategies internally.
+ *
+ * Note: This SPI is currently experimental and not yet implemented in all persistence
+ * backends. The API may change in future releases.
+ *
+ * @see PageToken
+ * @see Page
+ * @see ReportIdToken
+ */
+@Beta
+public interface MetricsPersistence {
+
+ /** A no-op implementation for backends that don't support metrics persistence. */
+ MetricsPersistence NOOP = new NoOpMetricsPersistence();
+
+ // ============================================================================
+ // Write Operations
+ // ============================================================================
+
+ /**
+ * Persists a scan metrics record.
+ *
+ * This operation is idempotent - writing the same reportId twice has no effect.
+ *
+ * @param record the scan metrics record to persist
+ */
+ void writeScanReport(@Nonnull ScanMetricsRecord record);
+
+ /**
+ * Persists a commit metrics record.
+ *
+ * This operation is idempotent - writing the same reportId twice has no effect.
+ *
+ * @param record the commit metrics record to persist
+ */
+ void writeCommitReport(@Nonnull CommitMetricsRecord record);
+
+ // ============================================================================
+ // Query Operations
+ // ============================================================================
+
+ /**
+ * Queries scan metrics reports based on the specified criteria.
+ *
+ * Example usage:
+ *
+ * This class defines the filter parameters for metrics queries. Pagination is handled separately
+ * via {@link org.apache.polaris.core.persistence.pagination.PageToken}, which is passed as a
+ * separate parameter to query methods. This separation of concerns allows:
+ *
+ * Additional query patterns (e.g., by trace ID) can be implemented by persistence backends using
+ * the {@link #metadata()} filter map. Client-provided correlation data should be stored in the
+ * metrics record's metadata map and can be filtered using the metadata criteria.
+ *
+ * Note: This type is part of the experimental Metrics Persistence SPI and may change in
+ * future releases.
+ *
+ * Pagination is handled via the {@link org.apache.polaris.core.persistence.pagination.PageToken}
+ * passed to query methods. The token contains:
+ *
+ * Query results are returned as {@link org.apache.polaris.core.persistence.pagination.Page}
+ * which includes an encoded token for fetching the next page.
+ *
+ * @see org.apache.polaris.core.persistence.pagination.PageToken
+ * @see org.apache.polaris.core.persistence.pagination.Page
+ * @see ReportIdToken
+ */
+@Beta
+@PolarisImmutable
+public interface MetricsQueryCriteria {
+
+ // === Table Identification (optional) ===
+
+ /**
+ * Catalog ID to filter by.
+ *
+ * This is the internal catalog entity ID. Callers should resolve catalog names to IDs before
+ * querying, as catalog names can change over time.
+ */
+ OptionalLong catalogId();
+
+ /**
+ * Table entity ID to filter by.
+ *
+ * This is the internal table entity ID. Callers should resolve table names to IDs before
+ * querying, as table names can change over time.
+ *
+ * Note: Namespace is intentionally not included as a query filter. Since we query by table ID,
+ * the namespace is implicit. If users want to query by namespace, the service layer should
+ * resolve namespace to table IDs using the current catalog state, then query by those IDs. This
+ * avoids confusion with table moves over time.
+ */
+ OptionalLong tableId();
+
+ // === Time Range ===
+
+ /** Start time for the query (inclusive). */
+ Optional This enables filtering metrics by client-provided correlation data stored in the record's
+ * metadata map. For example, clients may include a trace ID in the metadata that can be queried
+ * later.
+ *
+ * Note: Metadata filtering may require custom indexes depending on the persistence backend.
+ * The OSS codebase provides basic support, but performance optimizations may be needed for
+ * high-volume deployments.
+ */
+ Map This allows the caller to add time ranges and other filters at the call site. This pattern
+ * is useful when table info is resolved in one place and time ranges are added elsewhere.
+ *
+ * Example usage:
+ *
+ * This interface defines the common fields that identify the source of a metrics report,
+ * including the report ID, catalog ID, table ID, timestamp, and metadata.
+ *
+ * Both {@link ScanMetricsRecord} and {@link CommitMetricsRecord} extend this interface to
+ * inherit these common fields while adding their own specific metrics.
+ *
+ * Entity IDs only (no names): We store only catalog ID and table ID, not their names or
+ * namespace paths. Names can change over time (via rename operations), which would make querying
+ * historical metrics by name challenging and lead to correctness issues. Queries should resolve
+ * names to IDs using the current catalog state. The table ID uniquely identifies the table, and the
+ * namespace can be derived from the table entity if needed.
+ *
+ * Realm ID: Realm ID is intentionally not included in this interface. Multi-tenancy realm
+ * context should be obtained from the CDI-injected {@code RealmContext} at persistence time. This
+ * keeps catalog-specific code from needing to manage realm concerns.
+ *
+ * Note: This type is part of the experimental Metrics Persistence SPI and may change in
+ * future releases.
+ */
+@Beta
+public interface MetricsRecordIdentity {
+
+ /**
+ * Unique identifier for this report (UUID).
+ *
+ * This ID is generated when the record is created and serves as the primary key for the
+ * metrics record in persistence storage.
+ */
+ String reportId();
+
+ /**
+ * Internal catalog ID.
+ *
+ * This matches the catalog entity ID in Polaris persistence, as defined by {@code
+ * PolarisEntityCore#getId()}. The catalog name is not stored since it can change over time;
+ * queries should resolve names to IDs using the current catalog state.
+ */
+ long catalogId();
+
+ /**
+ * Internal table entity ID.
+ *
+ * This matches the table entity ID in Polaris persistence, as defined by {@code
+ * PolarisEntityCore#getId()}. The table name is not stored since it can change over time; queries
+ * should resolve names to IDs using the current catalog state. The namespace can be derived from
+ * the table entity if needed.
+ */
+ long tableId();
+
+ /**
+ * Timestamp when the report was received.
+ *
+ * This is the server-side timestamp when the metrics report was processed, not the client-side
+ * timestamp when the operation occurred.
+ */
+ Instant timestamp();
+
+ /**
+ * Additional metadata as key-value pairs.
+ *
+ * This map can contain additional contextual information from the original Iceberg report,
+ * including client-provided trace IDs or other correlation data. Persistence implementations can
+ * store and index specific metadata fields as needed.
+ */
+ Map This implementation is used as the default when a persistence backend does not support metrics
+ * storage. All write operations are silently ignored, and all query operations return empty pages.
+ */
+final class NoOpMetricsPersistence implements MetricsPersistence {
+
+ NoOpMetricsPersistence() {}
+
+ @Override
+ public void writeScanReport(@Nonnull ScanMetricsRecord record) {
+ // No-op
+ }
+
+ @Override
+ public void writeCommitReport(@Nonnull CommitMetricsRecord record) {
+ // No-op
+ }
+
+ @Nonnull
+ @Override
+ public Page Note: This is a reference implementation provided for convenience. It is
+ * not required by the {@link MetricsPersistence} SPI contract. Persistence backends are
+ * free to implement their own {@link Token} subclass optimized for their storage model (e.g.,
+ * timestamp-based cursors, composite keys, continuation tokens).
+ *
+ * Only {@link org.apache.polaris.core.persistence.pagination.PageToken} (for requests) and
+ * {@link org.apache.polaris.core.persistence.pagination.Page} (for responses) are required by the
+ * SPI contract.
+ *
+ * This token enables cursor-based pagination for metrics queries across different storage
+ * backends. The report ID is used as the cursor because it is:
+ *
+ * Each backend implementation can use this cursor value to implement efficient pagination in
+ * whatever way is optimal for that storage system:
+ *
+ * Note: This type is part of the experimental Metrics Persistence SPI and may change in
+ * future releases.
+ */
+@Beta
+@PolarisImmutable
+@JsonSerialize(as = ImmutableReportIdToken.class)
+@JsonDeserialize(as = ImmutableReportIdToken.class)
+public interface ReportIdToken extends Token {
+
+ /** Token type identifier. Short to minimize serialized token size. */
+ String ID = "r";
+
+ /**
+ * The report ID to use as the cursor.
+ *
+ * Results should start after this report ID. This is typically the {@code reportId} of the
+ * last item from the previous page.
+ */
+ @JsonProperty("r")
+ String reportId();
+
+ @Override
+ default String getT() {
+ return ID;
+ }
+
+ /**
+ * Creates a token from a report ID.
+ *
+ * @param reportId the report ID to use as cursor
+ * @return the token, or null if reportId is null
+ */
+ static @Nullable ReportIdToken fromReportId(@Nullable String reportId) {
+ if (reportId == null) {
+ return null;
+ }
+ return ImmutableReportIdToken.builder().reportId(reportId).build();
+ }
+
+ /**
+ * Creates a token from a metrics record.
+ *
+ * @param record the record whose report ID should be used as cursor
+ * @return the token, or null if record is null
+ */
+ static @Nullable ReportIdToken fromRecord(@Nullable ScanMetricsRecord record) {
+ if (record == null) {
+ return null;
+ }
+ return fromReportId(record.reportId());
+ }
+
+ /**
+ * Creates a token from a commit metrics record.
+ *
+ * @param record the record whose report ID should be used as cursor
+ * @return the token, or null if record is null
+ */
+ static @Nullable ReportIdToken fromRecord(@Nullable CommitMetricsRecord record) {
+ if (record == null) {
+ return null;
+ }
+ return fromReportId(record.reportId());
+ }
+
+ /** Token type registration for service loader. */
+ final class ReportIdTokenType implements TokenType {
+ @Override
+ public String id() {
+ return ID;
+ }
+
+ @Override
+ public Class extends Token> javaType() {
+ return ReportIdToken.class;
+ }
+ }
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/ScanMetricsRecord.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/ScanMetricsRecord.java
new file mode 100644
index 0000000000..44947d8f75
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/ScanMetricsRecord.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence.metrics;
+
+import com.google.common.annotations.Beta;
+import java.util.List;
+import java.util.Optional;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/**
+ * Backend-agnostic representation of an Iceberg scan metrics report.
+ *
+ * This record captures all relevant metrics from an Iceberg {@code ScanReport} along with
+ * contextual information such as catalog identification and table location.
+ *
+ * Common identification fields are inherited from {@link MetricsRecordIdentity}.
+ *
+ * Note: Realm ID is not included in this record. Multi-tenancy realm context should be obtained
+ * from the CDI-injected {@code RealmContext} at persistence time.
+ *
+ * Note: This type is part of the experimental Metrics Persistence SPI and may change in
+ * future releases.
+ */
+@Beta
+@PolarisImmutable
+public interface ScanMetricsRecord extends MetricsRecordIdentity {
+
+ // === Scan Context ===
+
+ /** Snapshot ID that was scanned. */
+ OptionalDependency Injection
+ *
+ * {@code
+ * @Produces
+ * @RequestScoped
+ * MetricsPersistence metricsPersistence(RealmContext realmContext, PersistenceBackend backend) {
+ * if (backend.supportsMetrics()) {
+ * return backend.createMetricsPersistence(realmContext);
+ * }
+ * return MetricsPersistence.NOOP;
+ * }
+ * }
+ *
+ * Multi-Tenancy
+ *
+ * Pagination
+ *
+ *
+ *
+ *
+ * {@code
+ * // First page
+ * PageToken pageToken = PageToken.fromLimit(100);
+ * Page
+ *
+ * @param criteria the query criteria (filters)
+ * @param pageToken pagination parameters (page size and optional cursor)
+ * @return page of matching scan metrics records with continuation token if more results exist
+ */
+ @Nonnull
+ Page
+ *
+ *
+ * Supported Query Patterns
+ *
+ *
+ *
+ *
+ *
+ * Pattern Fields Used Index Required
+ * By Table + Time catalogId, tableId, startTime, endTime Yes (OSS)
+ * By Time Only startTime, endTime Partial (timestamp index) Pagination
+ *
+ *
+ *
+ *
+ * {@code
+ * MetricsQueryCriteria criteria = MetricsQueryCriteria.forTable(catalogId, tableId)
+ * .startTime(startTime)
+ * .endTime(endTime)
+ * .build();
+ * }
+ *
+ * @param catalogId the catalog entity ID
+ * @param tableId the table entity ID
+ * @return a builder pre-populated with table info, ready for adding time ranges
+ */
+ static ImmutableMetricsQueryCriteria.Builder forTable(long catalogId, long tableId) {
+ return builder().catalogId(catalogId).tableId(tableId);
+ }
+
+ /**
+ * Creates empty criteria (no filters). Useful for pagination-only queries.
+ *
+ * @return empty query criteria
+ */
+ static MetricsQueryCriteria empty() {
+ return builder().build();
+ }
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/MetricsRecordIdentity.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/MetricsRecordIdentity.java
new file mode 100644
index 0000000000..51f819ea06
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/MetricsRecordIdentity.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence.metrics;
+
+import com.google.common.annotations.Beta;
+import java.time.Instant;
+import java.util.Map;
+
+/**
+ * Base interface containing common identification fields shared by all metrics records.
+ *
+ * Design Decisions
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *