diff --git a/CHANGELOG.md b/CHANGELOG.md index 8aa4ed3dcf..1eaec3be44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti ### Breaking changes - The (Before/After)CommitTableEvent has been removed. +- The `PolarisMetricsReporter.reportMetric()` method signature has been extended to include a `receivedTimestamp` parameter of type `java.time.Instant`. ### New Features diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 6c30afb9e7..fb54b5572d 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -30,6 +30,7 @@ import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; import java.util.EnumSet; import java.util.Optional; import java.util.function.Function; @@ -105,6 +106,7 @@ public class IcebergCatalogAdapter private final Instance externalCatalogFactories; private final StorageAccessConfigProvider storageAccessConfigProvider; private final PolarisMetricsReporter metricsReporter; + private final Clock clock; @Inject public IcebergCatalogAdapter( @@ -122,7 +124,8 @@ public IcebergCatalogAdapter( CatalogHandlerUtils catalogHandlerUtils, @Any Instance externalCatalogFactories, StorageAccessConfigProvider storageAccessConfigProvider, - PolarisMetricsReporter metricsReporter) { + PolarisMetricsReporter metricsReporter, + Clock clock) { this.diagnostics = diagnostics; this.realmContext = realmContext; this.callContext = callContext; @@ -139,6 +142,7 @@ public IcebergCatalogAdapter( this.externalCatalogFactories = externalCatalogFactories; this.storageAccessConfigProvider = storageAccessConfigProvider; this.metricsReporter = metricsReporter; + this.clock = clock; } /** @@ -722,7 +726,8 @@ public Response reportMetrics( Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); - metricsReporter.reportMetric(catalogName, tableIdentifier, reportMetricsRequest.report()); + metricsReporter.reportMetric( + catalogName, tableIdentifier, reportMetricsRequest.report(), clock.instant()); return Response.status(Response.Status.NO_CONTENT).build(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java index 5c7b4934a9..eaff0219b4 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java @@ -21,32 +21,56 @@ import com.google.common.annotations.VisibleForTesting; import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; -import org.apache.commons.lang3.function.TriConsumer; +import java.time.Instant; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Default implementation of {@link PolarisMetricsReporter} that logs metrics to the configured + * logger. + * + *

This implementation is selected when {@code polaris.iceberg-metrics.reporting.type} is set to + * {@code "default"} (the default value). + * + *

By default, logging is disabled. To enable metrics logging, set the logger level for {@code + * org.apache.polaris.service.reporting} to {@code INFO} in your logging configuration. + * + * @see PolarisMetricsReporter + */ @ApplicationScoped @Identifier("default") public class DefaultMetricsReporter implements PolarisMetricsReporter { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMetricsReporter.class); - private final TriConsumer reportConsumer; + private final QuadConsumer reportConsumer; + + /** Functional interface for consuming metrics reports with timestamp. */ + @FunctionalInterface + interface QuadConsumer { + void accept(T1 t1, T2 t2, T3 t3, T4 t4); + } + /** Creates a new DefaultMetricsReporter that logs metrics to the class logger. */ public DefaultMetricsReporter() { this( - (catalogName, table, metricsReport) -> - LOGGER.info("{}.{}: {}", catalogName, table, metricsReport)); + (catalogName, table, metricsReport, receivedTimestamp) -> + LOGGER.info("{}.{} (ts={}): {}", catalogName, table, receivedTimestamp, metricsReport)); } @VisibleForTesting - DefaultMetricsReporter(TriConsumer reportConsumer) { + DefaultMetricsReporter( + QuadConsumer reportConsumer) { this.reportConsumer = reportConsumer; } @Override - public void reportMetric(String catalogName, TableIdentifier table, MetricsReport metricsReport) { - reportConsumer.accept(catalogName, table, metricsReport); + public void reportMetric( + String catalogName, + TableIdentifier table, + MetricsReport metricsReport, + Instant receivedTimestamp) { + reportConsumer.accept(catalogName, table, metricsReport, receivedTimestamp); } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java index 7ffd84f4d8..b27184d559 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java @@ -18,9 +18,39 @@ */ package org.apache.polaris.service.reporting; +import java.time.Instant; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; +/** + * SPI interface for reporting Iceberg metrics received by Polaris. + * + *

Implementations can be used to send metrics to external systems for analysis and monitoring. + * Custom implementations can be annotated with appropriate {@code Quarkus} scope and {@link + * io.smallrye.common.annotation.Identifier @Identifier("my-reporter-type")} for CDI discovery. + * + *

The implementation to use is selected via the {@code polaris.iceberg-metrics.reporting.type} + * configuration property, which defaults to {@code "default"}. + * + *

Implementations can inject other CDI beans for context. + * + * @see DefaultMetricsReporter + * @see MetricsReportingConfiguration + */ public interface PolarisMetricsReporter { - public void reportMetric(String catalogName, TableIdentifier table, MetricsReport metricsReport); + + /** + * Reports an Iceberg metrics report for a specific table. + * + * @param catalogName the name of the catalog containing the table + * @param table the identifier of the table the metrics are for + * @param metricsReport the Iceberg metrics report (e.g., {@link + * org.apache.iceberg.metrics.ScanReport} or {@link org.apache.iceberg.metrics.CommitReport}) + * @param receivedTimestamp the timestamp when the metrics were received by Polaris + */ + void reportMetric( + String catalogName, + TableIdentifier table, + MetricsReport metricsReport, + Instant receivedTimestamp); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java index dfdde0f3ef..8762c3ed74 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java @@ -21,7 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import org.apache.commons.lang3.function.TriConsumer; +import java.time.Instant; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; import org.junit.jupiter.api.Test; @@ -31,14 +31,16 @@ public class DefaultMetricsReporterTest { @Test void testLogging() { @SuppressWarnings("unchecked") - TriConsumer mockConsumer = mock(TriConsumer.class); + DefaultMetricsReporter.QuadConsumer + mockConsumer = mock(DefaultMetricsReporter.QuadConsumer.class); DefaultMetricsReporter reporter = new DefaultMetricsReporter(mockConsumer); String warehouse = "testWarehouse"; TableIdentifier table = TableIdentifier.of("testNamespace", "testTable"); MetricsReport metricsReport = mock(MetricsReport.class); + Instant receivedTimestamp = Instant.ofEpochMilli(1234567890L); - reporter.reportMetric(warehouse, table, metricsReport); + reporter.reportMetric(warehouse, table, metricsReport, receivedTimestamp); - verify(mockConsumer).accept(warehouse, table, metricsReport); + verify(mockConsumer).accept(warehouse, table, metricsReport, receivedTimestamp); } } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 59af4b5a6c..30303121e1 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -350,7 +350,8 @@ public String getAuthenticationScheme() { catalogHandlerUtils, externalCatalogFactory, storageAccessConfigProvider, - new DefaultMetricsReporter()); + new DefaultMetricsReporter(), + Clock.systemUTC()); // Optionally wrap with event delegator IcebergRestCatalogApiService finalRestCatalogService = catalogService;